路由任务 — Python 文档

来自菜鸟教程
Celery/docs/latest/userguide/routing
跳转至:导航、​搜索

路由任务

笔记

主题和扇出等替代路由概念并非适用于所有传输,请参阅 传输比较表


基础知识

自动路由

进行路由的最简单方法是使用 :setting:`task_create_missing_queues` 设置(默认开启)。

启用此设置后,将自动创建尚未在 :setting:`task_queues` 中定义的命名队列。 这使得执行简单的路由任务变得容易。

假设您有两台服务器 x 和 y 处理常规任务,还有一台服务器 z 仅处理与提要相关的任务。 您可以使用此配置:

task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}

启用此路由后,导入提要任务将被路由到 “feeds” 队列,而所有其他任务将被路由到默认队列(由于历史原因命名为 “celery”)。

或者,您可以使用 glob 模式匹配,甚至正则表达式来匹配 feed.tasks 命名空间中的所有任务:

app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}

如果匹配模式的顺序很重要,您应该以 items 格式指定路由器:

task_routes = ([
    ('feed.tasks.*', {'queue': 'feeds'}),
    ('web.tasks.*', {'queue': 'web'}),
    (re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)

笔记

:setting:`task_routes` 设置可以是字典,也可以是路由器对象列表,因此在这种情况下,我们需要将设置指定为包含列表的元组。


安装路由器后,您可以启动服务器 z 只处理提要队列,如下所示:

user@z:/$ celery -A proj worker -Q feeds

您可以根据需要指定任意数量的队列,因此您也可以使此服务器处理默认队列:

user@z:/$ celery -A proj worker -Q feeds,celery

更改默认队列的名称

您可以使用以下配置更改默认队列的名称:

app.conf.task_default_queue = 'default'

队列是如何定义的

此功能的重点是为只有基本需求的用户隐藏复杂的 AMQP 协议。 但是,您可能仍然对这些队列的声明方式感兴趣。

将使用以下设置创建名为 “video” 的队列:

{'exchange': 'video',
 'exchange_type': 'direct',
 'routing_key': 'video'}

非 AMQP 后端,如 Redis 或 SQS 不支持交换,因此它们要求交换与队列具有相同的名称。 使用这种设计确保它也适用于他们。


手动路由

假设您有两台服务器 x 和 y 处理常规任务,以及一台服务器 z 仅处理与提要相关的任务,您可以使用此配置:

from kombu import Queue

app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('default',    routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_exchange_type = 'topic'
app.conf.task_default_routing_key = 'task.default'

:setting:`task_queues`Queue 实例的列表。 如果您没有为密钥设置交换或交换类型值,这些值将从 :setting:`task_default_exchange`:setting:`task_default_exchange_type` 设置中获取。

要将任务路由到 feed_tasks 队列,您可以在 :setting:`task_routes` 设置中添加一个条目:

task_routes = {
        'feeds.tasks.import_feed': {
            'queue': 'feed_tasks',
            'routing_key': 'feed.import',
        },
}

您还可以使用 Task.apply_async()send_task() 的 routing_key 参数覆盖它:

>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
...                         queue='feed_tasks',
...                         routing_key='feed.import')

要使服务器 z 从提要队列中独占使用,您可以使用 celery worker -Q 选项启动它:

user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h

服务器 x 和 y 必须配置为从默认队列消费:

user@x:/$ celery -A proj worker -Q default --hostname=x@%h
user@y:/$ celery -A proj worker -Q default --hostname=y@%h

如果你愿意,你甚至可以让你的饲料处理工人处理常规任务,也许在有很多工作要做的时候:

user@z:/$ celery -A proj worker -Q feed_tasks,default --hostname=z@%h

如果您有另一个队列,但要添加到另一个交换机上,只需指定自定义交换机和交换机类型:

from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('feed_tasks',    routing_key='feed.#'),
    Queue('regular_tasks', routing_key='task.#'),
    Queue('image_tasks',   exchange=Exchange('mediatasks', type='direct'),
                           routing_key='image.compress'),
)

如果您对这些术语感到困惑,您应该阅读 AMQP。

也可以看看

除了下面的 Redis 消息优先级 ,还有 Rabbits 和 Warrens,这是一篇描述队列和交换的优秀博客文章。 还有 CloudAMQP 教程 ,对于 RabbitMQ 用户,RabbitMQ FAQ 可以作为有用的信息来源。


特殊路由选项

RabbitMQ 消息优先级

支持的传输
兔MQ

4.0 版中的新功能。


通过设置 x-max-priority 参数,可以将队列配置为支持优先级:

from kombu import Exchange, Queue

app.conf.task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks',
          queue_arguments={'x-max-priority': 10}),
]

