知乎Live全文搜索之完成爬虫

看这篇文章前推荐阅读相关的如下文章:

  1. 使用API登录知乎并获得token
  2. 知乎Live全文搜索之模型设计和爬虫实现
  3. 知乎Live全文搜索之模型接口
    在[知乎Live全文搜索之模型设计和爬虫实现」里面我已经说过这是本年度最后一次说爬虫,今天就啪啪的打脸了。主要现在的爬虫有比较大的改变,由于微信公众号文章长度限制一篇放不上,只能专门写一篇啦。

    抓取话题信息

    给新增的Topic提供数据。在parse_live_link中,解析到Live数据中包含了topic的id,
    基于这个id拼链接,然后在fetch方法中添加对topic页面的处理,新增parse_topic_link方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

TOPIC_API_URL = 'https://api.zhihu.com/topics/{}'


class Crawler:
def __init__(self, max_redirect=10, max_tries=4,
max_tasks=10, *, loop=None):
...
self.seen_topics = set()

async def parse_topic_link(self, response):
rs = await response.json()
if response.status == 200:
rs['avatar_url'] = await self.convert_local_image(
rs['avatar_url'].replace('_s', '_r'))
Topic.add_or_update(**flatten_live_dict(rs, TOPIC_KEYS))

async def parse_live_link(self, response):
...

topics = live_dict.pop('topics')
for topic in topics:
topic_id = topic['id']
if topic_id not in self.seen_topics:
self.seen_topics.add(topic_id)
self.add_url(TOPIC_API_URL.format(topic_id),
self.max_redirect)
...

async def fetch(self, url, max_redirect):
try:
if 'api.zhihu.com' in url:
parse_func = (self.parse_topic_link if 'topics' in url
else self.parse_live_link)
next_url = await parse_func(response)
else:
next_url = await self.parse_zhuanlan_link(response)
...

思考下,这是不是一种套路(模式):

  1. 初始化一个已完成的url的集合
  2. 启动一定量的worker,每个worker都在等待从队列获取要抓取的url
  3. 一次性添加足量要抓取的链接到队列中,让每个worker都忙起来(执行前要确认之前没有抓取过)
  4. worker在parse处理中又会解析到新的要抓取的链接,放入队列
  5. worker消费任务,过程中也可能生产任务给自己或者其他worker来消费
  6. 全部任务都完成了,通过self.q.join()结束
  7. 停止这些worker,任务完成

    修改live灌suggest数据的方式

    在上上篇我把相关字段的文本用analyze接口拆分成不同的token成为一个列表赋值给live_suggest,其实完全不需要这样,因为Completion(analyzer=ik_analyzer)就是做这个的。gen_suggests用最简单的input+weight就可以:
1
2
3
4
5
6

def gen_suggests(topics, tags, outline, username, subject):
suggests = [{'input': item, 'weight': weight}
for item, weight in ((topics, 10), (subject, 5), (outline, 3),
(tags, 3), (username, 2)) if item]
return suggests

下载主讲人头像

小程序开发工具中不能直接使用知乎的图片资源,所以我只能下载下来并生成一个本地的图片地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

import os

IMAGE_FOLDER = 'static/images/zhihu'
if not os.path.exists(IMAGE_FOLDER):
os.mkdir(IMAGE_FOLDER)

class Crawler:
...
async def convert_local_image(self, pic):
pic_name = pic.split('/')[-1]
path = os.path.join(IMAGE_FOLDER, pic_name)
if not os.path.exists(path):
async with self.session.get(pic) as resp:
content = await resp.read()
with open(path, 'wb') as f:
f.write(content)
return path

async def parse_live_link(self, response):
...
for live in rs['data']:
...
speaker = live.pop('speaker')
speaker_id = speaker['member']['id']
speaker['member']['avatar_url'] = await self.convert_local_image( # noqa
speaker['member']['avatar_url'])
...

这样User类中的avatar_url最后会变成static/images/zhihu/v2-4db301967fffa08dfa727ff467170e_s.jpg这样的地址了。未来我们将让sanic来提供静态资源服务。当然,也可以只存文件名,在接口返回前再去拼图片地址。

抓取专栏信息

知乎Live申请通过之后,主讲人可以写一篇专栏介绍Live,文章中带上Live的链接来导流,知乎Live官方也会收录这个Live的专栏文章。为了让微信小程序的效果更好,我想要抓专栏头图,并且保存专栏链接,希望在小城中能给跳转进去(以证明不可行)。下面我将遍历知乎Live官方专栏收录的专栏,解析每个专栏的标题,去ES里面匹配是不是有对应的subject匹配,如果匹配还会验证用户的hash值确保正确,如果没找到还会从Live正文中搜索live的链接的方式来匹配。
看起来很绕,但是没办法啦,因为专栏和live没有明确的匹配关系,目测是知乎2个部门写的不同的系统。
最后要提的是专栏的抓取和live的api不同,它不提供paging键,也就是返回内容中并不告诉你下一页的地址,所以需要我们人工的修改链接,这需要一个转化的函数:

1
2
3
4
5
6
7
8
9

from urllib.parse import urlparse, parse_qsl, urlunparse, urlencode

def get_next_url(url):
url_parts = list(urlparse(url))
query = dict(parse_qsl(url_parts[4]))
query['offset'] = int(query['offset']) + int(query['limit'])
url_parts[4] = urlencode(query)
return urlunparse(url_parts)

这个方法在我实际工作中很常用:

1
2
3
4
5
6

In : get_next_url('http://dongwm.com?offset=10&limit=20')
Out: 'http://dongwm.com?offset=30&limit=20'

In : get_next_url('http://dongwm.com?offset=20&limit=30')
Out: 'http://dongwm.com?offset=50&limit=30'
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

ZHUANLAN_API_URL = 'https://zhuanlan.zhihu.com/api/columns/zhihulive/posts?limit=20&offset={offset}'
LIVE_REGEX = re.compile(r'<a href="https://(www.)?zhihu.com/lives/(\d+)(.*)?"') # noqa


class Crawler:
def __init__(self, max_redirect=10, max_tries=4,
max_tasks=10, *, loop=None):
...
self.seen_zhuanlan = set()
...
async def parse_zhuanlan_link(self, response):
posts = await response.json()

if response.status == 200 and posts:
for post in posts:
cover = post['titleImage']
if not cover:
continue
s = Live.search()
title = post['title']
for sep in ('-', '—'):
if sep in title:
title = title.split(sep)[-1].strip()
speaker_id = post['author']['hash']
zid = post['url'].split('/')[-1]
s = s.query(Q('match_phrase', subject=title))
lives = await s.execute()
for live in lives:
if live.speaker and live.speaker.speaker_id == speaker_id:
await self.update_live(zid, cover, live)
break
else:
match = LIVE_REGEX.search(post['content'])
if match:
live_id = match.group(2)
try:
live = await Live.get(live_id)
except NotFoundError:
pass
else:
await self.update_live(zid, cover, live)

return get_next_url(response.url)

async def update_live(self, zid, cover, live):
if live.id in self.seen_zhuanlan:
return
self.seen_zhuanlan.add(live.id)
zhuanlan_url = ZHUANLAN_URL.format(zid)
cover = await self.convert_local_image(cover)
await live.update(cover=cover, zhuanlan_url=zhuanlan_url)

def add_zhuanlan_urls(self):
for offset in range(self.max_tasks):
self.add_url(ZHUANLAN_API_URL.format(offset=offset * 20))

async def crawl(self):
self.__workers = [asyncio.Task(self.work(), loop=self.loop)
for _ in range(self.max_tasks)]
self.t0 = time.time()
await self.q.join()
self.add_zhuanlan_urls()
await self.q.join()
self.t1 = time.time()
for w in self.__workers:
w.cancel()

其中crawl方法中用2次join用来确保 先抓取全部live信息之后再去抓专栏信息
,因为得先确保live内容足够完整才能搜索匹配,其次由于parse_live_link和parse_zhuanlan_link都涉及到Live的更新,在并发中容易造成同时更新某些live而触发版本冲突的ConflictError。
我使用s = s.query(Q('match_phrase', subject=title))进行标题匹配,首先我们先聊聊在ES中match和term的区别, 简单的说:

term用于精确查询,match用于全文检索
我们要把标题和Live的subject字段去匹配,但是由于subject设置了analyzer,所以无法使用term。除非新加一个字段,修改成类似cover的那种Text(index='not_analyzed')。但是这样新增字段实在有点浪费,用math会把要匹配的标题分词之后挨个去匹配,匹配其中一个或多个的文档就会被搜索出来,
显然不满足「精确」,所以我想到了「短语匹配」(Phrase Matching)。
短语匹配和match查询类似,match_phrase查询首先解析查询字符串产生一个词条列表。但只保留含有了所有搜索词条的文档,并且还要求这些词条的顺序也一致。就是相当于虽然分词了但是词的顺序是有要求的,效果类似于精确匹配。

支持自动刷新知乎的token

在调用知乎API的时候,会经常告诉我token过期了。我得删掉原来的token.json然后重新生成,这样很不爽。所以抓包分析后,通过已有的refresh_token自动刷新token:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

class ZhihuClient:
def refresh_token(self):
data = LOGIN_DATA.copy()
data['grant_type'] = 'refresh_token'
data['refresh_token'] = self._token.refresh_token
gen_signature(data)
auth = ZhihuOAuth(self._token)
self.save_token(auth, data)

def save_token(self, auth, data):
res = self._session.post(LOGIN_URL, auth=auth, data=data)
try:
json_dict = res.json()
if 'error' in json_dict:
raise LoginException(json_dict['error']['message'])
self._token = ZhihuToken.from_dict(json_dict)
except (ValueError, KeyError) as e:
raise LoginException(str(e))
else:
ZhihuToken.save_file(self.token_file, json_dict)

启动爬虫首先会去get一个url,看看返回的状态码是不是401,如果是就执行refresh_token方法获得新的token:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

class Crawler:
...
async def check_token(self):
async with self.session.get(
LIVE_API_URL.format(type='ended', offset=0)) as resp:
if resp.status == 401:
self.client.refresh_token()

async def crawl(self):
await self.check_token()
...

PS: 今天试用好像不对

更新用户举办的Live数量

之前我们给User添加了incr_live_count这个方法,调用一次live_count就+1,由于这个爬虫每次都是重新过一遍,所以需要仅在创建live的时候才更新:

1
2
3
4
5
6

async def parse_live_link(self, response):
...
result = await Live.add(**live_dict)
if result.meta['version'] == 1:
user.incr_live_count()

ES每次每次都会返回添加/更新的结果,其中的version字段正好被我们利用。

优化抓取

终于到最后一个小节了。再次道歉,之前分享的爬虫其中有一句检查要抓取的url是不是在self.seen_uls里面的判断,如果已经抓取过就assert抛了异常,这其实造成最后就剩下一个协程在执行任务了。
现在我重构了这部分的内容,大家看代码体会吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

class Crawler:
def __init__(self, max_redirect=10, max_tries=4,
max_tasks=10, *, loop=None):
self.__stopped = {}.fromkeys(['ended', 'ongoing', 'posts'], False)

async def fetch(self, url, max_redirect):
...
if next_url is not None:
self.add_url(next_url, max_redirect)
else:
# 如果live或者知乎官方专栏接口不再返回下一页,这个类型的任务就停止
for type in self.__stopped:
if type in url:
self.__stopped[type] = True

async def work(self):
try:
while 1:
url, max_redirect = await self.q.get()
if url in self.seen_urls:
type = url.split('/')[-1].split('?')[0]
# 如果一个接口返回的next_url已经抓去过,自动添加next_url的下一页
# 除非这个类型的任务停止状态为True
if not type.isdigit() and not self.__stopped[type]:
self.add_url(get_next_url(url), max_redirect)
await self.fetch(url, max_redirect)
self.q.task_done()
asyncio.sleep(1)
except asyncio.CancelledError:
pass

这样就既不会重复抓取,也能保证worker都能正常工作。
截止发稿,抓取到的Live 1967个, 话题 656 个
完整抓取一次大概调用约950次API(1967 / 10 + 1967 / 20 + 656), 在我MacBook上耗时 70 - 90 s。

版权声明:本文由 董伟明 原创,未经作者授权禁止任何微信公众号和向掘金(juejin.im)转载,技术博客转载采用 保留署名-非商业性使用-禁止演绎 4.0-国际许可协议
python