我们今天继续深入学习asyncio。
同步机制
asyncio模块包含多种同步机制,每个原语的解释可以看线程篇,这些原语的用法上和线程/进程有一些区别。
Semaphore(信号量)
并发的去爬取显然可以让爬虫工作显得更有效率,但是我们应该把抓取做的无害,这样既可以保证我们不容易发现,也不会对被爬的网站造成一些额外的压力。
在这里吐槽下,豆瓣现在几乎成了爬虫练手专用网站,我个人也不知道为啥?欢迎留言告诉我。难道是豆瓣一直秉承尊重用户的原则不轻易对用户才去封禁策略,造成大家觉得豆瓣最适合入门么?BTW,我每天在后台都能看到几十万次无效的抓取,也就是抓取程序写的有问题,但还在不停地请求着…
好吧回到正题,比如我现在要抓取http://httpbin.org/get?a=X这样的页面,X为1-10000的数字,一次性的产生1w次请求显然很快就会被封掉。那么我们可以用Semaphore控制同时的并发量(例子中为了演示,X为0-11):
1 |
|
在运行的时候可以感受到并发受到了信号量的限制,基本保持在同时处理三个请求的标准。
Lock(锁)
看下面的例子:
1 |
|
这个例子中我们首先使用acquire加锁,通过call_later方法添加一个0.1秒后释放锁的函数。看一下调用:
1 |
|
Condition(条件)
我们根据线程篇Condition的例子,改成一下:
1 |
|
这次演示了2种通知的方式:
- 使用notify方法挨个通知单个消费者
- 使用notify_all方法一次性的通知全部消费者
由于producer和producer2是异步的函数,所以不能使用之前call_later方法,需要用create_task把它创建成一个任务(Task)。但是最后记得要把任务取消掉。
执行以下看看效果:
1 |
|
Event(事件)
模仿锁的例子实现:
1 |
|
看起来也确实和锁的意思很像,不同的是,事件被触发时,2个消费者不用获取锁就要尽快的执行下去了。
Queue
在asyncio官网上已经举例了2个很好的队列例子了,这文就不重复了。asyncio同样支持LifoQueue和PriorityQueue,我们体验下aiohttp+优先级队列的用法吧:
1 |
|
看到使用了新的ensure_future方法,其实它和之前说的create_task意思差不多,都是为了把一个异步的函数变成一个协程的Task。它们的区别是:
- create_task是AbstractEventLoop的抽象方法,不同的loop可以实现不同的创建Task方法,这里用的是BaseEventLoop的实现。
- ensure_future是asyncio封装好的创建Task的函数,它还支持一些参数,甚至指定loop。一般应该使用它,除非用到后面提到的uvloop这个第三方库。
这个例子中,首先我们从0-99中随机取出7个数字,放入优先级队列,看看消费者是不是按照从小到大的顺序执行的呢?
1 |
|
确实是这样的。
说到这里,我们稍微偏个题,看看Task是什么?
深入Task
Task类用来管理协同程序运行的状态。根据源码,我保留核心,实现一个简单的Task类帮助大家理解:
1 |
|
如果_step方法没有让协程执行完成,就会添加回调,_wakeup又会继续执行_step… 直到协程程序完成,并set_result。
写个使用它的例子:
1 |
|
第一个任务是用我们自己的Task创建的,第二个是用BaseEventLoop自带的create_task。
运行一下:
1 |
|
自定义的Task类和asyncio自带的是可以好好协作的。
深入事件循环
asyncio根据你的操作系统信息会帮你选择默认的事件循环类,在*nix下使用的类继承于BaseEventLoop,在上面已经提到了。和Task一样,我们剥离出一份最核心的实现:
1 |
|
其中call_exception_handler和get_debug是必须存在的。
写个例子用一下:
1 |
|
执行:
1 |
|
也可以和asyncio.wait正常协作了。
PS:本文全部代码可以在微信公众号文章代码库项目中找到。
版权声明:本文由 董伟明 原创,未经作者授权禁止任何微信公众号和向掘金(juejin.im)转载,技术博客转载采用 保留署名-非商业性使用-禁止演绎 4.0-国际许可协议
python