调用任务 — Python 文档
调用任务
基础知识
本文档描述了任务实例和 canvas 使用的 Celery 统一的“调用 API”。
API 定义了一组标准的执行选项,以及三种方法:
apply_async(args[, kwargs[, …]])
发送任务消息。
delay(*args, **kwargs)
发送任务消息的快捷方式,但不支持执行选项。
呼叫 (
__call__
)应用支持调用 API 的对象(例如,
add(2, 2)
)意味着任务不会由工作人员执行,而是在当前进程中执行(不会发送消息)。
快速备忘单
- *;
T.delay(arg, kwarg=value)
.apply_async
的星形参数快捷方式。 (.delay(*args, **kwargs)
调用.apply_async(args, kwargs)
)。
T.apply_async((arg,), {'kwarg': value})
- *;
T.apply_async(countdown=10)
- 从现在起 10 秒内执行。
- *;
T.apply_async(eta=now + timedelta(seconds=10))
- 从现在起 10 秒内执行,使用
eta
指定
- 从现在起 10 秒内执行,使用
- *;
T.apply_async(countdown=60, expires=120)
- 从现在起一分钟内执行,但在 2 分钟后过期。
- *;
T.apply_async(expires=now + timedelta(days=2))
- 2 天后到期,使用
datetime
设置。
- 2 天后到期,使用
示例
delay()
方法很方便,因为它看起来像调用一个常规函数:
使用 apply_async()
代替你必须写:
提示
如果任务未在当前进程中注册,您可以使用 @send_task()
来按名称调用任务。
所以delay显然很方便,但是如果你想设置额外的执行选项,你必须使用apply_async
。
本文档的其余部分将详细介绍任务执行选项。 所有示例都使用名为 add 的任务,返回两个参数的总和:
链接(回调/错误回复)
Celery 支持将任务链接在一起,以便一个任务跟随另一个任务。 回调任务将应用父任务的结果作为部分参数:
什么是s
?
这里使用的 add.s
调用称为签名。 如果您不知道它们是什么,您应该在 画布指南 中阅读它们。 您还可以在那里了解 chain
:一种将任务链接在一起的更简单方法。
在实践中, link
执行选项被认为是一个内部原语,你可能不会直接使用它,而是使用链。
这里第一个任务 (4) 的结果将被发送到一个新任务,该任务将前一个结果加 16,形成表达式 \((2 + 2) + 16 = 20\)
如果任务引发异常 (errback),您还可以导致应用回调。 工作人员实际上不会将 errback 作为任务调用,而是直接调用 errback 函数,以便可以将原始请求、异常和回溯对象传递给它。
这是一个错误回调示例:
可以使用 link_error
执行选项将其添加到任务中:
此外,link
和 link_error
选项都可以表示为一个列表:
然后将按顺序调用回调/错误返回,并且所有回调都将使用父任务的返回值作为部分参数进行调用。
在留言中
Celery 支持通过设置 on_message 回调来捕捉所有状态变化。
例如,对于发送任务进度的长时间运行的任务,您可以执行以下操作:
将生成如下输出:
ETA 和倒计时
ETA(预计到达时间)可让您设置特定日期和时间,即最早执行任务的时间。 countdown 是设置未来几秒内预计到达时间的快捷方式。
任务保证在指定日期和时间 之后的某个时间执行 ,但不一定在那个确切时间执行。 逾期的可能原因可能包括队列中等待的许多项目,或严重的网络延迟。 为了确保您的任务及时执行,您应该监控队列是否拥塞。 使用 Munin 或类似工具接收警报,以便可以采取适当的措施来减轻工作量。 见穆宁。
countdown 是一个整数,而 eta[X47X] 必须是一个 datetime
对象,指定一个确切的日期和时间(包括毫秒精度和时区信息):
警告
使用 RabbitMQ 作为消息代理时,指定 countdown
超过 15 分钟时,可能会遇到 worker 终止时会出现 PreconditionFailed
错误的问题:
在 RabbitMQ 3.8.15 版本之后,consumer_timeout
的默认值为 15 分钟。 从 3.8.17 版本开始,它增加到 30 分钟。 如果消费者未确认其交付超过超时值,则其通道将关闭,并显示 PRECONDITION_FAILED
通道异常。 有关详细信息,请参阅 传递确认超时 。
为了解决这个问题,在RabbitMQ配置文件rabbitmq.conf
中,你应该指定大于或等于你的倒计时值的consumer_timeout
参数。 例如,您可以指定一个非常大的 consumer_timeout = 31622400000
值,它等于 1 年(以毫秒为单位),以避免将来出现问题。
到期
expires 参数定义了一个可选的到期时间,可以是任务发布后的秒数,也可以是使用 datetime
的特定日期和时间:
当工作人员收到过期任务时,它会将任务标记为 :state:`REVOKED` (@TaskRevokedError
)。
消息发送重试
Celery 会在连接失败时自动重试发送消息,并且可以配置重试行为——比如重试的频率,或最大重试次数——或者一起禁用。
要禁用重试,您可以将 retry
执行选项设置为 False
:
重试策略
重试策略是一种控制重试行为的映射,可以包含以下键:
最大重试次数
放弃前的最大重试次数,在这种情况下,将引发导致重试失败的异常。
None
的值意味着它将永远重试。默认为重试 3 次。
间隔开始
定义在重试之间等待的秒数(浮点数或整数)。 默认值为 0(第一次重试将是瞬时的)。
间隔_步长
在每次连续重试时,此数字将添加到重试延迟(浮点数或整数)中。 默认值为 0.2。
间隔_最大值
重试之间等待的最大秒数(浮点数或整数)。 默认值为 0.2。
例如,默认策略与:
重试所花费的最长时间为 0.4 秒。 默认情况下它设置得相对较短,因为如果代理连接断开,连接失败可能会导致重试堆效应——例如,许多 Web 服务器进程等待重试,阻塞其他传入请求。
连接错误处理
当您发送任务并且消息传输连接丢失,或无法启动连接时,将引发 OperationalError
错误:
如果您启用了 retries,这只会在重试用完后或立即禁用时发生。
您也可以处理此错误:
序列化器
安全
pickle 模块允许执行任意函数,请参阅 安全指南 。
Celery 还带有一个特殊的序列化程序,它使用加密技术对您的消息进行签名。
在客户端和工作线程之间传输的数据需要序列化,因此 Celery 中的每条消息都有一个 content_type
标头,描述用于对其进行编码的序列化方法。
默认的序列化程序是 JSON,但您可以使用 :setting:`task_serializer` 设置更改此设置,或者针对每个单独的任务,甚至每个消息。
内置支持 JSON、pickle
、YAML 和 msgpack
,您还可以通过将它们注册到Kombu 序列化器注册表
也可以看看
Kombu 用户指南中的 消息序列化 。
每个选项都有其优点和缺点。
- json – 许多编程语言都支持 JSON,现在
Python 的标准部分(自 2.6 起),使用现代 Python 库(例如 :pypi:`simplejson`)解码相当快。
JSON 的主要缺点是它限制您使用以下数据类型:字符串、Unicode、浮点数、布尔值、字典和列表。 小数点和日期明显缺失。
Binary data will be transferred using Base64 encoding, increasing the size of the transferred data by 34% compared to an encoding format where native binary types are supported.
但是,如果您的数据符合上述限制,并且您需要跨语言支持,那么 JSON 的默认设置可能是您的最佳选择。
有关更多信息,请参阅 http://json.org。
笔记
(来自 Python 官方文档 https://docs.python.org/3.6/library/json.html)JSON 的键/值对中的键总是类型为
str
。 当字典转换为 JSON 时,字典的所有键都被强制转换为字符串。 因此,如果将字典转换为 JSON,然后再转换回字典,则该字典可能不等于原始字典。 也就是说,如果 x 具有非字符串键,则为loads(dumps(x)) != x
。- pickle – 如果您不想支持除
Python,然后使用 pickle 编码将获得对所有内置 Python 数据类型(类实例除外)的支持、发送二进制文件时的较小消息以及对 JSON 处理的略微加速。
有关详细信息,请参阅
pickle
。- yaml – YAML 具有许多与 json 相同的特性,
除了它本身支持更多的数据类型(包括日期、递归引用等)。
但是,用于 YAML 的 Python 库比用于 JSON 的库要慢一些。
如果您需要一组更具表现力的数据类型并需要保持跨语言兼容性,那么 YAML 可能比上述更适合。
有关更多信息,请参阅 http://yaml.org/。
- msgpack – msgpack 是一种更接近 JSON 的二进制序列化格式
在功能上。 然而,它非常年轻,此时应该将支持视为实验性的。
有关更多信息,请参阅 http://msgpack.org/。
使用的编码可用作消息头,因此工作人员知道如何反序列化任何任务。 如果您使用自定义序列化程序,则此序列化程序必须可供工作人员使用。
以下顺序用于决定发送任务时使用的序列化程序:
- serializer 执行选项。
@-Task.serializer
属性- :setting:`task_serializer` 设置。
为单个任务调用设置自定义序列化程序的示例:
压缩
Celery 可以使用以下内置方案压缩消息:
布罗特利
brotli 针对网络进行了优化,尤其是小文本文档。 它对于提供静态内容(例如字体和 html 页面)最有效。
要使用它,请使用以下命令安装 Celery:
bzip2
bzip2 创建的文件比 gzip 小,但压缩和解压速度明显比 gzip 慢。
要使用它,请确保您的 Python 可执行文件是使用 bzip2 支持编译的。
如果您得到以下
ImportError
:这意味着您应该使用 bzip2 支持重新编译您的 Python 版本。
压缩包
gzip 适用于需要小内存占用的系统,非常适合内存有限的系统。 它通常用于生成扩展名为“.tar.gz”的文件。
要使用它,请确保您的 Python 可执行文件是使用 gzip 支持编译的。
如果您得到以下
ImportError
:这意味着您应该使用 gzip 支持重新编译 Python 版本。
lzma
lzma 提供了良好的压缩率,并以更快的压缩和解压缩速度执行,但代价是更高的内存使用率。
要使用它,请确保您的 Python 可执行文件是使用 lzma 支持编译的,并且您的 Python 版本是 3.3 及更高版本。
如果您得到以下
ImportError
:这意味着您应该使用 lzma 支持重新编译您的 Python 版本。
或者,您也可以使用以下方法安装 backport:
zlib
zlib 是库形式的 Deflate 算法的抽象,它在其 API 中包括对 gzip 文件格式和轻量级流格式的支持。 它是许多软件系统的关键组件 - Linux 内核和 Git VCS 仅举几例。
要使用它,请确保您的 Python 可执行文件是使用 zlib 支持编译的。
如果您得到以下
ImportError
:这意味着您应该使用 zlib 支持重新编译 Python 版本。
zstd
zstd 针对 zlib 级别的实时压缩场景和更好的压缩率。 它由 Huff0 和 FSE 库提供的非常快的熵阶段支持。
要使用它,请使用以下命令安装 Celery:
您还可以创建自己的压缩方案并将其注册到 kombu compression registry
。
以下顺序用于决定发送任务时使用的压缩方案:
- 压缩执行选项。
@-Task.compression
属性。- :setting:`task_compression` 属性。
指定调用任务时使用的压缩的示例:
连接
自动池支持
从版本 2.3 开始支持自动连接池,因此您不必手动处理连接和发布者来重用连接。
从 2.5 版开始,默认情况下启用连接池。
有关更多信息,请参阅 :setting:`broker_pool_limit` 设置。
您可以通过创建发布者手动处理连接:
虽然这个特殊的例子更适合用一个组来表达:
路由选项
Celery 可以将任务路由到不同的队列。
简单的路由(名称 名称)是使用queue
选项:
然后,您可以使用工作人员 -Q
参数将工作人员分配到 priority.high
队列:
结果选项
您可以使用 :setting:`task_ignore_result` 设置或使用 ignore_result
选项启用或禁用结果存储:
如果您想在结果后端存储有关任务的其他元数据,请将 :setting:`result_extended` 设置为 True
。
高级选项
这些选项适用于想要使用 AMQP 完整路由功能的高级用户。 有兴趣的可以阅读路由指南。
交换
要将消息发送到的交换的名称(或
kombu.entity.Exchange
)。路由密钥
路由键用来确定。
优先权
0 和 255 之间的数字,其中 255 是最高优先级。
支持:RabbitMQ、Redis(优先级颠倒,0为最高)。