路由任务 — Python 文档
路由任务
笔记
主题和扇出等替代路由概念并非适用于所有传输,请参阅 传输比较表 。
基础知识
自动路由
进行路由的最简单方法是使用 :setting:`task_create_missing_queues` 设置(默认开启)。
启用此设置后,将自动创建尚未在 :setting:`task_queues` 中定义的命名队列。 这使得执行简单的路由任务变得容易。
假设您有两台服务器 x 和 y 处理常规任务,还有一台服务器 z 仅处理与提要相关的任务。 您可以使用此配置:
task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}
启用此路由后,导入提要任务将被路由到 “feeds” 队列,而所有其他任务将被路由到默认队列(由于历史原因命名为 “celery”)。
或者,您可以使用 glob 模式匹配,甚至正则表达式来匹配 feed.tasks
命名空间中的所有任务:
app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}
如果匹配模式的顺序很重要,您应该以 items 格式指定路由器:
task_routes = ([
('feed.tasks.*', {'queue': 'feeds'}),
('web.tasks.*', {'queue': 'web'}),
(re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)
安装路由器后,您可以启动服务器 z 只处理提要队列,如下所示:
user@z:/$ celery -A proj worker -Q feeds
您可以根据需要指定任意数量的队列,因此您也可以使此服务器处理默认队列:
user@z:/$ celery -A proj worker -Q feeds,celery
更改默认队列的名称
您可以使用以下配置更改默认队列的名称:
app.conf.task_default_queue = 'default'
队列是如何定义的
此功能的重点是为只有基本需求的用户隐藏复杂的 AMQP 协议。 但是,您可能仍然对这些队列的声明方式感兴趣。
将使用以下设置创建名为 “video” 的队列:
{'exchange': 'video',
'exchange_type': 'direct',
'routing_key': 'video'}
非 AMQP 后端,如 Redis 或 SQS 不支持交换,因此它们要求交换与队列具有相同的名称。 使用这种设计确保它也适用于他们。
手动路由
假设您有两台服务器 x 和 y 处理常规任务,以及一台服务器 z 仅处理与提要相关的任务,您可以使用此配置:
from kombu import Queue
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
Queue('default', routing_key='task.#'),
Queue('feed_tasks', routing_key='feed.#'),
)
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_exchange_type = 'topic'
app.conf.task_default_routing_key = 'task.default'
:setting:`task_queues` 是 Queue
实例的列表。 如果您没有为密钥设置交换或交换类型值,这些值将从 :setting:`task_default_exchange` 和 :setting:`task_default_exchange_type` 设置中获取。
要将任务路由到 feed_tasks 队列,您可以在 :setting:`task_routes` 设置中添加一个条目:
task_routes = {
'feeds.tasks.import_feed': {
'queue': 'feed_tasks',
'routing_key': 'feed.import',
},
}
您还可以使用 Task.apply_async()
或 send_task()
的 routing_key 参数覆盖它:
>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
... queue='feed_tasks',
... routing_key='feed.import')
要使服务器 z 从提要队列中独占使用,您可以使用 celery worker -Q
选项启动它:
user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h
服务器 x 和 y 必须配置为从默认队列消费:
user@x:/$ celery -A proj worker -Q default --hostname=x@%h
user@y:/$ celery -A proj worker -Q default --hostname=y@%h
如果你愿意,你甚至可以让你的饲料处理工人处理常规任务,也许在有很多工作要做的时候:
user@z:/$ celery -A proj worker -Q feed_tasks,default --hostname=z@%h
如果您有另一个队列,但要添加到另一个交换机上,只需指定自定义交换机和交换机类型:
from kombu import Exchange, Queue
app.conf.task_queues = (
Queue('feed_tasks', routing_key='feed.#'),
Queue('regular_tasks', routing_key='task.#'),
Queue('image_tasks', exchange=Exchange('mediatasks', type='direct'),
routing_key='image.compress'),
)
如果您对这些术语感到困惑,您应该阅读 AMQP。
也可以看看
除了下面的 Redis 消息优先级 ,还有 Rabbits 和 Warrens,这是一篇描述队列和交换的优秀博客文章。 还有 CloudAMQP 教程 ,对于 RabbitMQ 用户,RabbitMQ FAQ 可以作为有用的信息来源。
特殊路由选项
RabbitMQ 消息优先级
- 支持的传输
- 兔MQ
4.0 版中的新功能。
通过设置 x-max-priority
参数,可以将队列配置为支持优先级:
from kombu import Exchange, Queue
app.conf.task_queues = [
Queue('tasks', Exchange('tasks'), routing_key='tasks',
queue_arguments={'x-max-priority': 10}),
]
可以使用 :setting:`task_queue_max_priority` 设置来设置所有队列的默认值:
app.conf.task_queue_max_priority = 10
也可以使用 :setting:`task_default_priority` 设置指定所有任务的默认优先级:
app.conf.task_default_priority = 5
Redis 消息优先级
- 支持的传输
- Redis
虽然 Celery Redis 传输确实尊重优先级字段,但 Redis 本身没有优先级的概念。 请在尝试使用 Redis 实现优先级之前阅读此说明,因为您可能会遇到一些意外行为。
要根据优先级开始调度任务,您需要配置 queue_order_strategy 传输选项。
app.conf.broker_transport_options = {
'queue_order_strategy': 'priority',
}
通过为每个队列创建 n 个列表来实现优先级支持。 这意味着即使有 10 (0-9) 个优先级,默认情况下它们也被合并为 4 个级别以节省资源。 这意味着一个名为 celery 的队列实际上会被分成 4 个队列。
最高优先级的队列将命名为 celery,其他队列将有一个分隔符(默认为 x06x16),并将它们的优先级编号附加到队列名称。
['celery', 'celery\x06\x163', 'celery\x06\x166', 'celery\x06\x169']
如果您想要更多优先级或不同的分隔符,您可以设置 priority_steps 和 sep 传输选项:
app.conf.broker_transport_options = {
'priority_steps': list(range(10)),
'sep': ':',
'queue_order_strategy': 'priority',
}
上面的配置会给你这些队列名称:
['celery', 'celery:1', 'celery:2', 'celery:3', 'celery:4', 'celery:5', 'celery:6', 'celery:7', 'celery:8', 'celery:9']
也就是说,请注意,这永远不会像在服务器级别实施的优先级那样好,最多可能是近似的。 但它对于您的应用程序来说可能仍然足够好。
AMQP 入门
留言
消息由标题和正文组成。 Celery 使用标头来存储消息的内容类型及其内容编码。 内容类型通常是用于序列化消息的序列化格式。 正文包含要执行的任务的名称、任务 ID (UUID)、应用它的参数以及一些额外的元数据——比如重试次数或 ETA。
这是一个表示为 Python 字典的示例任务消息:
{'task': 'myapp.tasks.add',
'id': '54086c5e-6193-4575-8308-dbab76798756',
'args': [4, 4],
'kwargs': {}}
生产者、消费者和经纪人
发送消息的客户端通常称为发布者,或生产者,而接收消息的实体称为消费者。
broker 是消息服务器,将消息从生产者路由到消费者。
您可能会在 AMQP 相关材料中看到大量使用这些术语。
交换、队列和路由键
- 消息被发送到交易所。
- 交换机将消息路由到一个或多个队列。 存在多种交换类型,提供不同的路由方式或实现不同的消息传递场景。
- 消息在队列中等待,直到有人消费它。
- 消息被确认后从队列中删除。
发送和接收消息所需的步骤是:
- 创建交换
- 创建队列
- 将队列绑定到交换。
Celery 自动为 :setting:`task_queues` 中的队列创建工作所需的实体(除非队列的 auto_declare 设置设置为 False
)。
这是具有三个队列的示例队列配置; 一个用于视频,一个用于图像,一个用于其他所有内容的默认队列:
from kombu import Exchange, Queue
app.conf.task_queues = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('videos', Exchange('media'), routing_key='media.video'),
Queue('images', Exchange('media'), routing_key='media.image'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange_type = 'direct'
app.conf.task_default_routing_key = 'default'
交易所类型
交换类型定义了消息如何通过交换进行路由。 标准中定义的交换类型有direct、topic、fanout和headers。 非标准交换类型也可用作 RabbitMQ 的插件,例如 Michael Bridgen 的 last-value-cache 插件 。
直接交流
直接交换通过精确的路由键匹配,因此由路由键 video 绑定的队列仅接收具有该路由键的消息。
话题交流
主题交换使用点分隔的单词和通配符匹配路由键:*
(匹配单个单词)和 #
(匹配零个或多个单词)。
使用 usa.news
、usa.weather
、norway.news
和 norway.weather
等路由键,绑定可以是 *.news
(所有新闻)、usa.#
(美国的所有项目)或 usa.weather
(美国的所有天气项目)。
亲身体验 API
Celery 附带了一个名为 celery amqp 的工具,用于通过命令行访问 AMQP API,允许访问管理任务,例如创建/删除队列和交换、清除队列或发送消息。 它也可以用于非 AMQP 代理,但不同的实现可能无法实现所有命令。
您可以直接在 celery amqp 的参数中编写命令,或者直接不带参数在 shell 模式下启动它:
$ celery -A proj amqp
-> connecting to amqp://guest@localhost:5672/.
-> connected.
1>
这里 1>
是提示。 数字 1 是到目前为止您已执行的命令数。 键入 help
以获取可用命令列表。 它还支持自动完成,因此您可以开始输入命令,然后点击 tab 键以显示可能匹配的列表。
让我们创建一个队列,您可以将消息发送到:
$ celery -A proj amqp
1> exchange.declare testexchange direct
ok.
2> queue.declare testqueue
ok. queue:testqueue messages:0 consumers:0.
3> queue.bind testqueue testexchange testkey
ok.
这创建了直接交换 testexchange
和一个名为 testqueue
的队列。 队列使用路由密钥 testkey
绑定到交换机。
从现在开始,所有发送到交换机 testexchange
的带有路由密钥 testkey
的消息都将被移动到这个队列中。 您可以使用 basic.publish
命令发送消息:
4> basic.publish 'This is a message!' testexchange testkey
ok.
现在消息已发送,您可以再次检索它。 您可以在此处使用 basic.get
命令,该命令以同步方式轮询队列中的新消息(这对于维护任务是可以的,但对于您想要使用 basic.consume
的服务)
从队列中弹出一条消息:
5> basic.get testqueue
{'body': 'This is a message!',
'delivery_info': {'delivery_tag': 1,
'exchange': u'testexchange',
'message_count': 0,
'redelivered': False,
'routing_key': u'testkey'},
'properties': {}}
AMQP 使用确认来表示已成功接收并处理消息。 如果消息没有被确认并且消费者通道关闭,消息将被传递给另一个消费者。
注意上面结构中列出的交货标签; 在一个连接通道内,每条接收到的消息都有一个唯一的传递标签,这个标签用于确认消息。 另请注意,传递标签在连接中不是唯一的,因此在另一个客户端中,传递标签 1 可能指向与此通道中不同的消息。
您可以使用 basic.ack
确认收到的消息:
6> basic.ack 1
ok.
要在我们的测试会话之后进行清理,您应该删除您创建的实体:
7> queue.delete testqueue
ok. 0 messages deleted.
8> exchange.delete testexchange
ok.
路由任务
定义队列
在 Celery 中,可用队列由 :setting:`task_queues` 设置定义。
这是具有三个队列的示例队列配置; 一个用于视频,一个用于图像,一个用于其他所有内容的默认队列:
default_exchange = Exchange('default', type='direct')
media_exchange = Exchange('media', type='direct')
app.conf.task_queues = (
Queue('default', default_exchange, routing_key='default'),
Queue('videos', media_exchange, routing_key='media.video'),
Queue('images', media_exchange, routing_key='media.image')
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'
这里,:setting:`task_default_queue` 将用于路由没有明确路由的任务。
默认交换、交换类型和路由键将用作任务的默认路由值,并作为 :setting:`task_queues` 中条目的默认值。
还支持对单个队列的多个绑定。 下面是两个绑定到同一队列的路由键的示例:
from kombu import Exchange, Queue, binding
media_exchange = Exchange('media', type='direct')
CELERY_QUEUES = (
Queue('media', [
binding(media_exchange, routing_key='media.video'),
binding(media_exchange, routing_key='media.image'),
]),
)
指定任务目的地
任务的目的地由以下(按顺序)决定:
Task.apply_async()
的路由参数。Task
本身定义的路由相关属性。- :setting:`task_routes` 中定义的 Routers。
最好不要对这些设置进行硬编码,而是通过使用 Routers 将其作为配置选项; 这是最灵活的方法,但仍然可以将合理的默认值设置为任务属性。
路由器
路由器是决定任务的路由选项的功能。
定义一个新路由器所需的只是定义一个带有签名 (name, args, kwargs, options, task=None, **kw)
的函数:
def route_task(name, args, kwargs, options, task=None, **kw):
if name == 'myapp.tasks.compress_video':
return {'exchange': 'video',
'exchange_type': 'topic',
'routing_key': 'video.compress'}
如果您返回 queue
键,它将使用 :setting:`task_queues` 中该队列的定义设置展开:
{'queue': 'video', 'routing_key': 'video.compress'}
变成——>
{'queue': 'video',
'exchange': 'video',
'exchange_type': 'topic',
'routing_key': 'video.compress'}
您可以通过将它们添加到 :setting:`task_routes` 设置来安装路由器类:
task_routes = (route_task,)
也可以按名称添加路由器功能:
task_routes = ('myapp.routers.route_task',)
对于简单的任务名称 -> 路由映射(如上面的路由器示例),您可以简单地将 dict 放入 :setting:`task_routes` 以获得相同的行为:
task_routes = {
'myapp.tasks.compress_video': {
'queue': 'video',
'routing_key': 'video.compress',
},
}
然后将按顺序遍历路由器,它将在第一个返回真值的路由器处停止,并将其用作任务的最终路由。
您还可以按顺序定义多个路由器:
task_routes = [
route_task,
{
'myapp.tasks.compress_video': {
'queue': 'video',
'routing_key': 'video.compress',
},
]
然后依次访问路由器,并选择第一个返回值的路由器。
如果您使用 Redis 或 RabbitMQ,您还可以在路由中指定队列的默认优先级。
task_routes = {
'myapp.tasks.compress_video': {
'queue': 'video',
'routing_key': 'video.compress',
'priority': 10,
},
}
同样,在任务上调用 apply_async 将覆盖该默认优先级。
task.apply_async(priority=0)
优先顺序和集群响应
需要注意的是,由于worker prefetching,如果一堆任务同时提交,它们一开始可能会失去优先级顺序。 禁用工作程序预取将防止此问题,但可能会导致小型、快速任务的性能不理想。 在大多数情况下,简单地将 worker_prefetch_multiplier 减少到 1 是一种更简单、更简洁的方法来提高系统的响应能力,而无需完全禁用预取的成本。
请注意,在使用 redis 代理时,优先级值是反向排序的:0 是最高优先级。
广播
Celery 还可以支持广播路由。 这是一个示例交换 broadcast_tasks
,它将任务副本传递给与其连接的所有工作人员:
from kombu.common import Broadcast
app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
'tasks.reload_cache': {
'queue': 'broadcast_tasks',
'exchange': 'broadcast_tasks'
}
}
现在 tasks.reload_cache
任务将被发送到每个从这个队列消费的工作线程。
这是广播路由的另一个示例,这次使用 celery beat 调度:
from kombu.common import Broadcast
from celery.schedules import crontab
app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.beat_schedule = {
'test-task': {
'task': 'tasks.reload_cache',
'schedule': crontab(minute=0, hour='*/3'),
'options': {'exchange': 'broadcast_tasks'}
},
}
广播和结果
请注意,如果两个任务具有相同的 task_id,Celery 结果并没有定义会发生什么。 如果将同一个任务分配给多个工人,则可能无法保留状态历史记录。
在这种情况下设置 task.ignore_result
属性是个好主意。