调用任务 — Python 文档

来自菜鸟教程
Celery/docs/latest/userguide/calling
跳转至:导航、​搜索

调用任务

基础知识

本文档描述了任务实例和 canvas 使用的 Celery 统一的“调用 API”。

API 定义了一组标准的执行选项,以及三种方法:

  • apply_async(args[, kwargs[, …]])

    发送任务消息。

  • delay(*args, **kwargs)

    发送任务消息的快捷方式,但不支持执行选项。

  • 呼叫 (__call__)

    应用支持调用 API 的对象(例如,add(2, 2))意味着任务不会由工作人员执行,而是在当前进程中执行(不会发送消息)。


快速备忘单

  • *; T.delay(arg, kwarg=value)
    .apply_async 的星形参数快捷方式。 (.delay(*args, **kwargs) 调用 .apply_async(args, kwargs))。
  • T.apply_async((arg,), {'kwarg': value})
  • *; T.apply_async(countdown=10)
    从现在起 10 秒内执行。
  • *; T.apply_async(eta=now + timedelta(seconds=10))
    从现在起 10 秒内执行,使用 eta 指定
  • *; T.apply_async(countdown=60, expires=120)
    从现在起一分钟内执行,但在 2 分钟后过期。
  • *; T.apply_async(expires=now + timedelta(days=2))
    2 天后到期,使用 datetime 设置。


示例

delay() 方法很方便,因为它看起来像调用一个常规函数:

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

使用 apply_async() 代替你必须写:

task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

提示

如果任务未在当前进程中注册,您可以使用 @send_task() 来按名称调用任务。

所以delay显然很方便,但是如果你想设置额外的执行选项,你必须使用apply_async

本文档的其余部分将详细介绍任务执行选项。 所有示例都使用名为 add 的任务,返回两个参数的总和:

@app.task
def add(x, y):
    return x + y

还有一种方法……

稍后您将在阅读 Canvas 时了解更多相关信息,但 signature 是用于传递任务调用签名的对象(例如通过网络),并且它们还支持 Calling API:

task.s(arg1, arg2, kwarg1='x', kwargs2='y').apply_async()

链接(回调/错误回复)

Celery 支持将任务链接在一起,以便一个任务跟随另一个任务。 回调任务将应用父任务的结果作为部分参数:

add.apply_async((2, 2), link=add.s(16))

什么是s

这里使用的 add.s 调用称为签名。 如果您不知道它们是什么,您应该在 画布指南 中阅读它们。 您还可以在那里了解 chain:一种将任务链接在一起的更简单方法。

在实践中, link 执行选项被认为是一个内部原语,你可能不会直接使用它,而是使用链。

这里第一个任务 (4) 的结果将被发送到一个新任务,该任务将前一个结果加 16,形成表达式 \((2 + 2) + 16 = 20\)

如果任务引发异常 (errback),您还可以导致应用回调。 工作人员实际上不会将 errback 作为任务调用,而是直接调用 errback 函数,以便可以将原始请求、异常和回溯对象传递给它。

这是一个错误回调示例:

@app.task
def error_handler(request, exc, traceback):
    print('Task {0} raised exception: {1!r}\n{2!r}'.format(
          request.id, exc, traceback))

可以使用 link_error 执行选项将其添加到任务中:

add.apply_async((2, 2), link_error=error_handler.s())

此外,linklink_error 选项都可以表示为一个列表:

add.apply_async((2, 2), link=[add.s(16), other_task.s()])

然后将按顺序调用回调/错误返回,并且所有回调都将使用父任务的返回值作为部分参数进行调用。


在留言中

Celery 支持通过设置 on_message 回调来捕捉所有状态变化。

例如,对于发送任务进度的长时间运行的任务,您可以执行以下操作:

@app.task(bind=True)
def hello(self, a, b):
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 50})
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 90})
    time.sleep(1)
    return 'hello world: %i' % (a+b)
def on_raw_message(body):
    print(body)

a, b = 1, 1
r = hello.apply_async(args=(a, b))
print(r.get(on_message=on_raw_message, propagate=False))

将生成如下输出:

{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': {'progress': 50},
 'children': [],
 'status': 'PROGRESS',
 'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': {'progress': 90},
 'children': [],
 'status': 'PROGRESS',
 'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': 'hello world: 10',
 'children': [],
 'status': 'SUCCESS',
 'traceback': None}
