扩展和引导步骤 — Python 文档
扩展和引导步骤
自定义消息消费者
您可能希望嵌入自定义 Kombu 使用者以手动处理您的消息。
为此,存在一个特殊的 ConsumerStep
bootstep 类,您只需定义 get_consumers
方法,该方法必须返回 kombu.Consumer
对象列表,以便在建立连接时启动:
笔记
Kombu 消费者可以使用两种不同的消息回调调度机制。 第一个是 callbacks
参数,它接受带有 (body, message)
签名的回调列表,第二个是 on_message
参数,它接受一个带有 (message,)
签名。 后者不会自动解码和反序列化有效负载。
蓝图
Bootsteps 是一种为工作程序添加功能的技术。 bootstep 是一个自定义类,它定义钩子以在工作程序的不同阶段执行自定义操作。 每个 bootstep 都属于一个蓝图,worker 当前定义了两个蓝图:Worker 和 Consumer
- 图 A: Worker 和 Consumer 蓝图中的引导步骤。 开始
- 自下而上,Worker 蓝图中的第一步是 Timer,最后一步是启动 Consumer 蓝图,然后建立代理连接并开始消费消息。
工人
Worker 是第一个启动的蓝图,它启动主要组件,如事件循环、处理池以及用于 ETA 任务和其他定时事件的计时器。
当工作器完全启动时,它继续使用消费者蓝图,该蓝图设置如何执行任务、连接到代理并启动消息消费者。
WorkController
是核心 worker 实现,包含几个可以在引导步骤中使用的方法和属性。
属性
- app
- 当前应用实例。
- hostname
- 工作节点名称(例如,worker1@example.com)
- blueprint
- 这是工人
Blueprint
。
- hub
事件循环对象 (
Hub
)。 您可以使用它在事件循环中注册回调。这仅受启用异步 I/O 的传输(amqp、redis)支持,在这种情况下,应设置 worker.use_eventloop 属性。
您的工作程序引导步骤必须要求集线器引导步骤才能使用它:
- pool
当前进程/eventlet/gevent/线程池。 参见
celery.concurrency.base.BasePool
。您的工作程序引导步骤必须需要池引导步骤才能使用它:
- timer
Timer
用于调度函数。您的工作程序引导步骤必须需要计时器引导步骤才能使用它:
- statedb
Database <celery.worker.state.Persistent>`
在 worker 重启之间保持状态。这仅在启用
statedb
参数时定义。您的工作程序引导步骤必须需要
Statedb
引导步骤才能使用它:
- autoscaler
Autoscaler
用于自动增加和收缩池中的进程数。这仅在启用
autoscale
参数时定义。您的工作程序引导步骤必须需要 Autoscaler 引导步骤才能使用:
- autoreloader
Autoreloader
用于在文件系统更改时自动重新加载使用代码。这仅在启用
autoreload
参数时定义。 您的工作程序引导步骤必须需要 Autoreloader 引导步骤才能使用它;
示例工作程序引导步骤
Worker bootstep 示例可能是:
每个方法都将当前的 WorkController
实例作为第一个参数传递。
另一个例子可以使用定时器定期唤醒:
自定义任务处理日志
Celery 工作器向 Python 日志子系统发送消息,用于任务整个生命周期中的各种事件。 这些消息可以通过覆盖 celery/app/trace.py
中定义的 LOG_<TYPE>
格式字符串来定制。 例如:
各种格式字符串都提供了用于 %
格式的任务名称和 ID,其中一些接收额外的字段,如返回值或导致任务失败的异常。 这些字段可用于自定义格式字符串,如下所示:
消费者
消费者蓝图与代理建立连接,并在每次连接丢失时重新启动。 消费者引导步骤包括工作人员心跳、远程控制命令消费者,以及重要的任务消费者。
创建消费者引导步骤时,您必须考虑必须可以重新启动蓝图。 为消费者引导步骤定义了一个额外的“关闭”方法,当工作程序关闭时调用此方法。
属性
- app
- 当前应用实例。
- controller
- 创建此使用者的父对象
@WorkController
。
- hostname
- 工作节点名称(例如,worker1@example.com)
- blueprint
- 这是工人
Blueprint
。
- hub
事件循环对象 (
Hub
)。 您可以使用它在事件循环中注册回调。这仅受启用异步 I/O 的传输(amqp、redis)支持,在这种情况下,应设置 worker.use_eventloop 属性。
您的工作程序引导步骤必须要求集线器引导步骤才能使用它:
- connection
当前代理连接 (
kombu.Connection
)。消费者引导步骤必须需要“连接”引导步骤才能使用它:
- event_dispatcher
可用于发送事件的
@events.Dispatcher
对象。消费者引导步骤必须需要 Events 引导步骤才能使用它。
- gossip
工人到工人广播通信 (
Gossip
)。消费者引导步骤必须需要 Gossip 引导步骤才能使用它。
回调
<set> gossip.on.node_join
每当新节点加入集群时调用,提供
Worker
实例。<set> gossip.on.node_leave
每当新节点离开集群(关闭)时调用,提供
Worker
实例。<set> gossip.on.node_lost
每当集群中的工作实例错过心跳(心跳未及时接收或处理)时调用,提供
Worker
实例。这并不一定意味着工作人员实际上处于离线状态,因此如果默认心跳超时不足,请使用超时机制。
- pool
- 当前进程/eventlet/gevent/线程池。 参见
celery.concurrency.base.BasePool
。
- timer
Timer <celery.utils.timer2.Schedule
用于调度函数。
- heart
负责发送worker事件心跳(
Heart
)。您的消费者引导步骤必须需要 Heart 引导步骤才能使用它:
- task_consumer
用于消费任务消息的
kombu.Consumer
对象。您的消费者引导步骤必须需要 Tasks 引导步骤才能使用它:
- strategies
每个注册的任务类型在此映射中都有一个条目,其中的值用于执行此任务类型的传入消息(任务执行策略)。 这个映射是在消费者启动时由任务引导步骤生成的:
您的消费者引导步骤必须需要 Tasks 引导步骤才能使用它:
- task_buckets
A
defaultdict
用于按类型查找任务的速率限制。 此字典中的条目可能是 None (无限制)或实现consume(tokens)
和expected_time(tokens)
的TokenBucket
实例。TokenBucket 实现了令牌桶算法,但只要符合相同的接口并定义了上述两种方法,任何算法都可以使用。
- qos
QoS
对象可用于更改任务通道当前的 prefetch_count 值:
方法
- consumer.reset_rate_limits()
- 更新所有已注册任务类型的
task_buckets
映射。
- consumer.bucket_for_task(type, Bucket=TokenBucket)
- 使用
task.rate_limit
属性为任务创建速率限制桶。
- consumer.add_task_queue(name, exchange=None, exchange_type=None,
routing_key=None, \*\*options):
- 添加要消费的新队列。 这将在连接重新启动时持续存在。
- consumer.cancel_task_queue(name)
- 停止按名称从队列中消费。 这将在连接重新启动时持续存在。
- apply_eta_task(request)
- 根据
request.eta
属性安排 ETA 任务执行。 (Request
)
安装引导步骤
可以修改 app.steps['worker']
和 app.steps['consumer']
以添加新的引导步骤:
步骤的顺序在这里并不重要,因为顺序由生成的依赖图 (Step.requires
) 决定。
为了说明如何安装引导步骤以及它们如何工作,这是一个打印一些无用调试信息的示例步骤。 它可以作为工作程序和消费者引导步骤添加:
安装此步骤后启动工作程序将为我们提供以下日志:
print
语句将在 worker 初始化后重定向到日志记录子系统,因此“正在启动”行带有时间戳。 您可能会注意到这在关闭时不再发生,这是因为 stop
和 shutdown
方法是在 信号处理程序 中调用的,并且使用日志记录是不安全的在这样的处理程序中。 使用 Python 日志记录模块进行日志记录不是 可重入 :这意味着您不能中断该函数然后稍后再次调用它。 重要的是,您编写的 stop
和 shutdown
方法也是 可重入 。
使用 --loglevel=debug
启动 worker 将向我们显示有关启动过程的更多信息:
命令行程序
添加新的命令行选项
命令特定的选项
您可以通过修改应用程序实例的 @user_options
属性,向 worker
、beat
和 events
命令添加额外的命令行选项。
Celery 命令使用 click
模块来解析命令行参数,因此要添加自定义参数,您需要将 click.Option
实例添加到相关集合中。
向 celery worker 命令添加自定义选项的示例:
现在,所有引导步骤都会将此参数作为 Bootstep.__init__
的关键字参数接收:
预加载选项
celery 伞形命令支持“预加载选项”的概念。 这些是传递给所有子命令的特殊选项。
您可以添加新的预加载选项,例如指定配置模板:
添加新的 celery 子命令
可以使用 setuptools 入口点 将新命令添加到 celery 伞形命令。
入口点是特殊的元数据,可以添加到您的包 setup.py
程序中,然后在安装后使用 pkg_resources
模块从系统中读取。
Celery 识别 celery.commands
入口点以安装其他子命令,入口点的值必须指向有效的单击命令。
这是 :pypi:`Flower` 监控扩展如何添加 celery flower 命令,通过在 setup.py
中添加一个入口点:
命令定义由等号分隔的两部分组成,其中第一部分是子命令(花)的名称,然后第二部分是实现该命令的函数的完全限定符号路径:
模块路径和属性的名称应该像上面一样用冒号分隔。
在flower/command.py
模块中,可以定义命令函数如下:
工人 API
Hub - 工人异步事件循环
- 支持的传输
- amqp,redis
3.0 版中的新功能。
当使用 amqp 或 redis 代理传输时,worker 使用异步 I/O。 最终目标是让所有传输都使用事件循环,但这需要一些时间,因此其他传输仍然使用基于线程的解决方案。
- hub.add(fd, callback, flags)
- hub.add_reader(fd, callback, \*args)
添加当
fd
可读时调用的回调。回调将保持注册状态,直到使用 hub.remove(fd) 显式删除,或者文件描述符因为不再有效而被自动丢弃。
请注意,一次只能为任何给定的文件描述符注册一个回调,因此第二次调用
add
将删除以前为该文件描述符注册的任何回调。文件描述符是任何支持
fileno
方法的类文件对象,也可以是文件描述符编号 (int)。
- hub.add_writer(fd, callback, \*args)
- 添加当
fd
可写时调用的回调。 另请参阅上面 hub.add_reader() 的注释。
- hub.remove(fd)
- 从循环中删除文件描述符
fd
的所有回调。
计时器 - 安排事件
- timer.call_after(secs, callback, args=(), kwargs=(),
priority=0)
- timer.call_repeatedly(secs, callback, args=(), kwargs=(),
priority=0)
- timer.call_at(eta, callback, args=(), kwargs=(),
priority=0)