扩展和引导步骤 — Python 文档
扩展和引导步骤
自定义消息消费者
您可能希望嵌入自定义 Kombu 使用者以手动处理您的消息。
为此,存在一个特殊的 ConsumerStep
bootstep 类,您只需定义 get_consumers
方法,该方法必须返回 kombu.Consumer
对象列表,以便在建立连接时启动:
from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue
my_queue = Queue('custom', Exchange('custom'), 'routing_key')
app = Celery(broker='amqp://')
class MyConsumerStep(bootsteps.ConsumerStep):
def get_consumers(self, channel):
return [Consumer(channel,
queues=[my_queue],
callbacks=[self.handle_message],
accept=['json'])]
def handle_message(self, body, message):
print('Received message: {0!r}'.format(body))
message.ack()
app.steps['consumer'].add(MyConsumerStep)
def send_me_a_message(who, producer=None):
with app.producer_or_acquire(producer) as producer:
producer.publish(
{'hello': who},
serializer='json',
exchange=my_queue.exchange,
routing_key='routing_key',
declare=[my_queue],
retry=True,
)
if __name__ == '__main__':
send_me_a_message('world!')
笔记
Kombu 消费者可以使用两种不同的消息回调调度机制。 第一个是 callbacks
参数,它接受带有 (body, message)
签名的回调列表,第二个是 on_message
参数,它接受一个带有 (message,)
签名。 后者不会自动解码和反序列化有效负载。
def get_consumers(self, channel):
return [Consumer(channel, queues=[my_queue],
on_message=self.on_message)]
def on_message(self, message):
payload = message.decode()
print(
'Received message: {0!r} {props!r} rawlen={s}'.format(
payload, props=message.properties, s=len(message.body),
))
message.ack()
蓝图
Bootsteps 是一种为工作程序添加功能的技术。 bootstep 是一个自定义类,它定义钩子以在工作程序的不同阶段执行自定义操作。 每个 bootstep 都属于一个蓝图,worker 当前定义了两个蓝图:Worker 和 Consumer
- 图 A: Worker 和 Consumer 蓝图中的引导步骤。 开始
- 自下而上,Worker 蓝图中的第一步是 Timer,最后一步是启动 Consumer 蓝图,然后建立代理连接并开始消费消息。
工人
Worker 是第一个启动的蓝图,它启动主要组件,如事件循环、处理池以及用于 ETA 任务和其他定时事件的计时器。
当工作器完全启动时,它继续使用消费者蓝图,该蓝图设置如何执行任务、连接到代理并启动消息消费者。
WorkController
是核心 worker 实现,包含几个可以在引导步骤中使用的方法和属性。
属性
- app
- 当前应用实例。
- hostname
- 工作节点名称(例如,worker1@example.com)
- blueprint
- 这是工人
Blueprint
。
- hub
事件循环对象 (
Hub
)。 您可以使用它在事件循环中注册回调。这仅受启用异步 I/O 的传输(amqp、redis)支持,在这种情况下,应设置 worker.use_eventloop 属性。
您的工作程序引导步骤必须要求集线器引导步骤才能使用它:
class WorkerStep(bootsteps.StartStopStep): requires = {'celery.worker.components:Hub'}
- pool
当前进程/eventlet/gevent/线程池。 参见
celery.concurrency.base.BasePool
。您的工作程序引导步骤必须需要池引导步骤才能使用它:
class WorkerStep(bootsteps.StartStopStep): requires = {'celery.worker.components:Pool'}
- timer
Timer
用于调度函数。您的工作程序引导步骤必须需要计时器引导步骤才能使用它:
class WorkerStep(bootsteps.StartStopStep): requires = {'celery.worker.components:Timer'}
- statedb
Database <celery.worker.state.Persistent>`
在 worker 重启之间保持状态。这仅在启用
statedb
参数时定义。您的工作程序引导步骤必须需要
Statedb
引导步骤才能使用它:class WorkerStep(bootsteps.StartStopStep): requires = {'celery.worker.components:Statedb'}
- autoscaler
Autoscaler
用于自动增加和收缩池中的进程数。这仅在启用
autoscale
参数时定义。您的工作程序引导步骤必须需要 Autoscaler 引导步骤才能使用:
class WorkerStep(bootsteps.StartStopStep): requires = ('celery.worker.autoscaler:Autoscaler',)
- autoreloader
Autoreloader
用于在文件系统更改时自动重新加载使用代码。这仅在启用
autoreload
参数时定义。 您的工作程序引导步骤必须需要 Autoreloader 引导步骤才能使用它;class WorkerStep(bootsteps.StartStopStep): requires = ('celery.worker.autoreloader:Autoreloader',)
示例工作程序引导步骤
Worker bootstep 示例可能是:
from celery import bootsteps
class ExampleWorkerStep(bootsteps.StartStopStep):
requires = {'celery.worker.components:Pool'}
def __init__(self, worker, **kwargs):
print('Called when the WorkController instance is constructed')
print('Arguments to WorkController: {0!r}'.format(kwargs))
def create(self, worker):
# this method can be used to delegate the action methods
# to another object that implements ``start`` and ``stop``.
return self
def start(self, worker):
print('Called when the worker is started.')
def stop(self, worker):
print('Called when the worker shuts down.')
def terminate(self, worker):
print('Called when the worker terminates')
每个方法都将当前的 WorkController
实例作为第一个参数传递。
另一个例子可以使用定时器定期唤醒:
from celery import bootsteps
class DeadlockDetection(bootsteps.StartStopStep):
requires = {'celery.worker.components:Timer'}
def __init__(self, worker, deadlock_timeout=3600):
self.timeout = deadlock_timeout
self.requests = []
self.tref = None
def start(self, worker):
# run every 30 seconds.
self.tref = worker.timer.call_repeatedly(
30.0, self.detect, (worker,), priority=10,
)
def stop(self, worker):
if self.tref:
self.tref.cancel()
self.tref = None
def detect(self, worker):
# update active requests
for req in worker.active_requests:
if req.time_start and time() - req.time_start > self.timeout:
raise SystemExit()
自定义任务处理日志
Celery 工作器向 Python 日志子系统发送消息,用于任务整个生命周期中的各种事件。 这些消息可以通过覆盖 celery/app/trace.py
中定义的 LOG_<TYPE>
格式字符串来定制。 例如:
import celery.app.trace
celery.app.trace.LOG_SUCCESS = "This is a custom message"
各种格式字符串都提供了用于 %
格式的任务名称和 ID,其中一些接收额外的字段,如返回值或导致任务失败的异常。 这些字段可用于自定义格式字符串,如下所示:
import celery.app.trace
celery.app.trace.LOG_REJECTED = "%(name)r is cursed and I won't run it: %(exc)s"
消费者
消费者蓝图与代理建立连接,并在每次连接丢失时重新启动。 消费者引导步骤包括工作人员心跳、远程控制命令消费者,以及重要的任务消费者。
创建消费者引导步骤时,您必须考虑必须可以重新启动蓝图。 为消费者引导步骤定义了一个额外的“关闭”方法,当工作程序关闭时调用此方法。
属性
- app
- 当前应用实例。
- controller
- 创建此使用者的父对象
@WorkController
。
- hostname
- 工作节点名称(例如,worker1@example.com)
- blueprint
- 这是工人
Blueprint
。
- hub
事件循环对象 (
Hub
)。 您可以使用它在事件循环中注册回调。这仅受启用异步 I/O 的传输(amqp、redis)支持,在这种情况下,应设置 worker.use_eventloop 属性。
您的工作程序引导步骤必须要求集线器引导步骤才能使用它:
class WorkerStep(bootsteps.StartStopStep): requires = {'celery.worker.components:Hub'}
- connection
当前代理连接 (
kombu.Connection
)。消费者引导步骤必须需要“连接”引导步骤才能使用它:
class Step(bootsteps.StartStopStep): requires = {'celery.worker.consumer.connection:Connection'}
- event_dispatcher
可用于发送事件的
@events.Dispatcher
对象。消费者引导步骤必须需要 Events 引导步骤才能使用它。
class Step(bootsteps.StartStopStep): requires = {'celery.worker.consumer.events:Events'}
- gossip
工人到工人广播通信 (
Gossip
)。消费者引导步骤必须需要 Gossip 引导步骤才能使用它。
class RatelimitStep(bootsteps.StartStopStep): """Rate limit tasks based on the number of workers in the cluster.""" requires = {'celery.worker.consumer.gossip:Gossip'} def start(self, c): self.c = c self.c.gossip.on.node_join.add(self.on_cluster_size_change) self.c.gossip.on.node_leave.add(self.on_cluster_size_change) self.c.gossip.on.node_lost.add(self.on_node_lost) self.tasks = [ self.app.tasks['proj.tasks.add'] self.app.tasks['proj.tasks.mul'] ] self.last_size = None def on_cluster_size_change(self, worker): cluster_size = len(list(self.c.gossip.state.alive_workers())) if cluster_size != self.last_size: for task in self.tasks: task.rate_limit = 1.0 / cluster_size self.c.reset_rate_limits() self.last_size = cluster_size def on_node_lost(self, worker): # may have processed heartbeat too late, so wake up soon # in order to see if the worker recovered. self.c.timer.call_after(10.0, self.on_cluster_size_change)
回调
<set> gossip.on.node_join
每当新节点加入集群时调用,提供
Worker
实例。<set> gossip.on.node_leave
每当新节点离开集群(关闭)时调用,提供
Worker
实例。<set> gossip.on.node_lost
每当集群中的工作实例错过心跳(心跳未及时接收或处理)时调用,提供
Worker
实例。这并不一定意味着工作人员实际上处于离线状态,因此如果默认心跳超时不足,请使用超时机制。
- pool
- 当前进程/eventlet/gevent/线程池。 参见
celery.concurrency.base.BasePool
。
- timer
Timer <celery.utils.timer2.Schedule
用于调度函数。
- heart
负责发送worker事件心跳(
Heart
)。您的消费者引导步骤必须需要 Heart 引导步骤才能使用它:
class Step(bootsteps.StartStopStep): requires = {'celery.worker.consumer.heart:Heart'}
- task_consumer
用于消费任务消息的
kombu.Consumer
对象。您的消费者引导步骤必须需要 Tasks 引导步骤才能使用它:
class Step(bootsteps.StartStopStep): requires = {'celery.worker.consumer.tasks:Tasks'}
- strategies
每个注册的任务类型在此映射中都有一个条目,其中的值用于执行此任务类型的传入消息(任务执行策略)。 这个映射是在消费者启动时由任务引导步骤生成的:
for name, task in app.tasks.items(): strategies[name] = task.start_strategy(app, consumer) task.__trace__ = celery.app.trace.build_tracer( name, task, loader, hostname )
您的消费者引导步骤必须需要 Tasks 引导步骤才能使用它:
class Step(bootsteps.StartStopStep): requires = {'celery.worker.consumer.tasks:Tasks'}
- task_buckets
A
defaultdict
用于按类型查找任务的速率限制。 此字典中的条目可能是 None (无限制)或实现consume(tokens)
和expected_time(tokens)
的TokenBucket
实例。TokenBucket 实现了令牌桶算法,但只要符合相同的接口并定义了上述两种方法,任何算法都可以使用。
- qos
QoS
对象可用于更改任务通道当前的 prefetch_count 值:# increment at next cycle consumer.qos.increment_eventually(1) # decrement at next cycle consumer.qos.decrement_eventually(1) consumer.qos.set(10)
方法
- consumer.reset_rate_limits()
- 更新所有已注册任务类型的
task_buckets
映射。
- consumer.bucket_for_task(type, Bucket=TokenBucket)
- 使用
task.rate_limit
属性为任务创建速率限制桶。
- consumer.add_task_queue(name, exchange=None, exchange_type=None,
routing_key=None, \*\*options):
- 添加要消费的新队列。 这将在连接重新启动时持续存在。
- consumer.cancel_task_queue(name)
- 停止按名称从队列中消费。 这将在连接重新启动时持续存在。
- apply_eta_task(request)
- 根据
request.eta
属性安排 ETA 任务执行。 (Request
)
安装引导步骤
可以修改 app.steps['worker']
和 app.steps['consumer']
以添加新的引导步骤:
>>> app = Celery()
>>> app.steps['worker'].add(MyWorkerStep) # < add class, don't instantiate
>>> app.steps['consumer'].add(MyConsumerStep)
>>> app.steps['consumer'].update([StepA, StepB])
>>> app.steps['consumer']
{step:proj.StepB{()}, step:proj.MyConsumerStep{()}, step:proj.StepA{()}
步骤的顺序在这里并不重要,因为顺序由生成的依赖图 (Step.requires
) 决定。
为了说明如何安装引导步骤以及它们如何工作,这是一个打印一些无用调试信息的示例步骤。 它可以作为工作程序和消费者引导步骤添加:
from celery import Celery
from celery import bootsteps
class InfoStep(bootsteps.Step):
def __init__(self, parent, **kwargs):
# here we can prepare the Worker/Consumer object
# in any way we want, set attribute defaults, and so on.
print('{0!r} is in init'.format(parent))
def start(self, parent):
# our step is started together with all other Worker/Consumer
# bootsteps.
print('{0!r} is starting'.format(parent))
def stop(self, parent):
# the Consumer calls stop every time the consumer is
# restarted (i.e., connection is lost) and also at shutdown.
# The Worker will call stop at shutdown only.
print('{0!r} is stopping'.format(parent))
def shutdown(self, parent):
# shutdown is called by the Consumer at shutdown, it's not
# called by Worker.
print('{0!r} is shutting down'.format(parent))
app = Celery(broker='amqp://')
app.steps['worker'].add(InfoStep)
app.steps['consumer'].add(InfoStep)
安装此步骤后启动工作程序将为我们提供以下日志:
<Worker: w@example.com (initializing)> is in init
<Consumer: w@example.com (initializing)> is in init
[2013-05-29 16:18:20,544: WARNING/MainProcess]
<Worker: w@example.com (running)> is starting
[2013-05-29 16:18:21,577: WARNING/MainProcess]
<Consumer: w@example.com (running)> is starting
<Consumer: w@example.com (closing)> is stopping
<Worker: w@example.com (closing)> is stopping
<Consumer: w@example.com (terminating)> is shutting down
print
语句将在 worker 初始化后重定向到日志记录子系统,因此“正在启动”行带有时间戳。 您可能会注意到这在关闭时不再发生,这是因为 stop
和 shutdown
方法是在 信号处理程序 中调用的,并且使用日志记录是不安全的在这样的处理程序中。 使用 Python 日志记录模块进行日志记录不是 可重入 :这意味着您不能中断该函数然后稍后再次调用它。 重要的是,您编写的 stop
和 shutdown
方法也是 可重入 。
使用 --loglevel=debug
启动 worker 将向我们显示有关启动过程的更多信息:
[2013-05-29 16:18:20,509: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: Building graph...
<celery.apps.worker.Worker object at 0x101ad8410> is in init
[2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: New boot order:
{Hub, Pool, Timer, StateDB, Autoscaler, InfoStep, Beat, Consumer}
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Building graph...
<celery.worker.consumer.Consumer object at 0x101c2d8d0> is in init
[2013-05-29 16:18:20,515: DEBUG/MainProcess] | Consumer: New boot order:
{Connection, Mingle, Events, Gossip, InfoStep, Agent,
Heart, Control, Tasks, event loop}
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Hub
[2013-05-29 16:18:20,522: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Pool
[2013-05-29 16:18:20,542: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,543: DEBUG/MainProcess] | Worker: Starting InfoStep
[2013-05-29 16:18:20,544: WARNING/MainProcess]
<celery.apps.worker.Worker object at 0x101ad8410> is starting
[2013-05-29 16:18:20,544: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Worker: Starting Consumer
[2013-05-29 16:18:20,544: DEBUG/MainProcess] | Consumer: Starting Connection
[2013-05-29 16:18:20,559: INFO/MainProcess] Connected to amqp://guest@127.0.0.1:5672//
[2013-05-29 16:18:20,560: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:20,560: DEBUG/MainProcess] | Consumer: Starting Mingle
[2013-05-29 16:18:20,560: INFO/MainProcess] mingle: searching for neighbors
[2013-05-29 16:18:21,570: INFO/MainProcess] mingle: no one here
[2013-05-29 16:18:21,570: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,571: DEBUG/MainProcess] | Consumer: Starting Events
[2013-05-29 16:18:21,572: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,572: DEBUG/MainProcess] | Consumer: Starting Gossip
[2013-05-29 16:18:21,577: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,577: DEBUG/MainProcess] | Consumer: Starting InfoStep
[2013-05-29 16:18:21,577: WARNING/MainProcess]
<celery.worker.consumer.Consumer object at 0x101c2d8d0> is starting
[2013-05-29 16:18:21,578: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,578: DEBUG/MainProcess] | Consumer: Starting Heart
[2013-05-29 16:18:21,579: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,579: DEBUG/MainProcess] | Consumer: Starting Control
[2013-05-29 16:18:21,583: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,583: DEBUG/MainProcess] | Consumer: Starting Tasks
[2013-05-29 16:18:21,606: DEBUG/MainProcess] basic.qos: prefetch_count->80
[2013-05-29 16:18:21,606: DEBUG/MainProcess] ^-- substep ok
[2013-05-29 16:18:21,606: DEBUG/MainProcess] | Consumer: Starting event loop
[2013-05-29 16:18:21,608: WARNING/MainProcess] celery@example.com ready.
命令行程序
添加新的命令行选项
命令特定的选项
您可以通过修改应用程序实例的 @user_options
属性,向 worker
、beat
和 events
命令添加额外的命令行选项。
Celery 命令使用 click
模块来解析命令行参数,因此要添加自定义参数,您需要将 click.Option
实例添加到相关集合中。
向 celery worker 命令添加自定义选项的示例:
from celery import Celery
from click import Option
app = Celery(broker='amqp://')
app.user_options['worker'].add(Option(('--enable-my-option',),
is_flag=True,
help='Enable custom option.'))
现在,所有引导步骤都会将此参数作为 Bootstep.__init__
的关键字参数接收:
from celery import bootsteps
class MyBootstep(bootsteps.Step):
def __init__(self, parent, enable_my_option=False, **options):
super().__init__(parent, **options)
if enable_my_option:
party()
app.steps['worker'].add(MyBootstep)
预加载选项
celery 伞形命令支持“预加载选项”的概念。 这些是传递给所有子命令的特殊选项。
您可以添加新的预加载选项,例如指定配置模板:
from celery import Celery
from celery import signals
from click import Option
app = Celery()
app.user_options['preload'].add(Option(('-Z', '--template'),
default='default',
help='Configuration template to use.'))
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
use_template(options['template'])
添加新的 celery 子命令
可以使用 setuptools 入口点 将新命令添加到 celery 伞形命令。
入口点是特殊的元数据,可以添加到您的包 setup.py
程序中,然后在安装后使用 pkg_resources
模块从系统中读取。
Celery 识别 celery.commands
入口点以安装其他子命令,入口点的值必须指向有效的单击命令。
这是 :pypi:`Flower` 监控扩展如何添加 celery flower 命令,通过在 setup.py
中添加一个入口点:
setup(
name='flower',
entry_points={
'celery.commands': [
'flower = flower.command:flower',
],
}
)
命令定义由等号分隔的两部分组成,其中第一部分是子命令(花)的名称,然后第二部分是实现该命令的函数的完全限定符号路径:
flower.command:flower
模块路径和属性的名称应该像上面一样用冒号分隔。
在flower/command.py
模块中,可以定义命令函数如下:
import click
@click.command()
@click.option('--port', default=8888, type=int, help='Webserver port')
@click.option('--debug', is_flag=True)
def flower(port, debug):
print('Running our command')
工人 API
Hub - 工人异步事件循环
- 支持的传输
- amqp,redis
3.0 版中的新功能。
当使用 amqp 或 redis 代理传输时,worker 使用异步 I/O。 最终目标是让所有传输都使用事件循环,但这需要一些时间,因此其他传输仍然使用基于线程的解决方案。
- hub.add(fd, callback, flags)
- hub.add_reader(fd, callback, \*args)
添加当
fd
可读时调用的回调。回调将保持注册状态,直到使用 hub.remove(fd) 显式删除,或者文件描述符因为不再有效而被自动丢弃。
请注意,一次只能为任何给定的文件描述符注册一个回调,因此第二次调用
add
将删除以前为该文件描述符注册的任何回调。文件描述符是任何支持
fileno
方法的类文件对象,也可以是文件描述符编号 (int)。
- hub.add_writer(fd, callback, \*args)
- 添加当
fd
可写时调用的回调。 另请参阅上面 hub.add_reader() 的注释。
- hub.remove(fd)
- 从循环中删除文件描述符
fd
的所有回调。
计时器 - 安排事件
- timer.call_after(secs, callback, args=(), kwargs=(),
priority=0)
- timer.call_repeatedly(secs, callback, args=(), kwargs=(),
priority=0)
- timer.call_at(eta, callback, args=(), kwargs=(),
priority=0)