python 任务队列 celery

创建日期: 2025-02-19 15:09 | 作者: 风波 | 浏览次数: 20 | 分类: Python

来源: - https://blog.csdn.net/jclian91/article/details/86749032 - https://blog.csdn.net/jclian91/article/details/107445101

一个非常详细的文档:https://homholueng.github.io/2019/02/09/celery-introduction/

1. celery 简介

celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。它的执行单元为任务(task),利用多线程,如Eventlet,gevent等,它们能被并发地执行在单个或多个职程服务器(worker servers)上。任务能异步执行(后台运行)或同步执行(等待任务完成)。

组件介绍

发布任务的时候会将任务进行序列化,支持的序列化组件

2. celery 项目文件

一个 celery 项目需要包括3个部分:

其中 producer 可以是其他程序将 celery 项目 import 进入后,调用 celery 的 task API来发布任务,也可以使用 beat 进行定时发布任务,类似 crontab。也就是说 produer 的角色有两个变种:1. task API,2. beat 定时任务

3. celery 的服务实例

3.1 celery 作为 lib 库

例如有项目 xxx 需要将任务进行分发,那么可以 import cxxx,然后使用里面的 task API 直接发布任务。任务会被发布到 redis 的任务队列中。

然后 worker 端需要启动服务,主动从任务队列拉取任务,然后进行处理。处理完成后会自动将结果 push 到 bakcend 中。

这种方式有2个角色: 1. 任务产生方进程 1. worker 进程

3.2 celery 作为定时任务

如果有项目 yyy 需要定时进行任务处理,那么需要启动一个 celery beat 服务,用来定时产生任务,任务也会自动发布到 redis 的任务队列中。

然后 worker 段需要启动服务,主动从任务队列拉取任务,然后进行处理。处理完成后会自动将结果 push 到 backend 中。

这种方式有2个角色: 1. beat 定时任务产生服务 1. worker 服务

4. celery 实例

4.1 项目的目录结构

$ ls auth
beat-entrypoint.sh         # celery -A proj.main beat -l debug
worker-entrypoint.sh     # celery -A proj.main worker -l debug
proj/
  celeryconfig.py
  main.py
  tasks.py

主要的项目代码在 auth/proj/ 目录中,其中三个文件如下:celeryconfig.pymain.pytasks.py

4.2 项目的代码

celeryconfig.py

BROKER_URL = 'redis://172.17.0.1:26790/1'    # 使用Redis作为消息代理

CELERY_RESULT_BACKEND = None #'redis://172.17.0.1:26790/2'  # 把任务结果存在了Redis

CELERY_TASK_SERIALIZER = 'msgpack'  # 任务序列化和反序列化使用msgpack方案

CELERY_RESULT_SERIALIZER = 'json'   # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24   # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显

CELERY_ACCEPT_CONTENT = ['json', 'msgpack']     # 指定接受的内容类型

CELERYD_CONCURRENCY = 10    # 并发worker数

main.py

from celery import Celery
from celery.schedules import crontab


app = Celery('proj', include=['proj.tasks'])
app.config_from_object('proj.celeryconfig')


app.conf.beat_schedule = {
    'add-every-5-minutes': {
        'task': 'proj.tasks.push_token',
        'schedule': crontab(minute="*/5"),
        'args': (125,),
    },
}

tasks.py

from proj.main import app
from loguru import logger
import utils
import redis
import time
import base64
import os


logger.add("/logs/task.log",
        format='{time:YYYY-MM-DD HH:mm:ss.SSSSSS} - {message}',
        rotation="16 MB",  # 64M each file
        retention=10 # keep 1 file
        )


rds = redis.Redis.from_url(os.getenv("REDIS_URL", "redis://172.17.0.1:26790/1"))

def make_token(tm, salt, deviation=125):
    at = tm - deviation
    bt = tm
    ct = tm + deviation

    a5 = utils.md5sum(f"{at} {salt}")
    b5 = utils.md5sum(f"{bt} {salt}")
    c5 = utils.md5sum(f"{ct} {salt}")
    return a5[:4] + a5[-4:], b5[:4] + b5[-4:], c5[:4] + c5[-4:]


@app.task
def push_token(deviation=125):
    logger.debug("set time token")
    time_unit = int(os.getenv("TIME_UNIT", 300))
    now = int(time.time()) // time_unit
    salt = "Make access token: #5133Cj)*#%!+#:}><XAMK/"
    tokens = make_token(now, salt, deviation)
    pos = -1
    for token in tokens:
        userpass = f"Mr.Big:{token}"
        key = "file:auth:Basic " +  base64.b64encode(userpass.encode()).decode()
        rds.set(key, "ok", ex=60 * 5 + pos * 15) # 5 minutes
        logger.debug(f"token: {token}, key: {key}")
        pos += 1
    return None

utils.py 是工具库,与主流程代码无关

import hashlib


def md5sum(s):
    hash_md5 = hashlib.md5()
    hash_md5.update(s.encode())
    return hash_md5.hexdigest()

4.3 启动项目

  1. 需要先启动一个 redis 服务
  2. 启动 beat-entrypoint.sh 脚本,用来定时发布任务
  3. 启动 worker-entrypoint.sh 脚本,用来 pull 任务并完成
20 浏览
10 爬虫
0 评论