Celery
是一个很棒的 Python 分布式异步任务系统。它开箱即用,但有几次我需要对其进行自定义。具体来说,我希望能够根据新的
apply_sync
参数定义行为。此外,能够将状态传递给工作任务会很好。
首先,您可以子类化主
Celery
类以定义自定义
Task
类。
import socket
from celery import Celery, Task
from kombu.exceptions import InconsistencyError
class MyCelery(Celery):
""" Subclass of a Celery application class that uses a custom Task type """
task_cls = 'myapp.mymodule:MyTask'
在您的
Task
类中,您可以覆盖
apply_async
(也从
delay
调用)以及
__call__
,它环绕实际的任务主体。
import socket
from celery import Celery, Task
from kombu.exceptions import InconsistencyError
class MyCelery(Celery):
""" Subclass of a Celery application class that uses a custom Task type """
task_cls = 'myapp.mymodule:MyTask'
在此示例中,我向
apply_async
引入了一个可选的
safe
参数,它会捕获并忽略试图派生任务的特定异常。它还搭载 celery 任务标头以将自身传递给工作进程,在工作进程中它忽略任务本身抛出的任何异常。