celery-beat
依赖的第三方库
警告
一个beat对应一个库,如果多个beat使用同一个库,会重复下发任务
相关阅读
启动程序
在manage.py目录下启动django后,继续执行:
celery -A [project-name] worker --loglevel=info
备注
Windows环境加上 -P gevent
celery -A [project-name] beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
代码创建任务:
>>> PeriodicTask.objects.create(
... interval=schedule, # we created this above.
... name='Importing contacts', # simply describes this periodic task.
... task='proj.tasks.import_contacts', # name of task.
... )
>>> import json
>>> from datetime import datetime, timedelta
>>> PeriodicTask.objects.create(
... interval=schedule, # we created this above.
... name='Importing contacts', # simply describes this periodic task.
... task='proj.tasks.import_contacts', # name of task.
... args=json.dumps(['arg1', 'arg2']),
... kwargs=json.dumps({
... 'be_careful': True,
... }),
... expires=datetime.utcnow() + timedelta(seconds=30)
... )
本地化
https://docs.djangoproject.com/zh-hans/4.0/topics/i18n/translation/#how-to-create-language-files
admin的周期任务app默认英文,不过celery-beat提供了.po文件来汉化,在django-celery-beat目录下执行命令行: django-admin makemessages
重启django即可汉化成功。
改造给运维人员使用
celery beat提供的Django Admin接口配置调用函数时,传入的参数需要写json格式,这对于普通的运维人员不够友好,需要 改造以简化操作。
新增定时任务的代理模型
假设命名位AppPeriodicTask
继承celery beat的PeriodicTask模型
将Meta 类的 proxy 属性设置为 True
参见
代理模型跟继承模型操作同一张表,只是可以修改一些行为。
新增AppPeriodicTask对应的表单类
根据实际情况增加参数对应的表单控件
重写clean方法,校验新增的控件数据,并update到self.cleaned_data属性, self.clean_data[“kwrags”] = json.dumps(${新增的控件数据}), self.clean[‘task’]写死一个函数
重写save方法,self.instance.task = self.cleaned_data[‘task’], self.instance.kwargs = self.cleaned_data[‘kwargs’]
新增AppPeriodicTask对应的Admin类
form = AppPeriodicTaskForm
重写fieldsets属性
至此,页面就改造完成了。
task里面能否调用django的模块?
可以, worker启动时就是指定django app启动的。
beat如何动态生成task的入参数据?
from celery.beat import BeatLazyFunc
beat_schedule = {
'test-every-5-minutes': {
'task': 'test',
'schedule': 300,
'kwargs': {
"current": BeatCallBack(datetime.datetime.now)
}
}
}
集成gRPC
众所周知, grpc的channel不能跨进程使用, 而worker启动时, 会从worker主进程启动n个worker子进程, 然后分别在子进程 执行task任务。所以在celery作为客户端长连接grpc服务端时,channel应该在worker_process_init信号中连接,即分别在n个子进程 创建和连接channel。 不要在worker_init中定义, 会导致只有1个进程能启动工作,其它进程报错退出。
从代码启动worker进程
https://docs.celeryq.dev/en/latest/userguide/application.html#main-name
chain应用
单元测试
celery有提供pytest的插件协助完成celery的单元测试
pytest_plugins = ('celery.contrib.pytest', )
https://docs.celeryq.dev/en/latest/userguide/testing.html#celery-app-celery-app-used-for-testing
有BUG,有机会参与开源贡献!
https://github.com/celery/celery/issues/7750
我认为应该改进文档提供的样例代码:
def test_create_task(celery_worker):
@celery_worker.app.task
def mul(x, y):
return x * y
assert mul.delay(4, 4).get(timeout=10) == 16
开源贡献
在node2机器上已clone
设置消息过期时间
参考资料:
https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers
https://stackoverflow.com/questions/26990438/how-to-set-per-message-expiration-ttl-in-celery
第一种方法:
设置expiration属性, 例如下面设置42秒
my_awesome_task.apply_async(args=(11,), expiration=42)
第二种方法:
定义celery task时传入参数
@shared_task(expires=20)
def import_contacts():
"""expires: 20秒过期时间"""
from datetime import datetime
print("Now: " + str(datetime.now()))
print("Do: import contacts")
除此之外,还可以定义队列属性设置过期时间,以后有空再研究。
常见疑问
数据库重启后, celery beat能否正常工作
经实验, 不影响。当停止mysql数据库时, beat的控制台输出:
django.db.utils.OperationalError: (2013, "Lost connection to server at 'handshake: reading initial communication packet', system error: 0")
django.db.utils.OperationalError: (2002, "Can't connect to server on '127.0.0.1' (10061)")
重新启动mysql后,beat恢复发送任务