消息协议 — Python 文档

来自菜鸟教程
Celery/docs/latest/internals/protocol
跳转至:导航、​搜索

消息协议

任务消息

版本 2

定义

properties = {
    'correlation_id': uuid task_id,
    'content_type': string mimetype,
    'content_encoding': string encoding,

    # optional
    'reply_to': string queue_or_url,
}
headers = {
    'lang': string 'py'
    'task': string task,
    'id': uuid task_id,
    'root_id': uuid root_id,
    'parent_id': uuid parent_id,
    'group': uuid group_id,

    # optional
    'meth': string method_name,
    'shadow': string alias_name,
    'eta': iso8601 ETA,
    'expires': iso8601 expires,
    'retries': int retries,
    'timelimit': (soft, hard),
    'argsrepr': str repr(args),
    'kwargsrepr': str repr(kwargs),
    'origin': str nodename,
    'replaced_task_nesting': int
}

body = (
    object[] args,
    Mapping kwargs,
    Mapping embed {
        'callbacks': Signature[] callbacks,
        'errbacks': Signature[] errbacks,
        'chain': Signature[] chain,
        'chord': Signature chord_callback,
    }
)

示例

此示例使用协议的版本 2 发送任务消息:

# chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8

import json
import os
import socket

task_id = uuid()
args = (2, 2)
kwargs = {}
basic_publish(
    message=json.dumps((args, kwargs, None)),
    application_headers={
        'lang': 'py',
        'task': 'proj.tasks.add',
        'argsrepr': repr(args),
        'kwargsrepr': repr(kwargs),
        'origin': '@'.join([os.getpid(), socket.gethostname()])
    }
    properties={
        'correlation_id': task_id,
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
    }
)

版本 1 的变化

  • 通过 task 消息标头的存在检测到的协议版本。

  • 通过 lang 标头支持多种语言。

    工作人员可以将消息重定向到支持该语言的工作人员。

  • 元数据移动到标题。

    这意味着工作人员/中间人可以检查消息并根据标头做出决策,而无需解码有效负载(可能是特定于语言的,例如由 Python 特定的 pickle 序列化程序序列化)。

  • 始终为 UTC

    不再有 utc 标志,因此任何缺少时区的时间信息都将采用 UTC 时间。

  • 正文仅用于语言特定数据。

    • Python 将 args/kwargs 和嵌入的签名存储在 body 中。

    • 如果消息使用原始编码,则原始数据将作为单个参数传递给函数。

    • Java/C 等 可以使用 Thrift/protobuf 文档作为主体


  • origin 是发送任务的节点名称。

  • 基于 taskmeth 标头分派给演员

    meth Python 未使用,但将来可能会用于指定类+方法对。

  • Chain 获得了一个专门的领域。

    当超过递归限制时,将链减少为递归 callbacks 参数会导致问题。

    这在新的消息协议中通过指定签名列表来修复,然后每个任务将在发送下一条消息时从列表中弹出一个任务:

    execute_task(message)
    chain = embed['chain']
    if chain:
        sig = maybe_signature(chain.pop())
        sig.apply_async(chain=chain)
  • correlation_id 替换 task_id 字段。

  • root_idparent_id 字段有助于跟踪工作流程。

  • shadow 允许您为日志指定不同的名称,监视器可用于概念,例如调用指定为参数的函数的任务:

    from celery.utils.imports import qualname
    
    class PickleTask(Task):
    
        def unpack_args(self, fun, args=()):
            return fun, args
    
        def apply_async(self, args, kwargs, **options):
            fun, real_args = self.unpack_args(*args)
            return super().apply_async(
                (fun, real_args, kwargs), shadow=qualname(fun), **options
            )
    
    @app.task(base=PickleTask)
    def call(fun, args, kwargs):
        return fun(*args, **kwargs)


版本 1

在协议的第 1 版中,所有字段都存储在消息正文中:这意味着工作人员和中间消费者必须反序列化有效负载才能读取这些字段。