可以使用 :setting:`task_queue_max_priority` 设置来设置所有队列的默认值:

app.conf.task_queue_max_priority = 10

也可以使用 :setting:`task_default_priority` 设置指定所有任务的默认优先级:

app.conf.task_default_priority = 5

Redis 消息优先级

支持的传输
Redis

虽然 Celery Redis 传输确实尊重优先级字段,但 Redis 本身没有优先级的概念。 请在尝试使用 Redis 实现优先级之前阅读此说明,因为您可能会遇到一些意外行为。

要根据优先级开始调度任务,您需要配置 queue_order_strategy 传输选项。

app.conf.broker_transport_options = {
    'queue_order_strategy': 'priority',
}

通过为每个队列创建 n 个列表来实现优先级支持。 这意味着即使有 10 (0-9) 个优先级,默认情况下它们也被合并为 4 个级别以节省资源。 这意味着一个名为 celery 的队列实际上会被分成 4 个队列。

最高优先级的队列将命名为 celery,其他队列将有一个分隔符(默认为 x06x16),并将它们的优先级编号附加到队列名称。

['celery', 'celery\x06\x163', 'celery\x06\x166', 'celery\x06\x169']

如果您想要更多优先级或不同的分隔符,您可以设置 priority_steps 和 sep 传输选项:

app.conf.broker_transport_options = {
    'priority_steps': list(range(10)),
    'sep': ':',
    'queue_order_strategy': 'priority',
}

上面的配置会给你这些队列名称:

['celery', 'celery:1', 'celery:2', 'celery:3', 'celery:4', 'celery:5', 'celery:6', 'celery:7', 'celery:8', 'celery:9']

也就是说,请注意,这永远不会像在服务器级别实施的优先级那样好,最多可能是近似的。 但它对于您的应用程序来说可能仍然足够好。


AMQP 入门

留言

消息由标题和正文组成。 Celery 使用标头来存储消息的内容类型及其内容编码。 内容类型通常是用于序列化消息的序列化格式。 正文包含要执行的任务的名称、任务 ID (UUID)、应用它的参数以及一些额外的元数据——比如重试次数或 ETA。

这是一个表示为 Python 字典的示例任务消息:

{'task': 'myapp.tasks.add',
 'id': '54086c5e-6193-4575-8308-dbab76798756',
 'args': [4, 4],
 'kwargs': {}}

生产者、消费者和经纪人

发送消息的客户端通常称为发布者,或生产者,而接收消息的实体称为消费者

broker 是消息服务器,将消息从生产者路由到消费者。

您可能会在 AMQP 相关材料中看到大量使用这些术语。


交换、队列和路由键

  1. 消息被发送到交易所。
  2. 交换机将消息路由到一个或多个队列。 存在多种交换类型,提供不同的路由方式或实现不同的消息传递场景。
  3. 消息在队列中等待,直到有人消费它。
  4. 消息被确认后从队列中删除。

发送和接收消息所需的步骤是:

  1. 创建交换
  2. 创建队列
  3. 将队列绑定到交换。

Celery 自动为 :setting:`task_queues` 中的队列创建工作所需的实体(除非队列的 auto_declare 设置设置为 False)。

这是具有三个队列的示例队列配置; 一个用于视频,一个用于图像,一个用于其他所有内容的默认队列:

