来源: - https://blog.csdn.net/jclian91/article/details/86749032 - https://blog.csdn.net/jclian91/article/details/107445101
一个非常详细的文档:https://homholueng.github.io/2019/02/09/celery-introduction/
1. celery 简介
celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。它的执行单元为任务(task),利用多线程,如Eventlet,gevent等,它们能被并发地执行在单个或多个职程服务器(worker servers)上。任务能异步执行(后台运行)或同步执行(等待任务完成)。
组件介绍
- Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
- Celery Beat:任务调度器,Beat进程会读取配置文件的内容,
周期性地
将配置中到期需要执行的任务发送给任务队列。 - Broker:消息代理,又称消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper等作为消息代理,但适用于生产环境的只有
RabbitMQ
和Redis
, 官方推荐 RabbitMQ。 - Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
- Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。
发布任务的时候会将任务进行序列化,支持的序列化组件
- pickle
- json
- yaml
- msgpack
2. celery 项目文件
一个 celery 项目需要包括3个部分:
- producer - 用于产生任务,并将任务添加到 broker 的任务队列中。
- broker - 用于进行任务队列的存储和分发。 producer 发布任务,worker 拉取任务。
- worker - 从 broker 拉取任务,并执行任务。然后将执行的结果存储到 backend 里面。如果不产生结果,那么就可以不设置 backend。
其中 producer 可以是其他程序将 celery 项目 import 进入后,调用 celery 的 task API来发布任务,也可以使用 beat 进行定时发布任务,类似 crontab。也就是说 produer 的角色有两个变种:1. task API,2. beat 定时任务
3. celery 的服务实例
3.1 celery 作为 lib 库
例如有项目 xxx 需要将任务进行分发,那么可以 import cxxx,然后使用里面的 task API 直接发布任务。任务会被发布到 redis 的任务队列中。
然后 worker 端需要启动服务,主动从任务队列拉取任务,然后进行处理。处理完成后会自动将结果 push 到 bakcend 中。
这种方式有2个角色: 1. 任务产生方进程 1. worker 进程
3.2 celery 作为定时任务
如果有项目 yyy 需要定时进行任务处理,那么需要启动一个 celery beat 服务,用来定时产生任务,任务也会自动发布到 redis 的任务队列中。
然后 worker 段需要启动服务,主动从任务队列拉取任务,然后进行处理。处理完成后会自动将结果 push 到 backend 中。
这种方式有2个角色: 1. beat 定时任务产生服务 1. worker 服务
4. celery 实例
4.1 项目的目录结构
$ ls auth
beat-entrypoint.sh # celery -A proj.main beat -l debug
worker-entrypoint.sh # celery -A proj.main worker -l debug
proj/
celeryconfig.py
main.py
tasks.py
主要的项目代码在 auth/proj/
目录中,其中三个文件如下:celeryconfig.py
、main.py
、tasks.py
celeryconfig.py
配置信息main.py
主流程代码,和flask
的app
差不多。其中的定时器用来定制产生任务。tasks.py
任务的工作代码
4.2 项目的代码
celeryconfig.py
BROKER_URL = 'redis://172.17.0.1:26790/1' # 使用Redis作为消息代理
CELERY_RESULT_BACKEND = None #'redis://172.17.0.1:26790/2' # 把任务结果存在了Redis
CELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案
CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型
CELERYD_CONCURRENCY = 10 # 并发worker数
main.py
from celery import Celery
from celery.schedules import crontab
app = Celery('proj', include=['proj.tasks'])
app.config_from_object('proj.celeryconfig')
app.conf.beat_schedule = {
'add-every-5-minutes': {
'task': 'proj.tasks.push_token',
'schedule': crontab(minute="*/5"),
'args': (125,),
},
}
tasks.py
from proj.main import app
from loguru import logger
import utils
import redis
import time
import base64
import os
logger.add("/logs/task.log",
format='{time:YYYY-MM-DD HH:mm:ss.SSSSSS} - {message}',
rotation="16 MB", # 64M each file
retention=10 # keep 1 file
)
rds = redis.Redis.from_url(os.getenv("REDIS_URL", "redis://172.17.0.1:26790/1"))
def make_token(tm, salt, deviation=125):
at = tm - deviation
bt = tm
ct = tm + deviation
a5 = utils.md5sum(f"{at} {salt}")
b5 = utils.md5sum(f"{bt} {salt}")
c5 = utils.md5sum(f"{ct} {salt}")
return a5[:4] + a5[-4:], b5[:4] + b5[-4:], c5[:4] + c5[-4:]
@app.task
def push_token(deviation=125):
logger.debug("set time token")
time_unit = int(os.getenv("TIME_UNIT", 300))
now = int(time.time()) // time_unit
salt = "Make access token: #5133Cj)*#%!+#:}><XAMK/"
tokens = make_token(now, salt, deviation)
pos = -1
for token in tokens:
userpass = f"Mr.Big:{token}"
key = "file:auth:Basic " + base64.b64encode(userpass.encode()).decode()
rds.set(key, "ok", ex=60 * 5 + pos * 15) # 5 minutes
logger.debug(f"token: {token}, key: {key}")
pos += 1
return None
utils.py 是工具库,与主流程代码无关
import hashlib
def md5sum(s):
hash_md5 = hashlib.md5()
hash_md5.update(s.encode())
return hash_md5.hexdigest()
4.3 启动项目
- 需要先启动一个 redis 服务
- 启动 beat-entrypoint.sh 脚本,用来定时发布任务
- 启动 worker-entrypoint.sh 脚本,用来 pull 任务并完成