hello world: 10

ETA 和倒计时

ETA(预计到达时间)可让您设置特定日期和时间,即最早执行任务的时间。 countdown 是设置未来几秒内预计到达时间的快捷方式。

>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get()    # this takes at least 3 seconds to return
20

任务保证在指定日期和时间 之后的某个时间执行 ,但不一定在那个确切时间执行。 逾期的可能原因可能包括队列中等待的许多项目,或严重的网络延迟。 为了确保您的任务及时执行,您应该监控队列是否拥塞。 使用 Munin 或类似工具接收警报,以便可以采取适当的措施来减轻工作量。 见穆宁

countdown 是一个整数,而 eta[X47X] 必须是一个 datetime 对象,指定一个确切的日期和时间(包括毫秒精度和时区信息):

>>> from datetime import datetime, timedelta

>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow)

警告

使用 RabbitMQ 作为消息代理时,指定 countdown 超过 15 分钟时,可能会遇到 worker 终止时会出现 PreconditionFailed 错误的问题:

amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - consumer ack timed out on channel

在 RabbitMQ 3.8.15 版本之后,consumer_timeout 的默认值为 15 分钟。 从 3.8.17 版本开始,它增加到 30 分钟。 如果消费者未确认其交付超过超时值,则其通道将关闭,并显示 PRECONDITION_FAILED 通道异常。 有关详细信息,请参阅 传递确认超时

为了解决这个问题,在RabbitMQ配置文件rabbitmq.conf中,你应该指定大于或等于你的倒计时值的consumer_timeout参数。 例如,您可以指定一个非常大的 consumer_timeout = 31622400000 值,它等于 1 年(以毫秒为单位),以避免将来出现问题。


到期

expires 参数定义了一个可选的到期时间,可以是任务发布后的秒数,也可以是使用 datetime 的特定日期和时间:

>>> # Task expires after one minute from now.
>>> add.apply_async((10, 10), expires=60)

>>> # Also supports datetime
>>> from datetime import datetime, timedelta
>>> add.apply_async((10, 10), kwargs,
...                 expires=datetime.now() + timedelta(days=1)

当工作人员收到过期任务时,它会将任务标记为 :state:`REVOKED` (@TaskRevokedError)。


消息发送重试

Celery 会在连接失败时自动重试发送消息,并且可以配置重试行为——比如重试的频率,或最大重试次数——或者一起禁用。

要禁用重试,您可以将 retry 执行选项设置为 False

add.apply_async((2, 2), retry=False)

重试策略

重试策略是一种控制重试行为的映射,可以包含以下键:

  • 最大重试次数

    放弃前的最大重试次数,在这种情况下,将引发导致重试失败的异常。

    None 的值意味着它将永远重试。

    默认为重试 3 次。

  • 间隔开始

    定义在重试之间等待的秒数(浮点数或整数)。 默认值为 0(第一次重试将是瞬时的)。

  • 间隔_步长

    在每次连续重试时,此数字将添加到重试延迟(浮点数或整数)中。 默认值为 0.2。

  • 间隔_最大值

    重试之间等待的最大秒数(浮点数或整数)。 默认值为 0.2。

例如,默认策略与:

add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})

重试所花费的最长时间为 0.4 秒。 默认情况下它设置得相对较短,因为如果代理连接断开,连接失败可能会导致重试堆效应——例如,许多 Web 服务器进程等待重试,阻塞其他传入请求。


连接错误处理

当您发送任务并且消息传输连接丢失,或无法启动连接时,将引发 OperationalError 错误:

>>> from proj.tasks import add
>>> add.delay(2, 2)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/app/task.py", line 388, in delay
        return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 503, in apply_async
    **options
  File "celery/app/base.py", line 662, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "celery/backends/rpc.py", line 275, in on_task_call
    maybe_declare(self.binding(producer.channel), retry=True)
  File "/opt/celery/kombu/kombu/messaging.py", line 204, in _get_channel
    channel = self._channel = channel()
  File "/opt/celery/py-amqp/amqp/connection.py", line 272, in connect
    self.transport.connect()
  File "/opt/celery/py-amqp/amqp/transport.py", line 100, in connect
    self._connect(self.host, self.port, self.connect_timeout)
  File "/opt/celery/py-amqp/amqp/transport.py", line 141, in _connect
    self.sock.connect(sa)
  kombu.exceptions.OperationalError: [Errno 61] Connection refused

