使用Python进行并发编程-asyncio篇(三)

这是「使用Python进行并发编程」系列的最后一篇。我特意地把它安排在了16年最后一天,先祝各位元旦快乐。

重新实验上篇的效率对比的实现

在第一篇我们曾经对比并发执行的效率,但是请求的是httpbin.org这个网站。很容易受到网络状态和其服务质量的影响。所以我考虑启用一个本地的eb服务。那接下来选方案吧。
我用sanic提供的不同方案的例子,对tornado、aiohttp+ujson+uvloop、sanic+uvloop三种方案,在最新的Python
3.6下,使用wrk进行了性能测试。
先解释下上面提到的几个关键词:

  1. aiohttp。一个实现了PEP3156的HTTP的服务器,且包含客户端相关功能。最早出现,应该最知名。
  2. sanic。后起之秀,基于Flask语法的异步Web框架。
  3. uvloop。用Cython编写的、用来替代asyncio事件循环。作者说「它在速度上至少比Node.js、gevent以及其它任何Python异步框架快2倍」。
  4. ujson。比标准库json及其社区版的simplejson都要快的JSON编解码库。
    使用的测试命令是:
1
2

wrk -d20s -t10 -c200 http://127.0.0.1:8000

表示使用10个线程、并发200、持续20秒。
在我个人Mac上获得的结果是:
方案 | tornado | aiohttp + ujson + uvloop | sanic + uvloop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
平均延时 | 122.58ms | 35.49ms | 11.03ms  
请求数/秒 | 162.94 | 566.87 | 2.02k

所以简单的返回json数据,看起来sanic + uvloop是最快的。首先我对市面的各种Benchmark的对比是非常反感的,不能用hello
world这种级别的例子的结果就片面的认为某种方案效率是最好的, **一定**
要根据你实际的生产环境,再不行影响线上服务的前提下,对一部分有代表性的接口进程流量镜像之类的方式去进行效率的对比。而我认可上述的结果是因为正好满足我接下来测试用到的功能而已。
写一个能GET某参数返回这个参数的sanic+uvloop的版本的例子:

