扩展和引导步骤 — Python 文档

来自菜鸟教程
Celery/docs/latest/userguide/extending
跳转至:导航、​搜索

扩展和引导步骤

自定义消息消费者

您可能希望嵌入自定义 Kombu 使用者以手动处理您的消息。

为此,存在一个特殊的 ConsumerStep bootstep 类,您只需定义 get_consumers 方法,该方法必须返回 kombu.Consumer 对象列表,以便在建立连接时启动:

from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue

my_queue = Queue('custom', Exchange('custom'), 'routing_key')

app = Celery(broker='amqp://')


class MyConsumerStep(bootsteps.ConsumerStep):

    def get_consumers(self, channel):
        return [Consumer(channel,
                         queues=[my_queue],
                         callbacks=[self.handle_message],
                         accept=['json'])]

    def handle_message(self, body, message):
        print('Received message: {0!r}'.format(body))
        message.ack()
app.steps['consumer'].add(MyConsumerStep)

def send_me_a_message(who, producer=None):
    with app.producer_or_acquire(producer) as producer:
        producer.publish(
            {'hello': who},
            serializer='json',
            exchange=my_queue.exchange,
            routing_key='routing_key',
            declare=[my_queue],
            retry=True,
        )

if __name__ == '__main__':
    send_me_a_message('world!')

笔记

Kombu 消费者可以使用两种不同的消息回调调度机制。 第一个是 callbacks 参数,它接受带有 (body, message) 签名的回调列表,第二个是 on_message 参数,它接受一个带有 (message,) 签名。 后者不会自动解码和反序列化有效负载。

def get_consumers(self, channel):
    return [Consumer(channel, queues=[my_queue],
                     on_message=self.on_message)]


def on_message(self, message):
    payload = message.decode()
    print(
        'Received message: {0!r} {props!r} rawlen={s}'.format(
        payload, props=message.properties, s=len(message.body),
    ))
    message.ack()

蓝图

Bootsteps 是一种为工作程序添加功能的技术。 bootstep 是一个自定义类,它定义钩子以在工作程序的不同阶段执行自定义操作。 每个 bootstep 都属于一个蓝图,worker 当前定义了两个蓝图:WorkerConsumer



图 A: Worker 和 Consumer 蓝图中的引导步骤。 开始
自下而上,Worker 蓝图中的第一步是 Timer,最后一步是启动 Consumer 蓝图,然后建立代理连接并开始消费消息。

thumb|none




工人

Worker 是第一个启动的蓝图,它启动主要组件,如事件循环、处理池以及用于 ETA 任务和其他定时事件的计时器。

当工作器完全启动时,它继续使用消费者蓝图,该蓝图设置如何执行任务、连接到代理并启动消息消费者。

WorkController 是核心 worker 实现,包含几个可以在引导步骤中使用的方法和属性。

属性

app
当前应用实例。

hostname
工作节点名称(例如,worker1@example.com)

blueprint
这是工人 Blueprint

hub

事件循环对象 (Hub)。 您可以使用它在事件循环中注册回调。

这仅受启用异步 I/O 的传输(amqp、redis)支持,在这种情况下,应设置 worker.use_eventloop 属性。

您的工作程序引导步骤必须要求集线器引导步骤才能使用它:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Hub'}

pool

当前进程/eventlet/gevent/线程池。 参见 celery.concurrency.base.BasePool

您的工作程序引导步骤必须需要池引导步骤才能使用它:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Pool'}

timer

Timer 用于调度函数。

您的工作程序引导步骤必须需要计时器引导步骤才能使用它:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer'}

statedb