如果您启用了 retries,这只会在重试用完后或立即禁用时发生。

您也可以处理此错误:

>>> from celery.utils.log import get_logger
>>> logger = get_logger(__name__)

>>> try:
...     add.delay(2, 2)
... except add.OperationalError as exc:
...     logger.exception('Sending task raised: %r', exc)

序列化器

安全

pickle 模块允许执行任意函数,请参阅 安全指南

Celery 还带有一个特殊的序列化程序,它使用加密技术对您的消息进行签名。

在客户端和工作线程之间传输的数据需要序列化,因此 Celery 中的每条消息都有一个 content_type 标头,描述用于对其进行编码的序列化方法。

默认的序列化程序是 JSON,但您可以使用 :setting:`task_serializer` 设置更改此设置,或者针对每个单独的任务,甚至每个消息。

内置支持 JSON、pickle、YAML 和 msgpack,您还可以通过将它们注册到Kombu 序列化器注册表

也可以看看

Kombu 用户指南中的 消息序列化


每个选项都有其优点和缺点。

json – 许多编程语言都支持 JSON,现在

Python 的标准部分(自 2.6 起),使用现代 Python 库(例如 :pypi:`simplejson`)解码相当快。

JSON 的主要缺点是它限制您使用以下数据类型:字符串、Unicode、浮点数、布尔值、字典和列表。 小数点和日期明显缺失。

Binary data will be transferred using Base64 encoding, increasing the size of the transferred data by 34% compared to an encoding format where native binary types are supported.

但是,如果您的数据符合上述限制,并且您需要跨语言支持,那么 JSON 的默认设置可能是您的最佳选择。

有关更多信息,请参阅 http://json.org。

笔记

