Celery是一个专注于实时处理和任务调度分布式任务队列。通过RabbitMQ、Redis、MongoDB等消息代理,把任务发给执行任务的Worker以达到异步执行。
我写的那本《Python
Web开发实战》的样章就是《使用Celery》,建议看下面内容之前先读一下这篇文章。
接下来的内容假设你已经对Celery有了一定的了解。对wechat-
admin项目来说,使用Celery要做如下事情:
- 更新项目数据库中的联系人、群聊和公众号等相关内容
- 监听wxpy进程,处理自动加群、接受消息、踢人以及各种插件功能等
- 自动重启上述的监听进程
- 发送新消息数量提醒
首先我们创建一个目录(wechat),专门用来存放celery任务相关的内容,目录下文件列表如下:
1 |
|
我们挨个看看
celeryconfig.py
看文件名字就知道了,这个是放配置的文件:
1 |
|
指定消息代理和执行结果都使用Redis,任务(消息)使用msgpack序列化,结果使用json序列化,任务结果保存时间24小时等
celery.py
主程序有点Flask的app.py的感觉:
1 |
|
这段代码有2点需要解释一下:
- 调用send_task会返回任务id,存在LISTENER_TASK_KEY里面用于未来重启时直接通过这个任务id
- 使用了Celery的信号系统,listener这个异步任务需要在worker启动之后就运行,使用worker_ready这个信号就可以。
tasks.py
tasks.py这个文件包含了很多业务逻辑,为了演示我省略部分代码。不过代码还是很长,所以我直接在对应行数的代码上加注释来解释了:
1 |
|
上一篇我说SSE的时候忘说了一点,就是更新消息提醒。在Web页面标记已读的时候,会POST到/readall接口,后端清空新通知数量。这是由于SSE的单向特点造成的,如果使用socketio(WebSocket)的话可以直接emit到后端,就不用HTTP这种方案了
版权声明:本文由 董伟明 原创,未经作者授权禁止任何微信公众号和向掘金(juejin.im)转载,技术博客转载采用 保留署名-非商业性使用-禁止演绎 4.0-国际许可协议
python