``` python

from sanic import Sanic
from sanic.response import json

app = Sanic(__name__)


@app.route('/get')
async def test(request):
a = request.args.get('a')
return json({'args': {'a': a}})


if __name__ == '__main__':
app.run(host='127.0.0.1', port=8000)

然后把之前的效率对比的代码改造一下,需要变化如下几步:

  1. 替换请求地址,也就是把httpbin.org改成了localhost:8000
  2. 增加要爬取的页面数量,由于sanic太快了(无奈脸),12个页面秒完,所以改成NUMBERS = range(240)
  3. 由于页面数量大幅增加,不能在终端都打印出来。而且之前已经验证过正确性。去掉那些print
    看下效果:
1
2
3
4
5
6

❯ python3 scraper_thread.py
Use requests+ThreadPoolExecutor cost: 0.9809930324554443
Use asyncio+requests+ThreadPoolExecutor cost: 0.9977471828460693
Use asyncio+aiohttp cost: 0.25928187370300293
Use asyncio+aiohttp+ThreadPoolExecutor cost: 0.278397798538208

可以感受到asyncio+aiohttp依然是最快的。随便挺一下Sanic,准备有机会在实际工作中用一下。

asyncio在背后怎么运行的呢?

Asynchronous
Python
这篇文章里面我找到一个表达的不错的asyncio运行的序列图。例子我改编如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

import asyncio

async def compute(x, y):
print('Compute {} + {} ...'.format(x, y))
await asyncio.sleep(1.0)
return x + y


async def print_sum(x, y):
result = await compute(x, y)
print('{} + {} = {}'.format(x, y, result))


loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

运行的过程是这样的:

如何把同步的代码改成异步的

之前有位订阅我的公众号的同学问过这个问题,我想了一个例子来让事情变的清楚。
首先看一个同步的例子:

1
2
3
4
5
6
7

def handle(id):
subject = get_subject_from_db(id)
buyinfo = get_buyinfo(id)
change = process(subject, buyinfo)
notify_change(change)
flush_cache(id)

可以看到,需要获取subject和buyinfo之后才能执行process,然后才能执行notify_change和flush_cache。
如果使用asyncio,就是这样写:

1
2
3
4
5
6
7
8
9
10
11

import asyncio


async def handle(id):
subject = asyncio.ensure_future(get_subject_from_db(id))
buyinfo = asyncio.ensure_future(get_buyinfo(id))
results = await asyncio.gather(subject, buyinfo)
change = await process(results)
await notify_change(change)
loop.call_soon(flush_cache, id)

原则上无非是让能一起协同的函数异步化(subject和buyinfo已经是Future对象了),然后通过gather获取到这些函数执行的结果;有顺序的就用call_soon来保证。
继续深入,现在详细了解下一步还有什么其他解决方案以及其应用场景:

  1. 包装成Future对象。上面使用了ensure_future来做,上篇也说过,也可以用loop.create_task。如果你看的是老文章可能会出现asyncio.async这种用法,它现在已经被弃用了。如果你已经非常熟悉,你也可以直接使用asyncio.Task(get_subject_from_db(id))这样的方式。
  2. 回调。上面用到了call_soon这种回调。除此之外还有如下两种:
    1. loop.call_later(delay, func, *args)。延迟delay秒之后再执行。
    2. loop.call_at(when, func, *args)。 某个时刻才执行。
      其实套路就是这些罢了。

      爬虫分析

      可能你已经听过开源程序架构系列书了。今天我们将介绍第四本500 Lines or
      Less
      中的爬虫项目。顺便说一下,这个项目里面每章都是由不同领域非常知名的专家而写,代码不超过500行。目前包含web服务器、决策采样器、Python解释器、爬虫、模板引擎、OCR持续集成系统、分布式系统、静态检查等内容。值得大家好好学习下。
      我们看的这个例子,是实现一个高性能网络爬虫,它能够抓取你指定的网站的全部地址。它是由MongoDB的C和Python驱动的主要开发者ajdavis以及Python之父Guido
      van Rossum一起完成的。BTW, 我是ajdavis粉儿!
      如果你想看了解这篇爬虫教程可以访问: [A Web Crawler With asyncio
      Coroutines](http://aosabook.org/en/500L/a-web-crawler-with-asyncio-
      coroutines.html),这篇和教程关系不大,是一篇分析文章。
      我们首先下载并安装对应的依赖:
1
2
3
4

❯ git clone https://github.com/aosabook/500lines
❯ cd 500lines
❯ python3 -m pip install -r requirements.txt

运行一下,看看效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

❯ python3 crawler/code/crawl.py -q python-cn.org --exclude github
...
http://python-cn.org:80/user/zuoshou/topics 200 text/html utf-8 13212 0/22
http://python-cn.org:80/users 200 text/html utf-8 34156 24/41
http://python-cn.org:80/users/online 200 text/html utf-8 11614 0/17
http://python-cn.org:80/users/sort-posts 200 text/html utf-8 34642 0/41
http://python-cn.org:80/users/sort-reputation 200 text/html utf-8 34721 15/41
Finished 2365 urls in 47.868 secs (max_tasks=100) (0.494 urls/sec/task)
4 error
36 error_bytes
2068 html
42735445 html_bytes
98 other
937394 other_bytes
195 redirect
4 status_404
Todo: 0
Done: 2365
Date: Fri Dec 30 22:03:50 2016 local time

可以看到 http://python-cn.org 有2365个页面,花费了47.868秒,并发为100。
这个项目有如下一些文件:

1
2
3
4
5
6
7
8
9
10

❯ tree crawler/code -L 1
crawler/code
├── Makefile
├── crawl.py
├── crawling.py
├── reporting.py
├── requirements.txt
├── supplemental
└── test.py

其中主要有如下三个程序:

  1. crawl.py是主程序,其中包含了参数解析,以及事件循环。
  2. crawling.py抓取程序,crawl.py中的异步函数就是其中的Crawler类的crawl方法。
  3. reporting.py顾名思义,生成抓取结果的程序。
    本文主要看crawling.py部分。虽然它已经很小(加上空行才275行),但是为了让爬虫的核心更直观,我把其中的兼容性、日志功能以及异常的处理去掉,并将处理成Python
    3.5新的async/await语法。
    首先列一下这个爬虫实现什么功能:
  4. 输入一个根链接,让爬虫自动帮助我们爬完所有能找到的链接
  5. 把全部的抓取结果存到一个列表中
  6. 可以排除包含某些关键词链接的抓取
  7. 可以控制并发数
  8. 可以抓取自动重定向的页面,且可以限制重定向的次数
  9. 抓取失败可重试
    目前对一个复杂的结果结构常定义一个namedtuple,首先把抓取的结果定义成一个FetchStatistic:
1
2
3
4
5
6
7
8
9
10
11

FetchStatistic = namedtuple('FetchStatistic',
['url',
'next_url',
'status',
'exception',
'size',
'content_type',
'encoding',
'num_urls',
'num_new_urls'])

其中包含了url,文件类型,状态码等用得到的信息。
然后实现抓取类Crawler,首先是初始化方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

class Crawler:
def __init__(self, roots,
exclude=None, strict=True, # What to crawl.
max_redirect=10, max_tries=4, # Per-url limits.
max_tasks=10, *, loop=None):
self.loop = loop or asyncio.get_event_loop()
self.roots = roots
self.exclude = exclude
self.strict = strict
self.max_redirect = max_redirect
self.max_tries = max_tries
self.max_tasks = max_tasks
self.q = Queue(loop=self.loop)
self.seen_urls = set()
self.done = []
self.session = aiohttp.ClientSession(loop=self.loop)
self.root_domains = set()
for root in roots:
parts = urllib.parse.urlparse(root)
host, port = urllib.parse.splitport(parts.netloc)
if not host:
continue
if re.match(r'\A[\d\.]*\Z', host):
self.root_domains.add(host)
else:
host = host.lower()
if self.strict:
self.root_domains.add(host)
else:
self.root_domains.add(lenient_host(host))
for root in roots:
self.add_url(root)
self.t0 = time.time()
self.t1 = None

信息量比较大,我拿出重要的解释下:

  1. 第7行,self.roots就是待抓取的网站地址,是一个列表。
  2. 第13行,self.q这个队列就存储了待抓取的url
  3. 第14行,self.seen_urls会保证不重复与抓取已经抓取过的url
  4. 第16行,使用requests或者aiphttp,都是推荐使用一个会话完成全部工作,要不然有些需要登陆之后的操作就做不了了。
  5. 第18-30行,这个for循环会解析self.roots中的域名,这是为了只抓取指定的网站,其它网站的链接会基于这个集合过滤掉
  6. 第31-32行,触发抓取,把url放入self.q的队列,就可以被worker执行了
  7. 第33-34行,t0和t1是为了记录抓取的时间戳,最后可以计算抓取的总耗时
    接着我们看add_url的实现:
1
2
3
4
5
6

def add_url(self, url, max_redirect=None):
if max_redirect is None:
max_redirect = self.max_redirect
self.seen_urls.add(url)
self.q.put_nowait((url, max_redirect))

其中q.put_nowait相当于非阻塞的q.put,还可以看到这个url被放入了self.seen_urls
现在我们从事件循环会用到的crawl方法开始往回溯:

1
2
3
4
5
6
7
8
9

async def crawl(self):
workers = [asyncio.Task(self.work(), loop=self.loop)
for _ in range(self.max_tasks)]
self.t0 = time.time()
await self.q.join()
self.t1 = time.time()
for w in workers:
w.cancel()

类中的方法可以直接用async关键词的。worker就是self.work,这些worker会在后台运行,但是会阻塞在join上,直到初始化时候放入self.q的url都完成。最后需要让worker都取消掉。
然后看self.work:

1
2
3
4
5
6
7
8
9
10

async def work(self):
try:
while True:
url, max_redirect = await self.q.get()
assert url in self.seen_urls
await self.fetch(url, max_redirect)
self.q.task_done()
except asyncio.CancelledError:
pass

当执行worker.cancel方法就会引起asyncio.CancelledError,然后while
1的循环就结束了。执行完fetch方法,需要标记get的这个url执行完成,也就是要执行self.q.task_done,要不然最后join是永远结束不了的。
接下来就是self.fetch,这个方法比较长:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

async def fetch(self, url, max_redirect):
tries = 0
exception = None
while tries < self.max_tries:
try:
response = await self.session.get(
url, allow_redirects=False)
break
except aiohttp.ClientError as client_error:
exception = client_error

tries += 1
else:
self.record_statistic(FetchStatistic(url=url,
next_url=None,
status=None,
exception=exception,
size=0,
content_type=None,
encoding=None,
num_urls=0,
num_new_urls=0))
return

try:
if is_redirect(response):
location = response.headers['location']
next_url = urllib.parse.urljoin(url, location)
self.record_statistic(FetchStatistic(url=url,
next_url=next_url,
status=response.status,
exception=None,
size=0,
content_type=None,
encoding=None,
num_urls=0,
num_new_urls=0))

if next_url in self.seen_urls:
return
if max_redirect > 0:
self.add_url(next_url, max_redirect - 1)
else:
print('redirect limit reached for %r from %r',
next_url, url)
else:
stat, links = await self.parse_links(response)
self.record_statistic(stat)
for link in links.difference(self.seen_urls):
self.q.put_nowait((link, self.max_redirect))
self.seen_urls.update(links)
finally:
await response.release()

简单的说,fetch就是去请求url,获得响应。然后把结果组织成一个FetchStatistic,通过self.record_statistic放进self.done这个列表,然后对结果进行解析,通过self.parse_links(response)或者这个页面的结果包含的其他链接,和现在已经抓取的链接集合对比,把还没有抓的放入self.q。
如果这个url被重定向,就把重定向的链接放进self.q,待worker拿走执行。
然后我们看parse_links的实现,也比较长:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

async def parse_links(self, response):
links = set()
content_type = None
encoding = None
body = await response.read()

if response.status == 200:
content_type = response.headers.get('content-type')
pdict = {}

if content_type:
content_type, pdict = cgi.parse_header(content_type)

encoding = pdict.get('charset', 'utf-8')
if content_type in ('text/html', 'application/xml'):
text = await response.text()

urls = set(re.findall(r'''(?i)href=["']([^\s"'<>]+)''',
text))
for url in urls:
normalized = urllib.parse.urljoin(response.url, url)
defragmented, frag = urllib.parse.urldefrag(normalized)
if self.url_allowed(defragmented):
links.add(defragmented)

stat = FetchStatistic(
url=response.url,
next_url=None,
status=response.status,
exception=None,
size=len(body),
content_type=content_type,
encoding=encoding,
num_urls=len(links),
num_new_urls=len(links - self.seen_urls))

return stat, links
`

其实就是用re.findall(r'''(?i)href=["']([^\s"'<>]+)''', text)找到链接,然后进行必要的过滤,就拿到全部链接了。
这就是一个爬虫,是不是很简单。但是写的算是「最佳实践」。最后,我们看一下怎么调用Crawler:

1
2
3
4
5
6
7
8
9

loop = asyncio.get_event_loop()
crawler = Crawler(['http://python-cn.org'], max_tasks=100)
loop.run_until_complete(crawler.crawl())
print('Finished {0} urls in {1:.3f} secs'.format(len(crawler.done),
crawler.t1 - crawler.t0))
crawler.close()

loop.close()

希望对大家的爬虫技艺有提高!
最后祝大家元旦快乐
PS:本文全部代码可以在微信公众号文章代码库项目中找到。

版权声明:本文由 董伟明 原创,未经作者授权禁止任何微信公众号和向掘金(juejin.im)转载,技术博客转载采用 保留署名-非商业性使用-禁止演绎 4.0-国际许可协议
python