知乎live全文搜索之构建基于asyncio+sanic的RESTful-API服务

今天的文章图片就是这个知乎Live全文搜索的效果了。如果在wifi情况下或者土豪不介意流量的同学可以直接[感受实际使用的动态效果](https://github.com/dongweiming/weapp-
zhihulive/blob/master/screenshot/zhihulive.gif) 。
在我的《Python Web开发实战》一书中,比较少的介绍到豆瓣自己造的轮子。有读者喷「不够实战」。我显然承认,也很苦闷
,今天顺便来吐个槽。我刚工作的时候特别愿意混各种技术会议和活动。经常有一些专家在上面讲:我们自己实现了一个XXX,它有如下多的特性,现在支持了多少个产品线的多少个应用,每天的数据量YY
PB,流量ZZ…
通篇在讲架构,摆几张高大上的图,甚至能说几个大家不了解的新的玩法都很少,还不断的问坐在下面的领导或者法务:额,这个我能说嘛;那个我能分享么?最最重要的是,他们讲的这些东西大多不是开源的….
也基本没有一个可以拿得出手的论文,甚至说白了,如果你正好专注这一部分,会发现它也是根据了FLAG公司的论文在造轮子罢了,说不定造的还不如你。
听过之后,也没有收获,都是「别人家」的,甚至有种炫耀的感觉而已,至于有什么苦可能只有他们自己知道吧。我就越来越不愿意参加这种会议了。
写书或者博客也是这样。一切东西脱离了公司能提供的基础设施和环境都是空谈,但是这些铺垫说起来就太大了,先不讨论公司有没有授权你在外面说,就是没开源这点就不好弄。说的人都是在虚化的讲一大坨的东西,最多来几个截图之类的(应该还打了马赛克)。等这一大坨东西说的让大家明白了,一本书的厚度肯定不够。但是这些内容呢,只是你为了写书的某一(几)章做铺垫而已,有些内容太专太偏,读者大部分场景下是用不到的,好吧,又得骂娘说你这本书不实用…
今天我将基于我过往的实践,以及最近学习asyncio和ES知识完成一个小程序的API服务。看这篇文章前推荐阅读相关的如下文章:

  1. 知乎Live全文搜索之模型设计和爬虫实现
  2. 知乎Live全文搜索之模型接口
  3. 使用Python进行并发编程-我为什么不喜欢Gevent
  4. 使用Python进行并发编程-asyncio篇(一)
  5. 使用Python进行并发编程-asyncio篇(二)
  6. 使用Python进行并发编程-asyncio篇(三)
  7. 知乎Live全文搜索之让elasticsearch_dsl支持asyncio

    技术选型

  8. Sanic,基于Python 3.5+的异步web服务器,和Flask一样使用装饰器作为路由,支持Blueprint。效果确实非常快。
  9. uvloop,Sanic默认使用uvloop,这个实现基于libuv,比asyncio默认的loop的块很多。
  10. marshmallow,一个轻量级的转化复杂对象成为Python自带数据类型的库,为什么用它未来会详细介绍。

    使用Schema

    了解过关系型数据库如MySQL的同学会比较熟悉Schema这个词,它是对数据库的结构描述。Schema定义了表与表和表与字段之间的关系。在使用关系型数据库之前第一件事要先定Schema,创建表(库)再去操作。
    为什么我们在实现RESTful API的时候要考虑使用Schema呢?
  11. 首先API服务是给外部用的,比如移动端(安卓、IOS等),前端(用AJAX)等。那么大家一开始就要协商好那些字段,以及字段的类型。因为你不关系,他们都是关心的,如果设计有问题会由于没有正确处理而造成移动端闪退等严重问题。
  12. 你需要对API返回的数据进行良好的管理和验证。
    marshmallow把一组数据映射成一个类:
1
2
3
4
5
6
7
8

from marshmallow import Schema, fields

class UserSchema(Schema):
id = fields.Integer()
url = fields.Str()
name = fields.Str()
...

