wechat-admin:SSE

在上一篇[项目设计](http://www.dongwm.com/archives/wechat-
admin%EF%BC%9A%E9%A1%B9%E7%9B%AE%E8%AE%BE%E8%AE%A1%E7%AF%87/)中,我说到了SSE(Server-
Sent Events)是为了实现单方向的消息推送,今天介绍下实际的使用。
我直接用了现成的[Flask-SSE](https://github.com/singingwolfboy/flask-
sse),其实SSE实现的原理比较简单:

  1. 借用Redis的发布/订阅模式创建一个方法,方法内会调用pubsub.listen监听新的发布数据。
  2. 使用Flask提供的stream_with_context,不断的从上面的方法中获取数据。
    使用起来分2部分

    前端

    在前端页面添加一个函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

function eventSourceListener() {
let source = new EventSource(`${API_URL}/stream`);
let self = this;
source.addEventListener('login', function(event) {
let data = JSON.parse(event.data);
if (data.type == 'scan_qr_code') {
self.uuid = data.uuid;
self.qrCode = `data:image/png;base64,${data.extra}`;
} else if (data.type == 'confirm_login') {
self.sub_title = 'Scan successful';
self.sub_desc = 'Confirm login on mobile WeChat';
self.qrCode = data.extra;
} else if (data.type == 'logged_in') {
sessionStorage.setItem('user', JSON.stringify(data.user));
self.$router.push({ path: '/main' });
} else if (data.type == 'logged_out') {
sessionStorage.removeItem('user');
self.$router.push('/login');
}
}, false);

source.addEventListener('notification', function(event) {
let data = JSON.parse(event.data);
self.notificationCount = data.count;
}, false);

source.addEventListener('error', function(event) {
console.log("Failed to connect to event stream");
}, false);
}

这段代码放在一个自定义的Vue的插件里面,这样在所有页面上都要自动包含这部分代码了。source.addEventListener用来添加事件监听,它监听了3种类型的消息:

  1. login 登录,也就是在页面反映当前微信的登录状态(等待扫码/扫码完成等待确认/确认完成)。不同的消息会执行不同的操作,页面也会立刻渲染出最新的结果。
  2. notification 消息提醒,会有一个异步任务定期检查新入库的消息,有新的消息就是发布出来通知新消息数。
  3. error 内置的错误消息,当然这个加不加倒还好
    另外在登陆后执行sessionStorage.setItem('user', JSON.stringify(data.user));会设置浏览器的session,下次自动登录后右侧就显示头像了,这样能减少后端的负担,退出时removeItem方法再删掉。

    后端

    后端包含2部分,第一部分是用Flask实现上面说的${API_URL}/stream这个接口,这是一个长连接,消息就是从这里推送出去的。由于第一部分是阻塞的,我们需要异步的方式往这个阻塞进程里面推送数据,也就是开头说的,利用Redis的发布/订阅模式发布消息。比如通知调用起来是[这样的](https://github.com/dongweiming/wechat-
    admin/blob/master/wechat/tasks.py#L181):
1
2
3
4
5

from app import app as sse_api

with sse_api.app_context():
sse.publish({'count': count}, type='notification')

登陆过程要复杂一些,我之前说过在我fork的ItChat和wxpy分支里面添加了信号的支持,这个信号是需要「注册」的,也就是在import之前就要注册,效果要类似[这样](https://github.com/dongweiming/wechat-
admin/blob/fc625df25e1f03b90d6f6ca8bdae8507e91e12ec/libs/wx.py#L7):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

from itchat.signals import scan_qr_code, confirm_login, logged_out

def publish(uuid, **kw):
from app import app
with app.app_context():
params = {'uuid': uuid, 'extra': kw.pop('extra', None),
'type': kw.pop('type', None)}
params.update(kw)
sse.publish(params, type='login')


scan_qr_code.connect(publish)
confirm_login.connect(publish)
logged_out.connect(publish)

from wxpy import * # noqa

这里用了信号的connect方法。举个logged_out的例子,在ItChat里面,首先定义这个信号

1
2
3
4
5

from blinker import Namespace

_signals = Namespace()
logged_out = _signals.signal('logged-out')

需要在对应发信号的地方调用send方法

1
2

logged_out.send(self.uuid, type='logged_out')

另外有个坑儿,首次打开Web页面的是一个铺满div的gif图片,一开始设想的是在下载二维码图片之后,通过修改img的src属性指到这个图片,实际开发中发现,这个二维码图片被会更新不及时,会使用缓存的就图片所以发送信号的时候不使用图片HTTP地址,而是Data
URLs,这就需要把图片内容编码一下:

1
2
3

encoded = base64.b64encode(qrStorage.getvalue()).decode('ascii')
scan_qr_code.send(self.uuid, extra=encoded, type='scan_qr_code')

结语

这样借助Redis和Celery就实现了SSE的使用,下一节我将介绍Celery的使用。

版权声明:本文由 董伟明 原创,未经作者授权禁止任何微信公众号和向掘金(juejin.im)转载,技术博客转载采用 保留署名-非商业性使用-禁止演绎 4.0-国际许可协议
python