https://gitee.com/bitterteaer/celery-learn.git
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
基本使用
celery_task
1 2 3 4 5 6 7 8 9 10 11
| import celery import time backend='redis://127.0.0.1:6379/1' broker='redis://127.0.0.1:6379/2' cel=celery.Celery('test',backend=backend,broker=broker) @cel.task def send_email(name): print("向%s发送邮件..."%name) time.sleep(5) print("向%s发送邮件完成"%name) return "ok"
|
注意,异步任务文件命令执行:
celery -A celery_task worker -l info -P eventlet
!windows10运行celery4.x以上,就会出现这个问题
解决办法
安装一个eventlet模块
pip3 install eventlet -i https://pypi.douban.com/simple/
然后启动celery的时候加一个参数
celery -A <mymodule> worker -l info -P eventlet
创建执行任务文件 produce_task.py
1 2 3 4 5
| from celery_task import send_email result = send_email.delay("yuan") print(result.id) result2 = send_email.delay("alex") print(result2.id)
|
创建py文件:result.py,查看任务执行结果,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| from celery.result import AsyncResult from celery_task import cel
async_result=AsyncResult(id="c6ddd5b7-a662-4f0e-93d4-ab69ec2aea5d", app=cel)
if async_result.successful(): result = async_result.get() print(result) elif async_result.failed(): print('执行失败') elif async_result.status == 'PENDING': print('任务等待中被执行') elif async_result.status == 'RETRY': print('任务异常后正在重试') elif async_result.status == 'STARTED': print('任务已经开始被执行')
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| from celery import Celery
cel = Celery('celery_demo', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=['celery_tasks.task01', 'celery_tasks.task02' ])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False
|
定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| from celery_task import send_email from datetime import datetime
ctime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=10) task_time = utc_ctime + time_delay
result = send_email.apply_async(args=["egon"], eta=task_time) print(result.id)
|
多任务结构中celery.py修改如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| from datetime import timedelta from celery import Celery from celery.schedules import crontab
cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[ 'celery_tasks.task01', 'celery_tasks.task02', ]) cel.conf.timezone = 'Asia/Shanghai' cel.conf.enable_utc = False
cel.conf.beat_schedule = { 'add-every-10-seconds': { 'task': 'celery_tasks.task01.send_email', 'schedule': timedelta(seconds=6), 'args': ('张三',) }, }
|
启动 Beat 程序
Celery Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列
之后启动 worker 进程
1
| celery -A proj worker -l info
|
或者
1
| celery -B -A proj worker -l info
|