Database <celery.worker.state.Persistent>` 在 worker 重启之间保持状态。

这仅在启用 statedb 参数时定义。

您的工作程序引导步骤必须需要 Statedb 引导步骤才能使用它:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Statedb'}

autoscaler

Autoscaler 用于自动增加和收缩池中的进程数。

这仅在启用 autoscale 参数时定义。

您的工作程序引导步骤必须需要 Autoscaler 引导步骤才能使用:

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.autoscaler:Autoscaler',)

autoreloader

Autoreloader 用于在文件系统更改时自动重新加载使用代码。

这仅在启用 autoreload 参数时定义。 您的工作程序引导步骤必须需要 Autoreloader 引导步骤才能使用它;

class WorkerStep(bootsteps.StartStopStep):
    requires = ('celery.worker.autoreloader:Autoreloader',)


示例工作程序引导步骤

Worker bootstep 示例可能是:

from celery import bootsteps

class ExampleWorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Pool'}

    def __init__(self, worker, **kwargs):
        print('Called when the WorkController instance is constructed')
        print('Arguments to WorkController: {0!r}'.format(kwargs))

    def create(self, worker):
        # this method can be used to delegate the action methods
        # to another object that implements ``start`` and ``stop``.
        return self

    def start(self, worker):
        print('Called when the worker is started.')

    def stop(self, worker):
        print('Called when the worker shuts down.')

    def terminate(self, worker):
        print('Called when the worker terminates')

每个方法都将当前的 WorkController 实例作为第一个参数传递。

另一个例子可以使用定时器定期唤醒:

from celery import bootsteps


class DeadlockDetection(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer'}

    def __init__(self, worker, deadlock_timeout=3600):
        self.timeout = deadlock_timeout
        self.requests = []
        self.tref = None

    def start(self, worker):
        # run every 30 seconds.
        self.tref = worker.timer.call_repeatedly(
            30.0, self.detect, (worker,), priority=10,
        )

    def stop(self, worker):
        if self.tref:
            self.tref.cancel()
            self.tref = None

    def detect(self, worker):
        # update active requests
        for req in worker.active_requests:
            if req.time_start and time() - req.time_start > self.timeout:
                raise SystemExit()

自定义任务处理日志

Celery 工作器向 Python 日志子系统发送消息,用于任务整个生命周期中的各种事件。 这些消息可以通过覆盖 celery/app/trace.py 中定义的 LOG_<TYPE> 格式字符串来定制。 例如:

import celery.app.trace

celery.app.trace.LOG_SUCCESS = "This is a custom message"

各种格式字符串都提供了用于 % 格式的任务名称和 ID,其中一些接收额外的字段,如返回值或导致任务失败的异常。 这些字段可用于自定义格式字符串,如下所示:

import celery.app.trace

celery.app.trace.LOG_REJECTED = "%(name)r is cursed and I won't run it: %(exc)s"

消费者

消费者蓝图与代理建立连接,并在每次连接丢失时重新启动。 消费者引导步骤包括工作人员心跳、远程控制命令消费者,以及重要的任务消费者。

创建消费者引导步骤时,您必须考虑必须可以重新启动蓝图。 为消费者引导步骤定义了一个额外的“关闭”方法,当工作程序关闭时调用此方法。

属性

app
当前应用实例。

controller
创建此使用者的父对象 @WorkController

hostname
工作节点名称(例如,worker1@example.com)

blueprint
这是工人 Blueprint

hub

事件循环对象 (Hub)。 您可以使用它在事件循环中注册回调。

这仅受启用异步 I/O 的传输(amqp、redis)支持,在这种情况下,应设置 worker.use_eventloop 属性。

您的工作程序引导步骤必须要求集线器引导步骤才能使用它:

class WorkerStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Hub'}

connection

当前代理连接 (kombu.Connection)。

消费者引导步骤必须需要“连接”引导步骤才能使用它:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.connection:Connection'}

event_dispatcher

可用于发送事件的 @events.Dispatcher 对象。

消费者引导步骤必须需要 Events 引导步骤才能使用它。

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.events:Events'}

gossip

工人到工人广播通信 (Gossip)。

消费者引导步骤必须需要 Gossip 引导步骤才能使用它。

class RatelimitStep(bootsteps.StartStopStep):
    """Rate limit tasks based on the number of workers in the
    cluster."""
    requires = {'celery.worker.consumer.gossip:Gossip'}

    def start(self, c):
        self.c = c
        self.c.gossip.on.node_join.add(self.on_cluster_size_change)
        self.c.gossip.on.node_leave.add(self.on_cluster_size_change)
        self.c.gossip.on.node_lost.add(self.on_node_lost)
        self.tasks = [
            self.app.tasks['proj.tasks.add']
            self.app.tasks['proj.tasks.mul']
        ]
        self.last_size = None

    def on_cluster_size_change(self, worker):
        cluster_size = len(list(self.c.gossip.state.alive_workers()))
        if cluster_size != self.last_size:
            for task in self.tasks:
                task.rate_limit = 1.0 / cluster_size
            self.c.reset_rate_limits()
            self.last_size = cluster_size

    def on_node_lost(self, worker):
        # may have processed heartbeat too late, so wake up soon
        # in order to see if the worker recovered.
        self.c.timer.call_after(10.0, self.on_cluster_size_change)

回调

  • <set> gossip.on.node_join

    每当新节点加入集群时调用,提供 Worker 实例。

  • <set> gossip.on.node_leave

    每当新节点离开集群(关闭)时调用,提供 Worker 实例。

  • <set> gossip.on.node_lost

    每当集群中的工作实例错过心跳(心跳未及时接收或处理)时调用,提供 Worker 实例。

    这并不一定意味着工作人员实际上处于离线状态,因此如果默认心跳超时不足,请使用超时机制。

pool
当前进程/eventlet/gevent/线程池。 参见 celery.concurrency.base.BasePool

timer
Timer <celery.utils.timer2.Schedule 用于调度函数。

heart

负责发送worker事件心跳(Heart)。

您的消费者引导步骤必须需要 Heart 引导步骤才能使用它:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.heart:Heart'}

task_consumer

用于消费任务消息的 kombu.Consumer 对象。

您的消费者引导步骤必须需要 Tasks 引导步骤才能使用它:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.tasks:Tasks'}

strategies

每个注册的任务类型在此映射中都有一个条目,其中的值用于执行此任务类型的传入消息(任务执行策略)。 这个映射是在消费者启动时由任务引导步骤生成的:

for name, task in app.tasks.items():
    strategies[name] = task.start_strategy(app, consumer)
    task.__trace__ = celery.app.trace.build_tracer(
        name, task, loader, hostname
    )

您的消费者引导步骤必须需要 Tasks 引导步骤才能使用它:

class Step(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.tasks:Tasks'}

task_buckets

A defaultdict 用于按类型查找任务的速率限制。 此字典中的条目可能是 None (无限制)或实现 consume(tokens)expected_time(tokens)TokenBucket 实例。

TokenBucket 实现了令牌桶算法,但只要符合相同的接口并定义了上述两种方法,任何算法都可以使用。

qos

QoS 对象可用于更改任务通道当前的 prefetch_count 值:

# increment at next cycle
consumer.qos.increment_eventually(1)
# decrement at next cycle
consumer.qos.decrement_eventually(1)
consumer.qos.set(10)


方法

consumer.reset_rate_limits()
更新所有已注册任务类型的 task_buckets 映射。
consumer.bucket_for_task(type, Bucket=TokenBucket)
使用 task.rate_limit 属性为任务创建速率限制桶。
consumer.add_task_queue(name, exchange=None, exchange_type=None,

routing_key=None, \*\*options):

添加要消费的新队列。 这将在连接重新启动时持续存在。
consumer.cancel_task_queue(name)
停止按名称从队列中消费。 这将在连接重新启动时持续存在。
apply_eta_task(request)
根据 request.eta 属性安排 ETA 任务执行。 (Request)


安装引导步骤

可以修改 app.steps['worker']app.steps['consumer'] 以添加新的引导步骤:

>>> app = Celery()
>>> app.steps['worker'].add(MyWorkerStep)  # < add class, don't instantiate
>>> app.steps['consumer'].add(MyConsumerStep)

>>> app.steps['consumer'].update([StepA, StepB])

>>> app.steps['consumer']
{step:proj.StepB{()}, step:proj.MyConsumerStep{()}, step:proj.StepA{()}

步骤的顺序在这里并不重要,因为顺序由生成的依赖图 (Step.requires) 决定。

为了说明如何安装引导步骤以及它们如何工作,这是一个打印一些无用调试信息的示例步骤。 它可以作为工作程序和消费者引导步骤添加:

from celery import Celery
from celery import bootsteps

class InfoStep(bootsteps.Step):

    def __init__(self, parent, **kwargs):
        # here we can prepare the Worker/Consumer object
        # in any way we want, set attribute defaults, and so on.
        print('{0!r} is in init'.format(parent))

    def start(self, parent):
        # our step is started together with all other Worker/Consumer
        # bootsteps.
        print('{0!r} is starting'.format(parent))

    def stop(self, parent):
        # the Consumer calls stop every time the consumer is
        # restarted (i.e., connection is lost) and also at shutdown.
        # The Worker will call stop at shutdown only.
        print('{0!r} is stopping'.format(parent))

    def shutdown(self, parent):
        # shutdown is called by the Consumer at shutdown, it's not
        # called by Worker.
        print('{0!r} is shutting down'.format(parent))

    app = Celery(broker='amqp://')
    app.steps['worker'].add(InfoStep)
    app.steps['consumer'].add(InfoStep)

安装此步骤后启动工作程序将为我们提供以下日志:

<Worker: w@example.com (initializing)> is in init
<Consumer: w@example.com (initializing)> is in init
[2013-05-29 16:18:20,544: WARNING/MainProcess]
    <Worker: w@example.com (running)> is starting
[2013-05-29 16:18:21,577: WARNING/MainProcess]
    <Consumer: w@example.com (running)> is starting
<Consumer: w@example.com (closing)> is stopping
<Worker: w@example.com (closing)> is stopping
<Consumer: w@example.com (terminating)> is shutting down

print 语句将在 worker 初始化后重定向到日志记录子系统,因此“正在启动”行带有时间戳。 您可能会注意到这在关闭时不再发生,这是因为 stopshutdown 方法是在 信号处理程序 中调用的,并且使用日志记录是不安全的在这样的处理程序中。 使用 Python 日志记录模块进行日志记录不是 可重入 :这意味着您不能中断该函数然后稍后再次调用它。 重要的是,您编写的 stopshutdown 方法也是 可重入

使用 --loglevel=debug 启动 worker 将向我们显示有关启动过程的更多信息:

[2013-05-29 16:18:20,509: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: Building graph...
<celery.apps.worker.Worker object at 0x101ad8410> is in init
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: New boot order:
    {Hub, Pool, Timer, StateDB, Autoscaler, InfoStep, Beat, Consumer}
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Building graph...
<celery.worker.consumer.Consumer object at 0x101c2d8d0> is in init
[2013-05-29 16:18:20,515: DEBUG/MainProcess] | Consumer: New boot order:
    {Connection, Mingle, Events, Gossip, InfoStep, Agent,
     Heart, Control, Tasks, event loop}
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Hub
[2013-05-29 16:18:20,522: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Pool
[2013-05-29 16:18:20,542: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,543: DEBUG/MainProcess] | Worker: Starting InfoStep
[2013-05-29 16:18:20,544: WARNING/MainProcess]
    <celery.apps.worker.Worker object at 0x101ad8410> is starting
[2013-05-29 16:18:20,544: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Worker: Starting Consumer
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Consumer: Starting Connection
[2013-05-29 16:18:20,559: INFO/MainProcess] Connected to amqp://guest@127.0.0.1:5672//
[2013-05-29 16:18:20,560: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,560: DEBUG/MainProcess] | Consumer: Starting Mingle
[2013-05-29 16:18:20,560: INFO/MainProcess] mingle: searching for neighbors
[2013-05-29 16:18:21,570: INFO/MainProcess] mingle: no one here
[2013-05-29 16:18:21,570: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,571: DEBUG/MainProcess] | Consumer: Starting Events
[2013-05-29 16:18:21,572: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,572: DEBUG/MainProcess] | Consumer: Starting Gossip
[2013-05-29 16:18:21,577: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,577: DEBUG/MainProcess] | Consumer: Starting InfoStep
[2013-05-29 16:18:21,577: WARNING/MainProcess]
    <celery.worker.consumer.Consumer object at 0x101c2d8d0> is starting
[2013-05-29 16:18:21,578: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,578: DEBUG/MainProcess] | Consumer: Starting Heart
[2013-05-29 16:18:21,579: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,579: DEBUG/MainProcess] | Consumer: Starting Control
[2013-05-29 16:18:21,583: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,583: DEBUG/MainProcess] | Consumer: Starting Tasks
[2013-05-29 16:18:21,606: DEBUG/MainProcess] basic.qos: prefetch_count->80
[2013-05-29 16:18:21,606: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,606: DEBUG/MainProcess] | Consumer: Starting event loop
[2013-05-29 16:18:21,608: WARNING/MainProcess] celery@example.com ready.

命令行程序

添加新的命令行选项

命令特定的选项

您可以通过修改应用程序实例的 @user_options 属性,向 workerbeatevents 命令添加额外的命令行选项。

Celery 命令使用 click 模块来解析命令行参数,因此要添加自定义参数,您需要将 click.Option 实例添加到相关集合中。

celery worker 命令添加自定义选项的示例:

from celery import Celery
from click import Option

app = Celery(broker='amqp://')

app.user_options['worker'].add(Option(('--enable-my-option',),
                                      is_flag=True,
                                      help='Enable custom option.'))

现在,所有引导步骤都会将此参数作为 Bootstep.__init__ 的关键字参数接收:

from celery import bootsteps

class MyBootstep(bootsteps.Step):

    def __init__(self, parent, enable_my_option=False, **options):
        super().__init__(parent, **options)
        if enable_my_option:
            party()

app.steps['worker'].add(MyBootstep)

预加载选项

celery 伞形命令支持“预加载选项”的概念。 这些是传递给所有子命令的特殊选项。

您可以添加新的预加载选项,例如指定配置模板:

from celery import Celery
from celery import signals
from click import Option

app = Celery()

app.user_options['preload'].add(Option(('-Z', '--template'),
                                       default='default',
                                       help='Configuration template to use.'))

@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
    use_template(options['template'])

添加新的 celery 子命令

可以使用 setuptools 入口点 将新命令添加到 celery 伞形命令。

入口点是特殊的元数据,可以添加到您的包 setup.py 程序中,然后在安装后使用 pkg_resources 模块从系统中读取。

Celery 识别 celery.commands 入口点以安装其他子命令,入口点的值必须指向有效的单击命令。

这是 :pypi:`Flower` 监控扩展如何添加 celery flower 命令,通过在 setup.py 中添加一个入口点:

setup(
    name='flower',
    entry_points={
        'celery.commands': [
           'flower = flower.command:flower',
        ],
    }
)

命令定义由等号分隔的两部分组成,其中第一部分是子命令(花)的名称,然后第二部分是实现该命令的函数的完全限定符号路径:

flower.command:flower

模块路径和属性的名称应该像上面一样用冒号分隔。

flower/command.py模块中,可以定义命令函数如下:

import click

@click.command()
@click.option('--port', default=8888, type=int, help='Webserver port')
@click.option('--debug', is_flag=True)
def flower(port, debug):
    print('Running our command')

工人 API

Hub - 工人异步事件循环

支持的传输
amqp,redis

3.0 版中的新功能。


当使用 amqp 或 redis 代理传输时,worker 使用异步 I/O。 最终目标是让所有传输都使用事件循环,但这需要一些时间,因此其他传输仍然使用基于线程的解决方案。

hub.add(fd, callback, flags)
hub.add_reader(fd, callback, \*args)

添加当 fd 可读时调用的回调。

回调将保持注册状态,直到使用 hub.remove(fd) 显式删除,或者文件描述符因为不再有效而被自动丢弃。

请注意,一次只能为任何给定的文件描述符注册一个回调,因此第二次调用 add 将删除以前为该文件描述符注册的任何回调。

文件描述符是任何支持 fileno 方法的类文件对象,也可以是文件描述符编号 (int)。

hub.add_writer(fd, callback, \*args)
添加当 fd 可写时调用的回调。 另请参阅上面 hub.add_reader() 的注释。
hub.remove(fd)
从循环中删除文件描述符 fd 的所有回调。


计时器 - 安排事件

timer.call_after(secs, callback, args=(), kwargs=(),

priority=0)

timer.call_repeatedly(secs, callback, args=(), kwargs=(),

priority=0)

timer.call_at(eta, callback, args=(), kwargs=(),

priority=0)