这样既让不同编程语言的开发者一目了然,也能检验你提供的数据是不是按照这个定好的结构返回的。

同一组数据可以定义多种Schema

思考一下,在搜索页面,每一项提供的空间有限,你无法把Live的全部信息(比如「描述」这种很长的内容的字段)都展示出来,也就是就算都返回了,其实只用了一部分字段,这造成了更多的网络延时和带宽消耗。但是在Live详情页理论上就可以展示全部的字段的内容了。
再假设下,如果是返回一部分,还是返回全部字段在后端做,就是不同的方法返回时对to_dict方法对一对if/elif/else的处理,其实看起来很乱。我是这样用的:

1
2
3
4
5
6
7
8
9
10
11
12

class UserSchema(Schema):
id = fields.Integer()
url = fields.Str()
name = fields.Str()
...


class UserFullSchema(UserSchema):
lives_url = fields.Str()
speaker_id = fields.Str()
...

也就是定义了多种User的schema,按需选择。但是后端统一使用to_dict方法返回全部数据,在视图渲染的时候进行筛选。这样需要用一种好的表达方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

from views.utils import marshal_with

@bp.route('/search')
@marshal_with([LiveSchema, UserSchema])
async def search(request):
...


@bp.route('/suggest')
@marshal_with(LiveSchema)
async def suggest(request):
...


@bp.route('/user/<user_id>')
@marshal_with([LiveFullSchema, UserFullSchema])
async def user(request, user_id):
...

我们先不考虑视图内的逻辑,简单的理解成他们是单个live的to_dict结果或者多个live的to_dict结果的列表
通过神奇的marshal_with装饰器传入你希望返回符合那种Schema的数据。
现在揭晓一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

def marshal(data, fields):
schemas = [field() for field in fields]
if isinstance(data, (list, tuple)):
return [marshal(d, fields) for d in data]

type = data.get('type')
for schema in schemas:
if type in schema.__class__.__name__.lower():
result, errors = schema.dump(data)
if errors:
for item in errors.items():
print('{}: {}'.format(*item))
return result


class marshal_with(object):
def __init__(self, fields):
if not isinstance(fields, list):
fields = [fields]
self.fields = fields

def __call__(self, f):
@wraps(f)
async def wrapper(*args, **kwargs):
resp = await f(*args, **kwargs)
return marshal(resp, self.fields)
return wrapper

这个是async版本的,大家可以自己发散成Python 2的普通版。记得之前我们在model里面给每种数据的to_dict方法里面加了个{'type': 'live'}这种键值么,除了在小程序里面分辨数据的类型,在这里也是用来匹配那种schema的。举个例子,假如是一个user类型的数据。
@marshal_with([LiveFullSchema, UserFullSchema])的装饰下,由于UserFullSchema类名包含了live所以符合了。这比较黑科技一点..

深入使用marshmallow

marshmallow除了生成一个可读性很好的类和验证该字段是不是类型符合以外,还支持序列化和反序列化的处理。有什么意义呢。假如如下的schema:

1
2
3

class LiveSchema(Schema):
starts_at = fields.Date()

注意我们model存的starts_at是一个datetime类型的对象,无法被json序列化,所以返回之前应该先转化成字符串:

1
2
3
4
5
6

class LiveSchema(Schema):
starts_at = fields.Method('get_start_time')

def get_start_time(self, obj):
return int(obj['starts_at'].strftime('%s'))

再举个例子:

1
2
3
4
5

class UserSchema(Schema):
bio = fields.Str()
headline = fields.Str()
description = fields.Str()

bio/headline/description这三个字段内容都有可能比较长,需要做不同的截取:在详情页显示全部,在搜索页之前显示前2行..
我们需要一个truncate函数:

1
2
3
4
5

WIDTH = 45

def truncate_utf8(str, width=WIDTH):
return str[:width] + '...' if len(str) > width else str

