任务 — Python 文档
任务
任务是 Celery 应用程序的构建块。
任务是一个可以从任何可调用对象中创建的类。 它扮演双重角色,因为它定义了当任务被调用(发送消息)时会发生什么,以及当工作人员收到该消息时会发生什么。
每个任务类都有一个唯一的名称,这个名称在消息中被引用,因此工作人员可以找到正确的函数来执行。
任务消息不会从队列中删除,直到该消息被工作人员 确认 。 一个 worker 可以提前保留许多消息,即使 worker 被杀死——由于电源故障或其他一些原因——消息也会被重新传递给另一个 worker。
理想情况下,任务函数应该是 幂等的 :这意味着即使使用相同的参数多次调用该函数也不会造成意外影响。 由于工作人员无法检测您的任务是否是幂等的,因此默认行为是在消息执行之前提前确认消息,以便已启动的任务调用永远不会再次执行。
如果您的任务是幂等的,您可以设置 acks_late 选项,让工作人员在 后确认消息 任务返回。 另请参阅常见问题条目 我应该使用 retry 还是 acks_late?。
请注意,即使启用了 acks_late,如果执行任务的子进程终止(通过任务调用 sys.exit()
,或通过信号),worker 也会确认该消息。 这种行为是故意的,因为……
- 我们不想重新运行强制内核向进程发送 :sig:`SIGSEGV`(分段错误)或类似信号的任务。
- 我们假设系统管理员故意杀死任务不希望它自动重启。
- 分配过多内存的任务有触发内核 OOM 杀手的危险,同样的情况可能会再次发生。
- 重新交付时总是失败的任务可能会导致高频消息循环关闭系统。
如果您真的希望在这些场景中重新交付任务,您应该考虑启用 :setting:`task_reject_on_worker_lost` 设置。
警告
无限期阻塞的任务最终可能会阻止工作实例执行任何其他工作。
如果您的任务执行 I/O,请确保为这些操作添加超时,例如使用 :pypi:`requests` 库向 Web 请求添加超时:
connect_timeout, read_timeout = 5.0, 30.0
response = requests.get(URL, timeout=(connect_timeout, read_timeout))
时间限制 方便确保所有任务及时返回,但时间限制事件实际上会强制终止进程,因此仅使用它们来检测您尚未使用手动超时的情况。
在以前的版本中,默认的 prefork 池调度程序对长时间运行的任务并不友好,因此如果您有运行几分钟/小时的任务,建议启用 的 -Ofair
命令行参数]芹菜工人。 但是,从 4.0 版开始,-Ofair 现在是默认的调度策略。 请参阅 预取限制 了解更多信息,以及将长期运行和短期运行的任务路由到专用工作线程的最佳性能(自动路由)。
如果您的工作人员挂起,请在提交问题之前调查正在运行的任务,因为挂起很可能是由一项或多项挂在网络操作上的任务引起的。
–
在本章中,您将了解有关定义任务的所有内容,这是 目录 :
基础知识
您可以使用 @task()
装饰器从任何可调用对象轻松创建任务:
from .models import User
@app.task
def create_user(username, password):
User.objects.create(username=username, password=password)
还有许多 选项 可以为任务设置,这些可以指定为装饰器的参数:
@app.task(serializer='json')
def create_user(username, password):
User.objects.create(username=username, password=password)
如何导入任务装饰器? 什么是“应用程序”?
任务装饰器在您的 @Celery
应用程序实例上可用,如果您不知道这是什么,请阅读 芹菜的第一步 。
如果您正在使用 Django(请参阅 使用 Django 的第一步),或者您是某个库的作者,那么您可能想要使用 @shared_task()
装饰器:
from celery import shared_task
@shared_task
def add(x, y):
return x + y
多个装饰器
当将多个装饰器与任务装饰器结合使用时,您必须确保 task 装饰器最后应用(奇怪的是,在 Python 中这意味着它必须在列表中的第一个):
@app.task
@decorator2
@decorator1
def add(x, y):
return x + y
绑定任务
被绑定的任务意味着任务的第一个参数将始终是任务实例 (self
),就像 Python 绑定方法一样:
logger = get_task_logger(__name__)
@app.task(bind=True)
def add(self, x, y):
logger.info(self.request.id)
重试(使用 Task.retry()
)、访问有关当前任务请求的信息以及添加到自定义任务基类的任何附加功能都需要绑定任务。
任务继承
任务装饰器的 base
参数指定任务的基类:
import celery
class MyTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
@app.task(base=MyTask)
def add(x, y):
raise KeyError()
姓名
每个任务都必须有一个唯一的名称。
如果没有提供明确的名称,任务装饰器将为您生成一个,该名称将基于 1) 定义任务的模块,以及 2) 任务函数的名称。
示例设置显式名称:
>>> @app.task(name='sum-of-two-numbers')
>>> def add(x, y):
... return x + y
>>> add.name
'sum-of-two-numbers'
最佳实践是使用模块名称作为命名空间,这样如果在另一个模块中定义了具有该名称的任务,则名称不会发生冲突。
>>> @app.task(name='tasks.add')
>>> def add(x, y):
... return x + y
您可以通过调查其 .name
属性来判断任务的名称:
>>> add.name
'tasks.add'
我们在此处指定的名称 (tasks.add
) 正是在名为 tasks.py
的模块中定义任务时自动生成的名称:
tasks.py
:
@app.task
def add(x, y):
return x + y
>>> from tasks import add
>>> add.name
'tasks.add'
更改自动命名行为
4.0 版中的新功能。
在某些情况下,默认自动命名不合适。 考虑在许多不同的模块中有许多任务:
project/
/__init__.py
/celery.py
/moduleA/
/__init__.py
/tasks.py
/moduleB/
/__init__.py
/tasks.py
使用默认的自动命名,每个任务都会有一个生成的名称,如 moduleA.tasks.taskA、moduleA.tasks.taskB、moduleB.tasks.test、等等。 您可能希望摆脱所有任务名称中的 tasks。 如上所述,您可以明确地为所有任务命名,或者您可以通过覆盖 @gen_task_name()
来更改自动命名行为。 继续这个例子,celery.py 可能包含:
from celery import Celery
class MyCelery(Celery):
def gen_task_name(self, name, module):
if module.endswith('.tasks'):
module = module[:-6]
return super().gen_task_name(name, module)
app = MyCelery('main')
因此,每个任务都会有一个名称,如 moduleA.taskA、moduleA.taskB 和 moduleB.test。
警告
确保您的 @gen_task_name()
是一个纯函数:这意味着对于相同的输入,它必须始终返回相同的输出。
任务请求
Task.request
包含与当前正在执行的任务相关的信息和状态。
该请求定义了以下属性:
- 身份证
- 正在执行的任务的唯一 ID。
- 团体
- 任务的 组 的唯一 ID,如果此任务是成员。
- 和弦
- 此任务所属的和弦的唯一 ID(如果任务是标题的一部分)。
- 相关性_id
- 用于重复数据删除等操作的自定义 ID。
- 参数
- 位置参数。
- 夸格斯
- 关键字参数。
- 起源
- 发送此任务的主机名。
- 重试
- 当前任务重试了多少次。 从 0 开始的整数。
- is_eager
- 如果任务在客户端本地执行,而不是由工作人员执行,则设置为
True
。 - 埃塔
- 任务的原始预计到达时间(如果有)。 这是 UTC 时间(取决于 :setting:`enable_utc` 设置)。
- 过期
- 任务的原始到期时间(如果有)。 这是 UTC 时间(取决于 :setting:`enable_utc` 设置)。
- 主机名
- 执行任务的工作实例的节点名称。
- 交货信息
- 附加消息传递信息。 这是一个包含用于交付此任务的交换和路由密钥的映射。 例如
Task.retry()
用于将任务重新发送到同一目标队列。 此字典中密钥的可用性取决于所使用的消息代理。 - 回复
- 将回复发送回的队列名称(例如与 RPC 结果后端一起使用)。
- 直接调用
- 如果任务不是由工作人员执行的,则此标志设置为 true。
- 时限
- 此任务的当前
(soft, hard)
时间限制的元组(如果有)。 - 回调
- 如果此任务成功返回,则要调用的签名列表。
- 错误回复
- 如果此任务失败,要调用的签名列表。
- 协调世界时
- 设置为 true 调用者已启用 UTC (:setting:`enable_utc`)。
3.1 版中的新功能。
- 标题
- 与此任务消息一起发送的消息头的映射(可能是
None
)。 - 回复
- 在哪里发送回复(队列名称)。
- 相关性_id
- 通常与任务 id 相同,经常在 amqp 中使用以跟踪回复的目的。
4.0 版中的新功能。
- root_id
- 此任务所属的工作流中第一个任务的唯一 ID(如果有)。
- parent_id
- 调用此任务的任务的唯一 ID(如果有)。
- 链条
- 形成链的反向任务列表(如果有)。 此列表中的最后一项将是当前任务的下一个任务。 如果使用任务协议的版本 1,则链任务将在
request.callbacks
中。
5.2 版中的新功能。
- 属性
- 与此任务消息一起接收的消息属性的映射(可能是
None
或{}
) - 替换任务嵌套
- 任务被替换的次数(如果有的话)。 (可能是
0
)
示例
在上下文中访问信息的示例任务是:
@app.task(bind=True)
def dump_context(self, x, y):
print('Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format(
self.request))
bind
参数意味着该函数将是一个“绑定方法”,以便您可以访问任务类型实例上的属性和方法。
日志记录
工作人员将自动为您设置日志记录,或者您可以手动配置日志记录。
一个名为“celery.task”的特殊记录器可用,您可以从该记录器继承以自动获取任务名称和唯一 id 作为日志的一部分。
最佳实践是在模块顶部为所有任务创建一个通用记录器:
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task
def add(x, y):
logger.info('Adding {0} + {1}'.format(x, y))
return x + y
Celery 使用标准的 Python 记录器库,文档可以在 here
中找到。
您也可以使用 print()
,因为写入标准 out/-err 的任何内容都将被重定向到日志系统(您可以禁用此功能,请参阅 :setting:`worker_redirect_stdouts`)。
笔记
如果您在任务或任务模块中的某处创建记录器实例,则工作器不会更新重定向。
如果要将 sys.stdout
和 sys.stderr
重定向到自定义记录器,则必须手动启用它,例如:
import sys
logger = get_task_logger(__name__)
@app.task(bind=True)
def add(self, x, y):
old_outs = sys.stdout, sys.stderr
rlevel = self.app.conf.worker_redirect_stdouts_level
try:
self.app.log.redirect_stdouts_to_logger(logger, rlevel)
print('Adding {0} + {1}'.format(x, y))
return x + y
finally:
sys.stdout, sys.stderr = old_outs
笔记
如果您需要的特定 Celery 记录器没有发出日志,您应该检查记录器是否正确传播。 在此示例中,启用了“celery.app.trace”,以便发出“succeeded in”日志:
import celery
import logging
@celery.signals.after_setup_logger.connect
def on_after_setup_logger(**kwargs):
logger = logging.getLogger('celery')
logger.propagate = True
logger = logging.getLogger('celery.app.trace')
logger.propagate = True
笔记
如果要完全禁用 Celery 日志记录配置,请使用 :signal:`setup_logging` 信号:
import celery
@celery.signals.setup_logging.connect
def on_setup_logging(**kwargs):
pass
参数检查
4.0 版中的新功能。
Celery 会在你调用任务时验证传递的参数,就像 Python 在调用普通函数时所做的那样:
>>> @app.task
... def add(x, y):
... return x + y
# Calling the task with two arguments works:
>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
# Calling the task with only one argument fails:
>>> add.delay(8)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/app/task.py", line 376, in delay
return self.apply_async(args, kwargs)
File "celery/app/task.py", line 485, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)
您可以通过将其 typing
属性设置为 False
来禁用任何任务的参数检查:
>>> @app.task(typing=False)
... def add(x, y):
... return x + y
# Works locally, but the worker receiving the task will raise an error.
>>> add.delay(8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
在参数中隐藏敏感信息
4.0 版中的新功能。
使用 :setting:`task_protocol` 2 或更高版本(自 4.0 以来的默认值)时,您可以使用 argsrepr
和 kwargsrepr
调用参数:
>>> add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)')
>>> charge.s(account, card='1234 5678 1234 5678').set(
... kwargsrepr=repr({'card': '**** **** **** 5678'})
... ).delay()
警告
任何能够从代理读取您的任务消息或以其他方式拦截它的人仍然可以访问敏感信息。
出于这个原因,如果您的消息包含敏感信息,您可能应该对其进行加密,或者在此示例中使用信用卡号,实际号码可以加密存储在安全存储中,您可以在任务本身中检索和解密。
重试
Task.retry()
可用于重新执行任务,例如在出现可恢复错误时。
当您调用 retry
时,它将使用相同的任务 ID 发送一条新消息,并确保将消息传递到与原始任务相同的队列。
重试任务时,这也会记录为任务状态,以便您可以使用结果实例跟踪任务的进度(请参阅 States)。
下面是一个使用 retry
的例子:
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc)
笔记
Task.retry()
调用将引发异常,因此不会到达重试后的任何代码。 这是 @Retry
异常,它不会作为错误处理,而是作为半谓词向工作人员表示要重试任务,以便在结果发生时它可以存储正确的状态后端已启用。
这是正常操作,除非将重试的 throw
参数设置为 False
,否则总是会发生。
任务装饰器的绑定参数将提供对 self
(任务类型实例)的访问。
exc
参数用于传递日志中和存储任务结果时使用的异常信息。 异常和回溯都将在任务状态中可用(如果启用了结果后端)。
如果任务具有 max_retries
值,则在超过最大重试次数时将重新引发当前异常,但在以下情况下不会发生这种情况:
没有给出
exc
参数。在这种情况下,将引发
@MaxRetriesExceededError
异常。目前没有例外
如果没有原始异常重新引发
exc
参数将被使用,所以:self.retry(exc=Twitter.LoginError())
将引发给定的
exc
参数。
使用自定义重试延迟
当一个任务要重试时,它可以等待给定的时间再执行,默认延迟由 default_retry_delay
属性定义。 默认情况下,这设置为 3 分钟。 请注意,设置延迟的单位是秒(int 或 float)。
您还可以向 retry()
提供 countdown 参数以覆盖此默认值。
@app.task(bind=True, default_retry_delay=30 * 60) # retry in 30 minutes.
def add(self, x, y):
try:
something_raising()
except Exception as exc:
# overrides the default delay to retry after 1 minute
raise self.retry(exc=exc, countdown=60)
已知异常的自动重试
4.0 版中的新功能。
有时您只想在引发特定异常时重试任务。
幸运的是,您可以使用 @task()
装饰器中的 autoretry_for 参数告诉 Celery 自动重试任务:
from twitter.exceptions import FailWhaleError
@app.task(autoretry_for=(FailWhaleError,))
def refresh_timeline(user):
return twitter.refresh_timeline(user)
如果要为内部 retry()
调用指定自定义参数,请将 retry_kwargs 参数传递给 @task()
装饰器:
@app.task(autoretry_for=(FailWhaleError,),
retry_kwargs={'max_retries': 5})
def refresh_timeline(user):
return twitter.refresh_timeline(user)
这是作为手动处理异常的替代方法提供的,上面的示例与将任务主体包装在 try
… except
语句中的作用相同:
@app.task
def refresh_timeline(user):
try:
twitter.refresh_timeline(user)
except FailWhaleError as exc:
raise div.retry(exc=exc, max_retries=5)
如果要自动重试任何错误,只需使用:
@app.task(autoretry_for=(Exception,))
def x():
...
4.2 版中的新功能。
如果您的任务依赖于其他服务,例如向 API 发出请求,那么最好使用 指数退避 以避免您的请求使服务不堪重负。 幸运的是,Celery 的自动重试支持使它变得容易。 只需指定 retry_backoff 参数,如下所示:
from requests.exceptions import RequestException
@app.task(autoretry_for=(RequestException,), retry_backoff=True)
def x():
...
默认情况下,这种指数退避还会引入随机 抖动 以避免所有任务同时运行。 它还将最大退避延迟限制为 10 分钟。 所有这些设置都可以通过下面记录的选项进行自定义。
4.4 版中的新功能。
您还可以设置 autoretry_for、max_retries、retry_backoff、retry_backoff_max 和 retry_jitter[X134X 任务] 选项:
class BaseTaskWithRetry(Task):
autoretry_for = (TypeError,)
max_retries = 5
retry_backoff = True
retry_backoff_max = 700
retry_jitter = False
- Task.autoretry_for
- 异常类的列表/元组。 如果在任务执行期间出现这些异常中的任何一个,任务将自动重试。 默认情况下,不会自动重试任何异常。
- Task.max_retries
- 一个号码。 放弃前的最大重试次数。
None
的值意味着任务将永远重试。 默认情况下,此选项设置为3
。
- Task.retry_backoff
- 布尔值或数字。 如果该选项设置为
True
,自动重试将按照指数退避的规则进行延迟。 第一次重试延迟 1 秒,第二次重试延迟 2 秒,第三次延迟 4 秒,第四次延迟 8 秒,以此类推。 (然而,这个延迟值被retry_jitter修改,如果它被启用。)如果这个选项被设置为一个数字,它被用作一个延迟因子。 例如,如果该选项设置为3
,则第一次重试延迟3秒,第二次延迟6秒,第三次延迟12秒,第四次延迟24秒,以此类推。 默认情况下,该选项设置为False
,并且自动重试不会延迟。
- Task.retry_backoff_max
- 一个号码。 如果启用
retry_backoff
,此选项将设置任务自动重试之间的最大延迟(以秒为单位)。 默认情况下,此选项设置为600
,即 10 分钟。
- Task.retry_jitter
- 一个布尔值。 Jitter 用于将随机性引入指数退避延迟,以防止队列中的所有任务同时执行。 如果此选项设置为
True
,则将 retry_backoff 计算的延迟值视为最大值,实际延迟值将是介于 0 和最大值之间的随机数。 默认情况下,此选项设置为True
。
选项列表
任务装饰器可以采用许多选项来改变任务的行为方式,例如,您可以使用 rate_limit
选项设置任务的速率限制。
传递给任务装饰器的任何关键字参数实际上都将被设置为结果任务类的属性,这是内置属性的列表。
一般
- Task.name
任务注册的名称。
您可以手动设置这个名称,或者使用模块和类名称自动生成一个名称。
另见 名称 。
- Task.request
如果任务正在执行,这将包含有关当前请求的信息。 使用线程本地存储。
参见 任务请求 。
- Task.max_retries
仅当任务调用
self.retry
或任务使用 autoretry_for 参数修饰时才适用。放弃前尝试重试的最大次数。 如果重试次数超过此值,则会引发
@MaxRetriesExceededError
异常。笔记
您必须手动调用
retry()
,因为它不会在出现异常时自动重试..默认值为
3
。None
的值将禁用重试限制,任务将永远重试直到成功。
- Task.throws
不应被视为实际错误的预期错误类的可选元组。
此列表中的错误将作为失败报告给结果后端,但工作人员不会将该事件记录为错误,并且不会包含任何回溯。
示例:
@task(throws=(KeyError, HttpNotFound)): def get_foo(): something()
错误类型:
预期错误(在
Task.throws
中)记录的严重性为
INFO
,不包括回溯。意外错误
记录的严重性为
ERROR
,包括回溯。
- Task.default_retry_delay
- 应执行重试任务之前的默认时间(以秒为单位)。 可以是
int
或float
。 默认为三分钟延迟。
- Task.rate_limit
设置此任务类型的速率限制(限制在给定时间范围内可以运行的任务数量)。 当速率限制生效时,任务仍将完成,但可能需要一些时间才能开始。
如果这是
None
,则没有速率限制生效。 如果是整数或浮点数,则解释为“每秒任务数”。通过将 “/s”、“/m” 或 “/h” 附加到值,可以以秒、分钟或小时为单位指定速率限制。 任务将在指定的时间范围内平均分配。
示例:“100/m”(每分钟一百个任务)。 这将强制在同一工作实例上启动两个任务之间的最小延迟为 600 毫秒。
默认为 :setting:`task_default_rate_limit` 设置:如果未指定,则默认情况下禁用任务的速率限制。
请注意,这是每个工作实例 的 速率限制,而不是全局速率限制。 要强制执行全局速率限制(例如,对于具有每秒最大请求数的 API),您必须限制到给定的队列。
- Task.time_limit
- 此任务的硬时间限制(以秒为单位)。 如果未设置,则使用工作人员默认值。
- Task.soft_time_limit
- 此任务的软时间限制。 如果未设置,则使用工作人员默认值。
- Task.ignore_result
- 不要存储任务状态。 请注意,这意味着您不能使用
AsyncResult
来检查任务是否准备就绪,或获取其返回值。
- Task.store_errors_even_if_ignored
- 如果为
True
,即使任务配置为忽略结果,也会存储错误。
- Task.serializer
标识要使用的默认序列化方法的字符串。 默认为 :setting:`task_serializer` 设置。 可以是 pickle、json、yaml 或任何已注册到
kombu.serialization.registry
的自定义序列化方法。请参阅 Serializers 了解更多信息。
- Task.compression
标识要使用的默认压缩方案的字符串。
默认为 :setting:`task_compression` 设置。 可以是 gzip 或 bzip2,或任何已在
kombu.compression
注册表中注册的自定义压缩方案。请参阅 压缩 了解更多信息。
- Task.backend
- 用于此任务的结果存储后端。 celery.backends 中后端类之一的实例。 默认为 app.backend,由 :setting:`result_backend` 设置定义。
- Task.acks_late
如果设置为
True
此任务的消息将在 任务执行后被确认 ,而不是在 之前的 (默认行为)。注意:这意味着如果工作程序在执行过程中崩溃,该任务可能会被执行多次。 确保您的任务是 幂等的 。
全局默认值可以被 :setting:`task_acks_late` 设置覆盖。
- Task.track_started
如果
True
任务将在工作人员执行任务时报告其状态为“已启动”。 默认值为False
,因为正常行为是不报告该粒度级别。 任务要么是挂起的,要么是已完成的,要么是等待重试。 当有长时间运行的任务并且需要报告当前正在运行的任务时,具有“开始”状态可能很有用。执行任务的工作者的主机名和进程 ID 将在状态元数据中可用(例如,result.info['pid'])
全局默认值可以被 :setting:`task_track_started` 设置覆盖。
也可以看看
@Task
的 API 参考。
州
Celery 可以跟踪任务的当前状态。 状态还包含成功任务的结果,或失败任务的异常和回溯信息。
有几个 result backends 可供选择,它们都有不同的优点和缺点(参见 Result Backends)。
在其生命周期中,任务将通过几种可能的状态进行转换,并且每个状态可能附加有任意的元数据。 当一个任务进入一个新状态时,之前的状态被遗忘了,但可以推断出一些转换,(例如,现在处于 :state:`FAILED` 状态的任务,暗示已经处于:state:`STARTED` 状态)。
还有一些状态集,比如 :state:`FAILURE_STATES` 集,以及 :state:`READY_STATES` 集。
客户端使用这些集合的成员资格来决定是否应该重新引发异常(:state:`PROPAGATE_STATES`),或者是否可以缓存状态(如果任务准备好就可以)。
您还可以定义 自定义状态 。
结果后端
如果您想跟踪任务或需要返回值,则 Celery 必须将状态存储或发送到某个地方,以便以后可以检索它们。 有几个内置的结果后端可供选择:SQLAlchemy/Django ORM、Memcached、RabbitMQ/QPid (rpc
) 和 Redis——或者您可以定义自己的。
没有后端适用于每个用例。 您应该阅读每个后端的优势和劣势,并选择最适合您需求的。
警告
后端使用资源来存储和传输结果。 为了确保资源被释放,您最终必须在调用任务后返回的每个 @AsyncResult
实例上调用 get()
或 forget()
。
RPC 结果后端 (RabbitMQ/QPid)
RPC 结果后端 (rpc://) 很特别,因为它实际上并不 存储 状态,而是将它们作为消息发送。 这是一个重要的区别,因为它意味着结果 只能检索一次 ,而 只能由发起任务的客户端 检索。 两个不同的进程不能等待相同的结果。
即使有这个限制,如果您需要实时接收状态更改,它也是一个很好的选择。 使用消息传递意味着客户端不必轮询新状态。
默认情况下,消息是暂时的(非持久性的),因此如果代理重新启动,结果将消失。 您可以使用 :setting:`result_persistent` 设置将结果后端配置为发送持久消息。
数据库结果后端
将状态保存在数据库中对许多人来说可能很方便,尤其是对于已经有数据库的 Web 应用程序,但它也有局限性。
轮询数据库以获取新状态的开销很大,因此您应该增加操作的轮询间隔,例如 result.get()。
某些数据库使用不适合轮询表更改的默认事务隔离级别。
在 MySQL 中,默认事务隔离级别是 REPEATABLE-READ:意味着在提交当前事务之前,事务不会看到其他事务所做的更改。
建议将其更改为 READ-COMMITTED 隔离级别。
内置状态
待定
任务正在等待执行或未知。 任何未知的任务 ID 都暗示处于挂起状态。
开始
任务已经开始。 默认不报告,启用请参见@Task.track_started
。
- 元数据
- 执行任务的工作进程的 pid 和 hostname。
成功
任务已成功执行。
- 元数据
- result 包含任务的返回值。
- 传播
- 是的
- 准备好了
- 是的
失败
任务执行失败。
- 元数据
- result 包含发生的异常,而 traceback 包含在引发异常时堆栈的回溯。
- 传播
- 是的
重试
正在重试任务。
- 元数据
- result 包含导致重试的异常,而 traceback 包含在引发异常时堆栈的回溯。
- 传播
- 否
撤销
任务已撤销。
- 传播
- 是的
自定义状态
您可以轻松定义自己的状态,您只需要一个唯一的名称。 状态的名称通常是大写字符串。 作为一个例子,你可以看看 abortable tasks
,它定义了一个自定义的 :state:`ABORTED` 状态。
使用 update_state()
更新任务的状态:。
@app.task(bind=True)
def upload_files(self, filenames):
for i, file in enumerate(filenames):
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(filenames)})
在这里,我创建了状态 “PROGRESS”,通过让 current 和 告诉任何知道此状态的应用程序当前正在执行任务,以及它在进程中的位置]total 算作状态元数据的一部分。 例如,这可以用于创建进度条。
创建可腌制的异常
一个鲜为人知的 Python 事实是异常必须符合一些简单的规则以支持被 pickle 模块序列化。
当 Pickle 用作序列化程序时,引发不可pickle 异常的任务将无法正常工作。
为了确保您的异常是可腌制的,异常 必须 提供在其 .args
属性中实例化的原始参数。 确保这一点的最简单方法是使用异常调用 Exception.__init__
。
让我们看一些有效和无效的例子:
# OK:
class HttpError(Exception):
pass
# BAD:
class HttpError(Exception):
def __init__(self, status_code):
self.status_code = status_code
# OK:
class HttpError(Exception):
def __init__(self, status_code):
self.status_code = status_code
Exception.__init__(self, status_code) # <-- REQUIRED
所以规则是:对于支持自定义参数 *args
的任何异常,必须使用 Exception.__init__(self, *args)
。
没有对 关键字参数 的特殊支持,因此如果您想在取消异常时保留关键字参数,则必须将它们作为常规参数传递:
class HttpError(Exception):
def __init__(self, status_code, headers=None, body=None):
self.status_code = status_code
self.headers = headers
self.body = body
super(HttpError, self).__init__(status_code, headers, body)
半谓词
worker 将任务包装在一个跟踪函数中,该函数记录了任务的最终状态。 有许多异常可用于通知此函数以更改它处理任务返回的方式。
忽略
任务可能会引发 @Ignore
以强制工人忽略任务。 这意味着不会为任务记录任何状态,但仍会确认消息(从队列中删除)。
如果您想实现自定义的类似撤销的功能,或者手动存储任务的结果,可以使用它。
将撤销的任务保存在 Redis 集中的示例:
from celery.exceptions import Ignore
@app.task(bind=True)
def some_task(self):
if redis.ismember('tasks.revoked', self.request.id):
raise Ignore()
手动存储结果的示例:
from celery import states
from celery.exceptions import Ignore
@app.task(bind=True)
def get_tweets(self, user):
timeline = twitter.get_timeline(user)
if not self.request.called_directly:
self.update_state(state=states.SUCCESS, meta=timeline)
raise Ignore()
拒绝
任务可能会提出 @Reject
以使用 AMQPs basic_reject
方法拒绝任务消息。 除非启用 Task.acks_late,否则这不会产生任何影响。
拒绝消息与确认消息具有相同的效果,但一些代理可能会实现可以使用的附加功能。 例如,RabbitMQ 支持 死信交换 的概念,其中可以将队列配置为使用死信交换,将被拒绝的消息重新传递到。
Reject 也可用于重新排队消息,但请在使用时非常小心,因为它很容易导致无限的消息循环。
当任务导致内存不足情况时使用拒绝的示例:
import errno
from celery.exceptions import Reject
@app.task(bind=True, acks_late=True)
def render_scene(self, path):
file = get_file(path)
try:
renderer.render_scene(file)
# if the file is too big to fit in memory
# we reject it so that it's redelivered to the dead letter exchange
# and we can manually inspect the situation.
except MemoryError as exc:
raise Reject(exc, requeue=False)
except OSError as exc:
if exc.errno == errno.ENOMEM:
raise Reject(exc, requeue=False)
# For any other error we retry after 10 seconds.
except Exception as exc:
raise self.retry(exc, countdown=10)
重新排队消息的示例:
from celery.exceptions import Reject
@app.task(bind=True, acks_late=True)
def requeues(self):
if not self.request.delivery_info['redelivered']:
raise Reject('no reason', requeue=True)
print('received two times')
有关 basic_reject
方法的更多详细信息,请参阅您的经纪人文档。
重试
@Retry
异常由 Task.retry
方法引发,以告诉工作人员正在重试任务。
自定义任务类
所有任务都继承自 @Task
类。 run()
方法成为任务主体。
例如,以下代码,
@app.task
def add(x, y):
return x + y
将在幕后大致做到这一点:
class _AddTask(app.Task):
def run(self, x, y):
return x + y
add = app.tasks[_AddTask.name]
实例化
任务 不是 为每个请求实例化,而是在任务注册表中注册为全局实例。
这意味着 __init__
构造函数每个进程只会被调用一次,并且任务类在语义上更接近于 Actor。
如果你有任务,
from celery import Task
class NaiveAuthenticateServer(Task):
def __init__(self):
self.users = {'george': 'password'}
def run(self, username, password):
try:
return self.users[username] == password
except KeyError:
return False
并且您将每个请求路由到同一个进程,然后它将在请求之间保持状态。
这对于缓存资源也很有用,例如,缓存数据库连接的基本 Task 类:
from celery import Task
class DatabaseTask(Task):
_db = None
@property
def db(self):
if self._db is None:
self._db = Database.connect()
return self._db
每个任务的使用
可以将上述内容添加到每个任务中,如下所示:
@app.task(base=DatabaseTask)
def process_rows():
for row in process_rows.db.table.all():
process_row(row)
process_rows
任务的 db
属性将在每个进程中始终保持不变。
应用范围的使用
您还可以通过在实例化应用程序时将其作为 task_cls
参数传递来在整个 Celery 应用程序中使用您的自定义类。 这个参数应该是一个字符串,给出你的 Task 类的 python 路径或类本身:
from celery import Celery
app = Celery('tasks', task_cls='your.module.path:DatabaseTask')
这将使您在应用程序中使用装饰器语法声明的所有任务都使用您的 DatabaseTask
类,并且都将具有 db
属性。
默认值是 Celery 提供的类:'celery.app.task:Task'
。
处理程序
- before_start(self, task_id, args, kwargs)
在任务开始执行之前由工作人员运行。
5.2 版中的新功能。
- 参数
task_id – 要执行的任务的唯一 ID。
args – 要执行的任务的原始参数。
kwargs – 要执行的任务的原始关键字参数。
忽略此处理程序的返回值。
- after_return(self, status, retval, task_id, args, kwargs, einfo)
任务返回后调用的处理程序。
- 参数
status – 当前任务状态。
retval – 任务返回值/异常。
task_id – 任务的唯一 ID。
args – 返回的任务的原始参数。
kwargs – 返回任务的原始关键字参数。
einfo –
ExceptionInfo
实例,包含回溯(如果有)。
忽略此处理程序的返回值。
- on_failure(self, exc, task_id, args, kwargs, einfo)
当任务失败时,由工作人员运行。
- 参数
exc – 任务引发的异常。
task_id – 失败任务的唯一 ID。
args – 失败任务的原始参数。
kwargs – 失败任务的原始关键字参数。
einfo –
ExceptionInfo
实例,包含回溯。
忽略此处理程序的返回值。
- on_retry(self, exc, task_id, args, kwargs, einfo)
当要重试任务时,这是由工作人员运行的。
- 参数
exc – 发送到
retry()
的异常。task_id – 重试任务的唯一 ID。
args – 重试任务的原始参数。
kwargs – 重试任务的原始关键字参数。
einfo –
ExceptionInfo
实例,包含回溯。
忽略此处理程序的返回值。
- on_success(self, retval, task_id, args, kwargs)
如果任务成功执行,则由工作人员运行。
- 参数
retval – 任务的返回值。
task_id – 已执行任务的唯一 ID。
args – 已执行任务的原始参数。
kwargs – 已执行任务的原始关键字参数。
忽略此处理程序的返回值。
请求和自定义请求
在收到运行任务的消息后,worker 创建一个 request
来表示这样的需求。
自定义任务类可以通过更改属性 celery.app.task.Task.Request
来覆盖要使用的请求类。 您可以分配自定义请求类本身,或其完全限定名称。
该请求有几个职责。 自定义请求类应该涵盖所有这些——它们负责实际运行和跟踪任务。 我们强烈建议从 celery.worker.request.Request
继承。
当使用pre-forking worker时,方法on_timeout()
和on_failure()
在主worker进程中执行。 应用程序可以利用这种功能来检测使用 celery.app.task.Task.on_failure()
未检测到的故障。
例如,以下自定义请求检测并记录硬时间限制和其他故障。
import logging
from celery import Task
from celery.worker.request import Request
logger = logging.getLogger('my.package')
class MyRequest(Request):
'A minimal custom request to log failures and hard time limits.'
def on_timeout(self, soft, timeout):
super(MyRequest, self).on_timeout(soft, timeout)
if not soft:
logger.warning(
'A hard timeout was enforced for task %s',
self.task.name
)
def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
super().on_failure(
exc_info,
send_failed_event=send_failed_event,
return_ok=return_ok
)
logger.warning(
'Failure detected for task %s',
self.task.name
)
class MyTask(Task):
Request = MyRequest # you can use a FQN 'my.package:MyRequest'
@app.task(base=MyTask)
def some_longrunning_task():
# use your imagination
工作原理
下面是技术细节。 这部分不是您需要知道的,但您可能会感兴趣。
所有定义的任务都列在注册表中。 注册表包含任务名称及其任务类的列表。 您可以自己调查此注册表:
>>> from proj.celery import app
>>> app.tasks
{'celery.chord_unlock':
<@task: celery.chord_unlock>,
'celery.backend_cleanup':
<@task: celery.backend_cleanup>,
'celery.chord':
<@task: celery.chord>}
这是 Celery 内置的任务列表。 请注意,只有在导入定义它们的模块时才会注册任务。
默认加载器导入 :setting:`imports` 设置中列出的任何模块。
@task()
装饰器负责在应用程序任务注册表中注册您的任务。
发送任务时,不会随其发送实际的功能代码,仅发送要执行的任务的名称。 当工作人员收到消息时,它可以在其任务注册表中查找名称以找到执行代码。
这意味着您的工作人员应始终使用与客户端相同的软件进行更新。 这是一个缺点,但替代方案是一项尚未解决的技术挑战。
提示和最佳实践
忽略你不想要的结果
如果您不关心任务的结果,请务必设置 ignore_result
选项,因为存储结果会浪费时间和资源。
@app.task(ignore_result=True)
def mytask():
something()
甚至可以使用 :setting:`task_ignore_result` 设置全局禁用结果。
在调用 apply_async
或 delay
时,通过传递 ignore_result
布尔参数,可以在每次执行的基础上启用/禁用结果。
@app.task
def mytask(x, y):
return x + y
# No result will be stored
result = mytask.apply_async(1, 2, ignore_result=True)
print result.get() # -> None
# Result will be stored
result = mytask.apply_async(1, 2, ignore_result=False)
print result.get() # -> 3
默认情况下,当配置了结果后端时,任务将 不忽略结果 (ignore_result=False
)。
选项优先顺序如下:
- 全局 :设置:`task_ignore_result`
ignore_result
选项- 任务执行选项
ignore_result
避免启动同步子任务
让一个任务等待另一个任务的结果确实是低效的,如果工作池耗尽甚至可能导致死锁。
改为使您的设计异步,例如使用 回调 。
坏:
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get()
info = parse_page.delay(url, page).get()
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
好:
def update_page_info(url):
# fetch_page -> parse_page -> store_page
chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
chain()
@app.task()
def fetch_page(url):
return myhttplib.get(url)
@app.task()
def parse_page(page):
return myparser.parse_document(page)
@app.task(ignore_result=True)
def store_page_info(info, url):
PageInfo.objects.create(url=url, info=info)
在这里,我通过将不同的 signature()
链接在一起来创建一系列任务。 您可以在 画布:设计工作流 中阅读有关链和其他强大结构的信息。
默认情况下,Celery 不允许您在任务中同步运行子任务,但在极少数或极端情况下,您可能需要这样做。 警告:不建议让子任务同步运行!
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get(disable_sync_subtasks=False)
info = parse_page.delay(url, page).get(disable_sync_subtasks=False)
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(url, page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
性能和策略
粒度
任务粒度是每个子任务所需的计算量。 一般来说,最好将问题拆分成许多小任务,而不是几个长时间运行的任务。
对于较小的任务,您可以并行处理更多任务,并且任务不会运行到足以阻止工作人员处理其他等待任务的时间。
但是,执行任务确实有开销。 需要发送消息,数据可能不在本地等。 因此,如果任务粒度太细,增加的开销可能会消除任何好处。
- AOC1
- 布雷希尔斯,克莱。 第 2.2.1 节,“并发的艺术”。 奥莱利媒体公司 2009 年 5 月 15 日。 ISBN-13 978-0-596-52153-0。
数据局部性
处理任务的工人应该尽可能接近数据。 最好的办法是在内存中有一个副本,最坏的办法是从另一个大陆完全转移。
如果数据很远,您可以尝试在该位置运行另一个工作程序,或者如果这不可能 - 缓存经常使用的数据,或预加载您知道将要使用的数据。
在 worker 之间共享数据的最简单方法是使用分布式缓存系统,例如 memcached。
状态
由于 Celery 是一个分布式系统,你无法知道任务将在哪个进程,或在哪台机器上执行。 您甚至无法知道任务是否会及时运行。
古老的异步谚语告诉我们“断言世界是任务的责任”。 这意味着自从请求任务以来世界观可能已经改变,因此任务负责确保世界是它应该的样子; 如果您有一个重新索引搜索引擎的任务,并且搜索引擎最多只能每 5 分钟重新索引一次,那么断言这一点必须是任务的责任,而不是调用者。
另一个问题是 Django 模型对象。 它们不应该作为参数传递给任务。 在任务运行时从数据库重新获取对象几乎总是更好,因为使用旧数据可能会导致竞争条件。
想象一下下面的场景,你有一篇文章和一个自动扩展其中一些缩写的任务:
class Article(models.Model):
title = models.CharField()
body = models.TextField()
@app.task
def expand_abbreviations(article):
article.body.replace('MyCorp', 'My Corporation')
article.save()
首先,作者创建一篇文章并保存,然后作者单击启动缩写任务的按钮:
>>> article = Article.objects.get(id=102)
>>> expand_abbreviations.delay(article)
现在,队列很忙,所以任务不会再运行 2 分钟。 与此同时,另一位作者对文章进行了更改,因此当任务最终运行时,文章的正文将恢复为旧版本,因为该任务在其参数中使用了旧正文。
修复竞争条件很容易,只需使用文章 ID,然后在任务正文中重新获取文章:
@app.task
def expand_abbreviations(article_id):
article = Article.objects.get(id=article_id)
article.body.replace('MyCorp', 'My Corporation')
article.save()
>>> expand_abbreviations.delay(article_id)
这种方法甚至可能有性能优势,因为发送大消息可能很昂贵。
数据库事务
让我们再看一个例子:
from django.db import transaction
from django.http import HttpResponseRedirect
@transaction.atomic
def create_article(request):
article = Article.objects.create()
expand_abbreviations.delay(article.pk)
return HttpResponseRedirect('/articles/')
这是一个 Django 视图,它在数据库中创建一个文章对象,然后将主键传递给任务。 它使用 transaction.atomic 装饰器,它会在视图返回时提交事务,或者在视图引发异常时回滚。
如果任务在事务提交之前开始执行,则存在竞争条件; 数据库对象还不存在!
解决方案是使用 on_commit
回调在成功提交所有事务后启动 Celery 任务。
from django.db.transaction import on_commit
def create_article(request):
article = Article.objects.create()
on_commit(lambda: expand_abbreviations.delay(article.pk))
示例
让我们举一个真实的例子:一个博客,其中发布的评论需要过滤垃圾邮件。 创建评论时,垃圾邮件过滤器在后台运行,因此用户不必等待它完成。
我有一个 Django 博客应用程序,允许对博客文章发表评论。 我将描述此应用程序的部分模型/视图和任务。
blog/models.py
评论模型如下所示:
from django.db import models
from django.utils.translation import ugettext_lazy as _
class Comment(models.Model):
name = models.CharField(_('name'), max_length=64)
email_address = models.EmailField(_('email address'))
homepage = models.URLField(_('home page'),
blank=True, verify_exists=False)
comment = models.TextField(_('comment'))
pub_date = models.DateTimeField(_('Published date'),
editable=False, auto_add_now=True)
is_spam = models.BooleanField(_('spam?'),
default=False, editable=False)
class Meta:
verbose_name = _('comment')
verbose_name_plural = _('comments')
在发表评论的视图中,我首先将评论写入数据库,然后在后台启动垃圾邮件过滤任务。
blog/views.py
from django import forms
from django.http import HttpResponseRedirect
from django.template.context import RequestContext
from django.shortcuts import get_object_or_404, render_to_response
from blog import tasks
from blog.models import Comment
class CommentForm(forms.ModelForm):
class Meta:
model = Comment
def add_comment(request, slug, template_name='comments/create.html'):
post = get_object_or_404(Entry, slug=slug)
remote_addr = request.META.get('REMOTE_ADDR')
if request.method == 'post':
form = CommentForm(request.POST, request.FILES)
if form.is_valid():
comment = form.save()
# Check spam asynchronously.
tasks.spam_filter.delay(comment_id=comment.id,
remote_addr=remote_addr)
return HttpResponseRedirect(post.get_absolute_url())
else:
form = CommentForm()
context = RequestContext(request, {'form': form})
return render_to_response(template_name, context_instance=context)
为了过滤评论中的垃圾邮件,我使用 Akismet,该服务用于过滤发布到免费博客平台 Wordpress 的评论中的垃圾邮件。 Akismet个人使用免费,商业使用需要付费。 您必须注册他们的服务才能获得 API 密钥。
为了对 Akismet 进行 API 调用,我使用了 Michael Foord 编写的 akismet.py 库。
blog/tasks.py
from celery import Celery
from akismet import Akismet
from django.core.exceptions import ImproperlyConfigured
from django.contrib.sites.models import Site
from blog.models import Comment
app = Celery(broker='amqp://')
@app.task
def spam_filter(comment_id, remote_addr=None):
logger = spam_filter.get_logger()
logger.info('Running spam filter for comment %s', comment_id)
comment = Comment.objects.get(pk=comment_id)
current_domain = Site.objects.get_current().domain
akismet = Akismet(settings.AKISMET_KEY, 'http://{0}'.format(domain))
if not akismet.verify_key():
raise ImproperlyConfigured('Invalid AKISMET_KEY')
is_spam = akismet.comment_check(user_ip=remote_addr,
comment_content=comment.comment,
comment_author=comment.name,
comment_author_email=comment.email_address)
if is_spam:
comment.is_spam = True
comment.save()
return is_spam