(来自 Python 官方文档 https://docs.python.org/3.6/library/json.html)JSON 的键/值对中的键总是类型为 str。 当字典转换为 JSON 时,字典的所有键都被强制转换为字符串。 因此,如果将字典转换为 JSON,然后再转换回字典,则该字典可能不等于原始字典。 也就是说,如果 x 具有非字符串键,则为 loads(dumps(x)) != x

pickle – 如果您不想支持除

Python,然后使用 pickle 编码将获得对所有内置 Python 数据类型(类实例除外)的支持、发送二进制文件时的较小消息以及对 JSON 处理的略微加速。

有关详细信息,请参阅 pickle

yaml – YAML 具有许多与 json 相同的特性,

除了它本身支持更多的数据类型(包括日期、递归引用等)。

但是,用于 YAML 的 Python 库比用于 JSON 的库要慢一些。

如果您需要一组更具表现力的数据类型并需要保持跨语言兼容性,那么 YAML 可能比上述更适合。

有关更多信息,请参阅 http://yaml.org/。

msgpack – msgpack 是一种更接近 JSON 的二进制序列化格式

在功能上。 然而,它非常年轻,此时应该将支持视为实验性的。

有关更多信息,请参阅 http://msgpack.org/。

使用的编码可用作消息头,因此工作人员知道如何反序列化任何任务。 如果您使用自定义序列化程序,则此序列化程序必须可供工作人员使用。

以下顺序用于决定发送任务时使用的序列化程序:

  1. serializer 执行选项。
  2. @-Task.serializer 属性
  3. :setting:`task_serializer` 设置。


为单个任务调用设置自定义序列化程序的示例:

>>> add.apply_async((10, 10), serializer='json')

压缩

Celery 可以使用以下内置方案压缩消息:

  • 布罗特利

    brotli 针对网络进行了优化,尤其是小文本文档。 它对于提供静态内容(例如字体和 html 页面)最有效。

    要使用它,请使用以下命令安装 Celery:

    $ pip install celery[brotli]
  • bzip2

    bzip2 创建的文件比 gzip 小,但压缩和解压速度明显比 gzip 慢。

    要使用它,请确保您的 Python 可执行文件是使用 bzip2 支持编译的。

    如果您得到以下 ImportError

    >>> import bz2
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'bz2'

    这意味着您应该使用 bzip2 支持重新编译您的 Python 版本。

  • 压缩包

    gzip 适用于需要小内存占用的系统,非常适合内存有限的系统。 它通常用于生成扩展名为“.tar.gz”的文件。

    要使用它,请确保您的 Python 可执行文件是使用 gzip 支持编译的。

    如果您得到以下 ImportError

    >>> import gzip
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'gzip'

    这意味着您应该使用 gzip 支持重新编译 Python 版本。

  • lzma

    lzma 提供了良好的压缩率,并以更快的压缩和解压缩速度执行,但代价是更高的内存使用率。

    要使用它,请确保您的 Python 可执行文件是使用 lzma 支持编译的,并且您的 Python 版本是 3.3 及更高版本。

    如果您得到以下 ImportError

    >>> import lzma
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'lzma'

    这意味着您应该使用 lzma 支持重新编译您的 Python 版本。

    或者,您也可以使用以下方法安装 backport:

    $ pip install celery[lzma]
  • zlib

    zlib 是库形式的 Deflate 算法的抽象,它在其 API 中包括对 gzip 文件格式和轻量级流格式的支持。 它是许多软件系统的关键组件 - Linux 内核和 Git VCS 仅举几例。

    要使用它,请确保您的 Python 可执行文件是使用 zlib 支持编译的。

    如果您得到以下 ImportError

    >>> import zlib
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'zlib'

    这意味着您应该使用 zlib 支持重新编译 Python 版本。

  • zstd

    zstd 针对 zlib 级别的实时压缩场景和更好的压缩率。 它由 Huff0 和 FSE 库提供的非常快的熵阶段支持。

    要使用它,请使用以下命令安装 Celery:

    $ pip install celery[zstd]

您还可以创建自己的压缩方案并将其注册到 kombu compression registry

以下顺序用于决定发送任务时使用的压缩方案:

  1. 压缩执行选项。
  2. @-Task.compression 属性。
  3. :setting:`task_compression` 属性。


指定调用任务时使用的压缩的示例:

>>> add.apply_async((2, 2), compression='zlib')

连接

自动池支持

从版本 2.3 开始支持自动连接池,因此您不必手动处理连接和发布者来重用连接。

从 2.5 版开始,默认情况下启用连接池。

有关更多信息,请参阅 :setting:`broker_pool_limit` 设置。

您可以通过创建发布者手动处理连接:

results = []
with add.app.pool.acquire(block=True) as connection:
    with add.get_publisher(connection) as publisher:
        try:
            for args in numbers:
                res = add.apply_async((2, 2), publisher=publisher)
                results.append(res)
print([res.get() for res in results])

虽然这个特殊的例子更适合用一个组来表达:

>>> from celery import group

>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
>>> res = group(add.s(i, j) for i, j in numbers).apply_async()

>>> res.get()
[4, 8, 16, 32]

路由选项

Celery 可以将任务路由到不同的队列。

简单的路由(名称 名称)是使用queue选项:

add.apply_async(queue='priority.high')

然后,您可以使用工作人员 -Q 参数将工作人员分配到 priority.high 队列:

$ celery -A proj worker -l INFO -Q celery,priority.high

也可以看看

不推荐在代码中硬编码队列名称,最佳实践是使用配置路由器(:setting:`task_routes`)。

要了解有关路由的更多信息,请参阅 路由任务


结果选项

您可以使用 :setting:`task_ignore_result` 设置或使用 ignore_result 选项启用或禁用结果存储:

>>> result = add.apply_async((1, 2), ignore_result=True)
>>> result.get()
None

>>> # Do not ignore result (default)
...
>>> result = add.apply_async((1, 2), ignore_result=False)
>>> result.get()
3

如果您想在结果后端存储有关任务的其他元数据,请将 :setting:`result_extended` 设置为 True

也可以看看

有关任务的更多信息,请参阅 任务


高级选项

这些选项适用于想要使用 AMQP 完整路由功能的高级用户。 有兴趣的可以阅读路由指南

  • 交换

    要将消息发送到的交换的名称(或 kombu.entity.Exchange)。

  • 路由密钥

    路由键用来确定。

  • 优先权

    0 和 255 之间的数字,其中 255 是最高优先级。

    支持:RabbitMQ、Redis(优先级颠倒,0为最高)。