假如使用fields.Method你就要写三方法,因为它指定的方法只有self, obj2个参数,而且Schema是不能继承的。这么办呢?有我的书中介绍的partialmethod可完美实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

from functools import partialmethod

class Item(object):
def truncate(self, attr, obj):
if attr not in obj:
return ''
return truncate_utf8(obj[attr], WIDTH)


class UserSchema(Schema):
bio = fields.Method('truncate_bio')
headline = fields.Method('truncate_headline')
description = fields.Method('truncate_description')
truncate_headline = partialmethod(Item.truncate, 'headline')
truncate_bio = partialmethod(Item.truncate, 'bio')
truncate_description = partialmethod(Item.truncate, 'description')

通过partialmethod + 非继承至Schema的类就可以实现继承和额外参数了。

对Sanic定制

我们都知道,当你有独特的需求而框架不满足的时候,就要对其进行二次开发或者封装。我一般倾向基于框架提供的灵活性去封装。由于在多个API上都有分页的需要,参数中会出现start/limit(当然你可以更喜欢其他的词汇)。如果不定制,那么在每个视图里面都要加一句:

1
2
3
4
5
6
7

@bp.route('/search')
@marshal_with([LiveSchema, UserSchema])
async def search(request):
start = request.args.get('start', 0)
limit = request.args.get('limit', 10)
...

这2句就是个累赘。怎么做呢? 利用sanic提供的中间件就好了:

1
2
3
4
5

@app.middleware('request')
async def halt_request(request):
request.start = request.args.get('start', 0)
request.limit = request.args.get('limit', 10)

但是这还不够,因为sanic在创建Request的时候基于内存的考虑使用了slots,所以我们要重新写on_headers_complete方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

from sanic.server import HttpProtocol, CIMultiDict
from sanic.request import Request as _Request


class Request(_Request):
__slots__ = (
'url', 'headers', 'version', 'method', '_cookies',
'query_string', 'body', 'start', 'limit',
'parsed_json', 'parsed_args', 'parsed_form', 'parsed_files',
)


class JSONHttpProtocol(HttpProtocol):
def on_headers_complete(self):
remote_addr = self.transport.get_extra_info('peername')
if remote_addr:
self.headers.append(('Remote-Addr', '%s:%s' % remote_addr))

self.request = Request(
url_bytes=self.url,
headers=CIMultiDict(self.headers),
version=self.parser.get_http_version(),
method=self.parser.get_method().decode()
)

if __name__ == '__main__':
app.run(host='0.0.0.0', port=8300, protocol=JSONHttpProtocol,
workers=4, debug=True)

这样在视图中就可以直接使用request.start和request.limit了。
其次看官方用法,都是在视图中控制返回的内容的类型,比如:

1
2
3
4
5
6

from sanic.response import json

@app.route('/')
async def test(request):
return json({"hello": "world"})

我也希望封装全部的返回结果的格式为:

1
2

{'rs': data}

PS: 当然生产环境中应该还有一个error字段甚至error_code字段标识如果出错的信息和类型等字段,我这里作为演示就保留了一个rs字段
这个data就是实际的视图返回的结果,但是在响应的时候已经封好了。可以继续重写write_response方法:

1
2
3
4
5
6
7
8
9
10
11

class JSONHttpProtocol(HttpProtocol):
def write_response(self, response):
if isinstance(response, str):
response = text(response)
elif isinstance(response, (list, dict)):
response = {'rs': response}
if isinstance(response, dict):
response = json(response)

return super().write_response(response)

这样返回的结果就很统一了。
至此,一个异步的、风格明确的、功能满足需要的API服务就完成了。
项目具体代码可见: https://github.com/dongweiming/weapp-zhihulive

版权声明:本文由 董伟明 原创,未经作者授权禁止任何微信公众号和向掘金(juejin.im)转载,技术博客转载采用 保留署名-非商业性使用-禁止演绎 4.0-国际许可协议
python