消息体

  • task
    细绳

    任务名称。 需要

  • id
    细绳

    任务的唯一 ID (UUID)。 需要

  • args
    列表

    参数列表。 如果未提供,将是一个空列表。

  • kwargs
    字典

    关键字参数字典。 如果未提供,将是一个空字典。

  • retries
    整数

    当前重试此任务的次数。 如果未指定,则默认为 0。

  • eta
    字符串 (ISO 8601)

    预计到达时间。 这是 ISO 8601 格式的日期和时间。 如果未提供,则不会安排消息,但会尽快执行。

  • expires
    字符串 (ISO 8601)

    2.0.2 版中的新功能。

    截止日期。 这是 ISO 8601 格式的日期和时间。 如果未提供,则消息将永不过期。 当收到消息并且已超过到期日期时,该消息将过期。

  • taskset
    细绳

    此任务所属的组(如果有)。

  • chord
    签名

    2.3 版中的新功能。

    表示此任务是和弦的标题部分之一。 这个键的值是当头中的所有任务都返回时应该执行的代码的主体。

  • utc
    布尔值

    2.5 版中的新功能。

    如果真实时间使用 UTC 时区,如果不是,则应使用当前本地时区。

  • callbacks
    签名

    3.0 版中的新功能。

    如果任务成功退出,要调用的签名列表。

  • errbacks
    签名

    3.0 版中的新功能。

    如果在执行任务时发生错误,要调用的签名列表。

  • timelimit
    (浮动,浮动)

    3.1 版中的新功能。

    任务执行时限设置。 这是硬和软时间限制值的元组(int/float 或 None 为无限制)。

    指定 3 秒软时间限制和 10 秒硬时间限制的示例值:

    {'timelimit': (3.0, 10.0)}


示例消息

这是 json 格式的 celery.task.ping 任务的示例调用:

{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
 "task": "celery.task.PingTask",
 "args": [],
 "kwargs": {},
 "retries": 0,
 "eta": "2009-11-17T12:30:56.527191"}

任务序列化

使用 content_type 消息头支持多种类型的序列化格式。

默认支持的 MIME 类型如下表所示。

方案 MIME 类型
json 应用程序/json
雅姆 应用程序/x-yaml
泡菜 应用程序/x-python-序列化
消息包 应用程序/x-msgpack


事件消息

事件消息始终是 JSON 序列化的,并且可以包含任意消息正文字段。

从 4.0 版开始。 主体可以由单个映射(一个事件)或映射列表(多个事件)组成。

还有一些标准字段必须始终出现在事件消息中:

标准正文字段

  • 细绳 type

    事件的类型。 这是一个包含 categoryaction 的字符串,由短划线分隔符(例如,task-succeeded)分隔。

  • 细绳 hostname

    事件发生位置的完全限定主机名。

  • 无符号长长 clock

    此事件的逻辑时钟值(Lamport 时间戳)。

  • 漂浮 timestamp

    对应于事件发生时间的 UNIX 时间戳。

  • 签名短 utcoffset

    该字段描述发起主机的时区,并指定为 UTC 之前/之后的小时数(例如,-2 或 +1)。

  • 无符号长长 pid

    事件起源的进程的进程 ID。


标准事件类型

有关标准事件类型及其字段的列表,请参阅 事件参考


示例消息

这是 task-succeeded 事件的消息字段:

properties = {
    'routing_key': 'task.succeeded',
    'exchange': 'celeryev',
    'content_type': 'application/json',
    'content_encoding': 'utf-8',
    'delivery_mode': 1,
}
headers = {
    'hostname': 'worker1@george.vandelay.com',
}
body = {
    'type': 'task-succeeded',
    'hostname': 'worker1@george.vandelay.com',
    'pid': 6335,
    'clock': 393912923921,
    'timestamp': 1401717709.101747,
    'utcoffset': -1,
    'uuid': '9011d855-fdd1-4f8f-adb3-a413b499eafb',
    'retval': '4',
    'runtime': 0.0003212,
)