from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('videos',  Exchange('media'),   routing_key='media.video'),
    Queue('images',  Exchange('media'),   routing_key='media.image'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange_type = 'direct'
app.conf.task_default_routing_key = 'default'

交易所类型

交换类型定义了消息如何通过交换进行路由。 标准中定义的交换类型有direct、topic、fanout和headers。 非标准交换类型也可用作 RabbitMQ 的插件,例如 Michael Bridgen 的 last-value-cache 插件

直接交流

直接交换通过精确的路由键匹配,因此由路由键 video 绑定的队列仅接收具有该路由键的消息。


话题交流

主题交换使用点分隔的单词和通配符匹配路由键:*(匹配单个单词)和 #(匹配零个或多个单词)。

使用 usa.newsusa.weathernorway.newsnorway.weather 等路由键,绑定可以是 *.news(所有新闻)、usa.#(美国的所有项目)或 usa.weather(美国的所有天气项目)。


亲身体验 API

Celery 附带了一个名为 celery amqp 的工具,用于通过命令行访问 AMQP API,允许访问管理任务,例如创建/删除队列和交换、清除队列或发送消息。 它也可以用于非 AMQP 代理,但不同的实现可能无法实现所有命令。

您可以直接在 celery amqp 的参数中编写命令,或者直接不带参数在 shell 模式下启动它:

$ celery -A proj amqp
-> connecting to amqp://guest@localhost:5672/.
-> connected.
1>

这里 1> 是提示。 数字 1 是到目前为止您已执行的命令数。 键入 help 以获取可用命令列表。 它还支持自动完成,因此您可以开始输入命令,然后点击 tab 键以显示可能匹配的列表。

让我们创建一个队列,您可以将消息发送到:

$ celery -A proj amqp
1> exchange.declare testexchange direct
ok.
2> queue.declare testqueue
ok. queue:testqueue messages:0 consumers:0.
3> queue.bind testqueue testexchange testkey
ok.

这创建了直接交换 testexchange 和一个名为 testqueue 的队列。 队列使用路由密钥 testkey 绑定到交换机。

从现在开始,所有发送到交换机 testexchange 的带有路由密钥 testkey 的消息都将被移动到这个队列中。 您可以使用 basic.publish 命令发送消息:

4> basic.publish 'This is a message!' testexchange testkey
ok.

现在消息已发送,您可以再次检索它。 您可以在此处使用 basic.get 命令,该命令以同步方式轮询队列中的新消息(这对于维护任务是可以的,但对于您想要使用 basic.consume 的服务)

从队列中弹出一条消息:

5> basic.get testqueue
{'body': 'This is a message!',
 'delivery_info': {'delivery_tag': 1,
                   'exchange': u'testexchange',
                   'message_count': 0,
                   'redelivered': False,
                   'routing_key': u'testkey'},
 'properties': {}}

AMQP 使用确认来表示已成功接收并处理消息。 如果消息没有被确认并且消费者通道关闭,消息将被传递给另一个消费者。

注意上面结构中列出的交货标签; 在一个连接通道内,每条接收到的消息都有一个唯一的传递标签,这个标签用于确认消息。 另请注意,传递标签在连接中不是唯一的,因此在另一个客户端中,传递标签 1 可能指向与此通道中不同的消息。

您可以使用 basic.ack 确认收到的消息:

6> basic.ack 1
ok.

要在我们的测试会话之后进行清理,您应该删除您创建的实体:

7> queue.delete testqueue
ok. 0 messages deleted.
8> exchange.delete testexchange
ok.

路由任务

定义队列

在 Celery 中,可用队列由 :setting:`task_queues` 设置定义。

这是具有三个队列的示例队列配置; 一个用于视频,一个用于图像,一个用于其他所有内容的默认队列:

default_exchange = Exchange('default', type='direct')
media_exchange = Exchange('media', type='direct')

app.conf.task_queues = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image')
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'

这里,:setting:`task_default_queue` 将用于路由没有明确路由的任务。

默认交换、交换类型和路由键将用作任务的默认路由值,并作为 :setting:`task_queues` 中条目的默认值。

还支持对单个队列的多个绑定。 下面是两个绑定到同一队列的路由键的示例:

from kombu import Exchange, Queue, binding

media_exchange = Exchange('media', type='direct')

CELERY_QUEUES = (
    Queue('media', [
        binding(media_exchange, routing_key='media.video'),
        binding(media_exchange, routing_key='media.image'),
    ]),
)

指定任务目的地

任务的目的地由以下(按顺序)决定:

  1. Task.apply_async() 的路由参数。
  2. Task 本身定义的路由相关属性。
  3. :setting:`task_routes` 中定义的 Routers

最好不要对这些设置进行硬编码,而是通过使用 Routers 将其作为配置选项; 这是最灵活的方法,但仍然可以将合理的默认值设置为任务属性。


路由器

路由器是决定任务的路由选项的功能。

定义一个新路由器所需的只是定义一个带有签名 (name, args, kwargs, options, task=None, **kw) 的函数:

def route_task(name, args, kwargs, options, task=None, **kw):
        if name == 'myapp.tasks.compress_video':
            return {'exchange': 'video',
                    'exchange_type': 'topic',
                    'routing_key': 'video.compress'}

如果您返回 queue 键,它将使用 :setting:`task_queues` 中该队列的定义设置展开:

{'queue': 'video', 'routing_key': 'video.compress'}

变成——>

{'queue': 'video',
 'exchange': 'video',
 'exchange_type': 'topic',
 'routing_key': 'video.compress'}

您可以通过将它们添加到 :setting:`task_routes` 设置来安装路由器类:

task_routes = (route_task,)

也可以按名称添加路由器功能:

task_routes = ('myapp.routers.route_task',)

对于简单的任务名称 -> 路由映射(如上面的路由器示例),您可以简单地将 dict 放入 :setting:`task_routes` 以获得相同的行为:

task_routes = {
    'myapp.tasks.compress_video': {
        'queue': 'video',
        'routing_key': 'video.compress',
    },
}

然后将按顺序遍历路由器,它将在第一个返回真值的路由器处停止,并将其用作任务的最终路由。

您还可以按顺序定义多个路由器:

task_routes = [
    route_task,
    {
        'myapp.tasks.compress_video': {
            'queue': 'video',
            'routing_key': 'video.compress',
    },
]

然后依次访问路由器,并选择第一个返回值的路由器。

如果您使用 Redis 或 RabbitMQ,您还可以在路由中指定队列的默认优先级。

task_routes = {
    'myapp.tasks.compress_video': {
        'queue': 'video',
        'routing_key': 'video.compress',
        'priority': 10,
    },
}

同样,在任务上调用 apply_async 将覆盖该默认优先级。

task.apply_async(priority=0)

优先顺序和集群响应

需要注意的是,由于worker prefetching,如果一堆任务同时提交,它们一开始可能会失去优先级顺序。 禁用工作程序预取将防止此问题,但可能会导致小型、快速任务的性能不理想。 在大多数情况下,简单地将 worker_prefetch_multiplier 减少到 1 是一种更简单、更简洁的方法来提高系统的响应能力,而无需完全禁用预取的成本。

请注意,在使用 redis 代理时,优先级值是反向排序的:0 是最高优先级。


广播

Celery 还可以支持广播路由。 这是一个示例交换 broadcast_tasks,它将任务副本传递给与其连接的所有工作人员:

from kombu.common import Broadcast

app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
    'tasks.reload_cache': {
        'queue': 'broadcast_tasks',
        'exchange': 'broadcast_tasks'
    }
}

现在 tasks.reload_cache 任务将被发送到每个从这个队列消费的工作线程。

这是广播路由的另一个示例,这次使用 celery beat 调度:

from kombu.common import Broadcast
from celery.schedules import crontab

app.conf.task_queues = (Broadcast('broadcast_tasks'),)

app.conf.beat_schedule = {
    'test-task': {
        'task': 'tasks.reload_cache',
        'schedule': crontab(minute=0, hour='*/3'),
        'options': {'exchange': 'broadcast_tasks'}
    },
}

广播和结果

请注意,如果两个任务具有相同的 task_id,Celery 结果并没有定义会发生什么。 如果将同一个任务分配给多个工人,则可能无法保留状态历史记录。

在这种情况下设置 task.ignore_result 属性是个好主意。