消息协议 — Python 文档
消息协议
任务消息
版本 2
定义
properties = {
'correlation_id': uuid task_id,
'content_type': string mimetype,
'content_encoding': string encoding,
# optional
'reply_to': string queue_or_url,
}
headers = {
'lang': string 'py'
'task': string task,
'id': uuid task_id,
'root_id': uuid root_id,
'parent_id': uuid parent_id,
'group': uuid group_id,
# optional
'meth': string method_name,
'shadow': string alias_name,
'eta': iso8601 ETA,
'expires': iso8601 expires,
'retries': int retries,
'timelimit': (soft, hard),
'argsrepr': str repr(args),
'kwargsrepr': str repr(kwargs),
'origin': str nodename,
'replaced_task_nesting': int
}
body = (
object[] args,
Mapping kwargs,
Mapping embed {
'callbacks': Signature[] callbacks,
'errbacks': Signature[] errbacks,
'chain': Signature[] chain,
'chord': Signature chord_callback,
}
)
示例
此示例使用协议的版本 2 发送任务消息:
# chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8
import json
import os
import socket
task_id = uuid()
args = (2, 2)
kwargs = {}
basic_publish(
message=json.dumps((args, kwargs, None)),
application_headers={
'lang': 'py',
'task': 'proj.tasks.add',
'argsrepr': repr(args),
'kwargsrepr': repr(kwargs),
'origin': '@'.join([os.getpid(), socket.gethostname()])
}
properties={
'correlation_id': task_id,
'content_type': 'application/json',
'content_encoding': 'utf-8',
}
)
版本 1 的变化
通过
task
消息标头的存在检测到的协议版本。通过
lang
标头支持多种语言。工作人员可以将消息重定向到支持该语言的工作人员。
元数据移动到标题。
这意味着工作人员/中间人可以检查消息并根据标头做出决策,而无需解码有效负载(可能是特定于语言的,例如由 Python 特定的 pickle 序列化程序序列化)。
始终为 UTC
不再有
utc
标志,因此任何缺少时区的时间信息都将采用 UTC 时间。正文仅用于语言特定数据。
Python 将 args/kwargs 和嵌入的签名存储在 body 中。
如果消息使用原始编码,则原始数据将作为单个参数传递给函数。
Java/C 等 可以使用 Thrift/protobuf 文档作为主体
origin
是发送任务的节点名称。基于
task
、meth
标头分派给演员meth
Python 未使用,但将来可能会用于指定类+方法对。Chain 获得了一个专门的领域。
当超过递归限制时,将链减少为递归
callbacks
参数会导致问题。这在新的消息协议中通过指定签名列表来修复,然后每个任务将在发送下一条消息时从列表中弹出一个任务:
execute_task(message) chain = embed['chain'] if chain: sig = maybe_signature(chain.pop()) sig.apply_async(chain=chain)
correlation_id
替换task_id
字段。root_id
和parent_id
字段有助于跟踪工作流程。shadow
允许您为日志指定不同的名称,监视器可用于概念,例如调用指定为参数的函数的任务:from celery.utils.imports import qualname class PickleTask(Task): def unpack_args(self, fun, args=()): return fun, args def apply_async(self, args, kwargs, **options): fun, real_args = self.unpack_args(*args) return super().apply_async( (fun, real_args, kwargs), shadow=qualname(fun), **options ) @app.task(base=PickleTask) def call(fun, args, kwargs): return fun(*args, **kwargs)
版本 1
在协议的第 1 版中,所有字段都存储在消息正文中:这意味着工作人员和中间消费者必须反序列化有效负载才能读取这些字段。
消息体
task
- 细绳
任务名称。 需要
id
- 细绳
任务的唯一 ID (UUID)。 需要
args
- 列表
参数列表。 如果未提供,将是一个空列表。
kwargs
- 字典
关键字参数字典。 如果未提供,将是一个空字典。
retries
- 整数
当前重试此任务的次数。 如果未指定,则默认为 0。
eta
- 字符串 (ISO 8601)
预计到达时间。 这是 ISO 8601 格式的日期和时间。 如果未提供,则不会安排消息,但会尽快执行。
expires
- 字符串 (ISO 8601)
2.0.2 版中的新功能。
截止日期。 这是 ISO 8601 格式的日期和时间。 如果未提供,则消息将永不过期。 当收到消息并且已超过到期日期时,该消息将过期。
taskset
- 细绳
此任务所属的组(如果有)。
chord
- 签名
2.3 版中的新功能。
表示此任务是和弦的标题部分之一。 这个键的值是当头中的所有任务都返回时应该执行的代码的主体。
utc
- 布尔值
2.5 版中的新功能。
如果真实时间使用 UTC 时区,如果不是,则应使用当前本地时区。
callbacks
- 签名
3.0 版中的新功能。
如果任务成功退出,要调用的签名列表。
errbacks
- 签名
3.0 版中的新功能。
如果在执行任务时发生错误,要调用的签名列表。
timelimit
- (浮动,浮动)
3.1 版中的新功能。
任务执行时限设置。 这是硬和软时间限制值的元组(int/float 或
None
为无限制)。指定 3 秒软时间限制和 10 秒硬时间限制的示例值:
{'timelimit': (3.0, 10.0)}
示例消息
这是 json 格式的 celery.task.ping 任务的示例调用:
{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
"task": "celery.task.PingTask",
"args": [],
"kwargs": {},
"retries": 0,
"eta": "2009-11-17T12:30:56.527191"}
任务序列化
使用 content_type 消息头支持多种类型的序列化格式。
默认支持的 MIME 类型如下表所示。
方案 MIME 类型 json 应用程序/json 雅姆 应用程序/x-yaml 泡菜 应用程序/x-python-序列化 消息包 应用程序/x-msgpack
事件消息
事件消息始终是 JSON 序列化的,并且可以包含任意消息正文字段。
从 4.0 版开始。 主体可以由单个映射(一个事件)或映射列表(多个事件)组成。
还有一些标准字段必须始终出现在事件消息中:
标准正文字段
细绳
type
事件的类型。 这是一个包含 category 和 action 的字符串,由短划线分隔符(例如,
task-succeeded
)分隔。细绳
hostname
事件发生位置的完全限定主机名。
无符号长长
clock
此事件的逻辑时钟值(Lamport 时间戳)。
漂浮
timestamp
对应于事件发生时间的 UNIX 时间戳。
签名短
utcoffset
该字段描述发起主机的时区,并指定为 UTC 之前/之后的小时数(例如,-2 或 +1)。
无符号长长
pid
事件起源的进程的进程 ID。
示例消息
这是 task-succeeded
事件的消息字段:
properties = {
'routing_key': 'task.succeeded',
'exchange': 'celeryev',
'content_type': 'application/json',
'content_encoding': 'utf-8',
'delivery_mode': 1,
}
headers = {
'hostname': 'worker1@george.vandelay.com',
}
body = {
'type': 'task-succeeded',
'hostname': 'worker1@george.vandelay.com',
'pid': 6335,
'clock': 393912923921,
'timestamp': 1401717709.101747,
'utcoffset': -1,
'uuid': '9011d855-fdd1-4f8f-adb3-a413b499eafb',
'retval': '4',
'runtime': 0.0003212,
)