************************** celery-beat ************************** 依赖的第三方库 * `celery`_ * `django-celery-beat`_ * `doc `_ .. _celery: https://github.com/celery/celery .. _django-celery-beat: https://github.com/celery/django-celery-beat .. warning:: 官方不支持在win10运行,因此需要使用变通的 `方法`_ 。`相关问题`_ 。 .. warning:: 一个beat对应一个库,如果多个beat使用同一个库,会重复下发任务 .. _方法: https://stackoverflow.com/questions/37255548/how-to-run-celery-on-windows .. _相关问题: https://www.zhihu.com/question/63439561 .. toctree:: :maxdepth: 2 :caption: 相关阅读 架构和基本使用 使用Celery常见的7个问题 celery在django的使用 celery入门 django-celery-beat 实现了celery的django项目 celery之broadcast queue 高性能分布式任务队列Celery功能探究 *启动程序* 在manage.py目录下启动django后,继续执行: .. code-block:: shell celery -A [project-name] worker --loglevel=info .. note:: Windows环境加上 `-P gevent` .. code-block:: shell celery -A [project-name] beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler 代码创建任务:: >>> PeriodicTask.objects.create( ... interval=schedule, # we created this above. ... name='Importing contacts', # simply describes this periodic task. ... task='proj.tasks.import_contacts', # name of task. ... ) >>> import json >>> from datetime import datetime, timedelta >>> PeriodicTask.objects.create( ... interval=schedule, # we created this above. ... name='Importing contacts', # simply describes this periodic task. ... task='proj.tasks.import_contacts', # name of task. ... args=json.dumps(['arg1', 'arg2']), ... kwargs=json.dumps({ ... 'be_careful': True, ... }), ... expires=datetime.utcnow() + timedelta(seconds=30) ... ) 本地化 =============================================== https://docs.djangoproject.com/zh-hans/4.0/topics/i18n/translation/#how-to-create-language-files admin的周期任务app默认英文,不过celery-beat提供了.po文件来汉化,在django-celery-beat目录下执行命令行: django-admin makemessages 重启django即可汉化成功。 改造给运维人员使用 =============================================== celery beat提供的Django Admin接口配置调用函数时,传入的参数需要写json格式,这对于普通的运维人员不够友好,需要 改造以简化操作。 1. 新增定时任务的代理模型 假设命名位AppPeriodicTask * 继承celery beat的PeriodicTask模型 * 将Meta 类的 proxy 属性设置为 True .. seealso:: 代理模型跟继承模型操作同一张表,只是可以修改一些行为。 2. 新增AppPeriodicTask对应的表单类 * 根据实际情况增加参数对应的表单控件 * 重写clean方法,校验新增的控件数据,并update到self.cleaned_data属性, self.clean_data["kwrags"] = json.dumps(${新增的控件数据}), self.clean['task']写死一个函数 * 重写save方法,self.instance.task = self.cleaned_data['task'], self.instance.kwargs = self.cleaned_data['kwargs'] 3. 新增AppPeriodicTask对应的Admin类 * form = AppPeriodicTaskForm * 重写fieldsets属性 至此,页面就改造完成了。 -------------------------------------------------- task里面能否调用django的模块? ======================================= 可以, worker启动时就是指定django app启动的。 beat如何动态生成task的入参数据? ======================================= .. code-block:: python from celery.beat import BeatLazyFunc beat_schedule = { 'test-every-5-minutes': { 'task': 'test', 'schedule': 300, 'kwargs': { "current": BeatCallBack(datetime.datetime.now) } } } 集成gRPC ====================================== 众所周知, grpc的channel不能跨进程使用, 而worker启动时, 会从worker主进程启动n个worker子进程, 然后分别在子进程 执行task任务。所以在celery作为客户端长连接grpc服务端时,channel应该在worker_process_init信号中连接,即分别在n个子进程 创建和连接channel。 不要在worker_init中定义, 会导致只有1个进程能启动工作,其它进程报错退出。 从代码启动worker进程 ======================================= https://docs.celeryq.dev/en/latest/userguide/application.html#main-name chain应用 ======================================= 知乎相关问题: https://www.zhihu.com/question/286483438 单元测试 ======================================= celery有提供pytest的插件协助完成celery的单元测试 ``pytest_plugins = ('celery.contrib.pytest', )`` https://docs.celeryq.dev/en/latest/userguide/testing.html#celery-app-celery-app-used-for-testing 有BUG,有机会参与开源贡献! https://github.com/celery/celery/issues/7750 我认为应该改进文档提供的样例代码:: def test_create_task(celery_worker): @celery_worker.app.task def mul(x, y): return x * y assert mul.delay(4, 4).get(timeout=10) == 16 开源贡献 ======================================= 在node2机器上已clone 设置消息过期时间 ======================================= 参考资料: * https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers * 第三方资料: http://wjhsh.net/pywjh-p-14793140.html * https://stackoverflow.com/questions/26990438/how-to-set-per-message-expiration-ttl-in-celery 第一种方法: 设置expiration属性, 例如下面设置42秒 .. code-block:: python my_awesome_task.apply_async(args=(11,), expiration=42) 第二种方法: 定义celery task时传入参数 .. code-block:: python @shared_task(expires=20) def import_contacts(): """expires: 20秒过期时间""" from datetime import datetime print("Now: " + str(datetime.now())) print("Do: import contacts") 除此之外,还可以定义队列属性设置过期时间,以后有空再研究。 常见疑问 =========================== 数据库重启后, celery beat能否正常工作 ----------------------------------- 经实验, 不影响。当停止mysql数据库时, beat的控制台输出: .. code-block:: text django.db.utils.OperationalError: (2013, "Lost connection to server at 'handshake: reading initial communication packet', system error: 0") django.db.utils.OperationalError: (2002, "Can't connect to server on '127.0.0.1' (10061)") 重新启动mysql后,beat恢复发送任务