信号 — Python 文档
信号
信号允许解耦的应用程序在应用程序的其他地方发生某些操作时接收通知。
Celery 附带了许多信号,您的应用程序可以将这些信号挂钩以增强某些操作的行为。
基础知识
几种事件触发信号,您可以连接到这些信号以在它们触发时执行操作。
连接到 :signal:`after_task_publish` 信号的示例:
from celery.signals import after_task_publish
@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print('after_task_publish for task id {info[id]}'.format(
info=info,
))
有些信号还有一个可以过滤的发送者。 例如 :signal:`after_task_publish` 信号使用任务名称作为发送者,因此通过向 connect
提供 sender
参数,您可以连接要调用的处理程序每次发布名为 “proj.tasks.add” 的任务时:
@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print('after_task_publish for task id {info[id]}'.format(
info=info,
))
信号使用与 django.core.dispatch
相同的实现。 因此,默认情况下,其他关键字参数(例如,信号)将传递给所有信号处理程序。
信号处理程序的最佳实践是接受任意关键字参数(即 **kwargs
)。 这样,新的 Celery 版本可以在不破坏用户代码的情况下添加额外的参数。
信号
任务信号
before_task_publish
3.1 版中的新功能。
在发布任务之前调度。 请注意,这是在发送任务的过程中执行的。
Sender 是正在发送的任务的名称。
提供参数:
body
exchange
要发送到的交换的名称或
Exchange
对象。routing_key
发送消息时使用的路由键。
headers
应用程序头映射(可以修改)。
properties
消息属性(可修改)
declare
在发布消息之前要声明的实体列表(
Exchange
、Queue
或binding
。 可以修改。retry_policy
重试选项的映射。 可以是
kombu.Connection.ensure()
的任何参数并且可以修改。
after_task_publish
当任务已发送到代理时调度。 请注意,这是在发送任务的进程中执行的。
Sender 是正在发送的任务的名称。
提供参数:
headers
body
exchange
交换的名称或使用的
Exchange
对象。routing_key
使用的路由密钥。
task_prerun
在执行任务之前调度。
Sender 是正在执行的任务对象。
提供参数:
task_id
要执行的任务的 ID。
task
正在执行的任务。
args
任务位置参数。
kwargs
任务关键字参数。
task_postrun
在执行任务后调度。
Sender 是执行的任务对象。
提供参数:
task_id
要执行的任务的 ID。
task
正在执行的任务。
args
任务位置参数。
kwargs
任务关键字参数。
retval
任务的返回值。
state
结果状态的名称。
task_retry
在重试任务时调度。
发送者是任务对象。
提供参数:
request
当前任务请求。
reason
重试原因(通常是异常实例,但总是可以强制为
str
)。einfo
详细的异常信息,包括回溯(
billiard.einfo.ExceptionInfo
对象)。
task_success
任务成功时调度。
Sender 是执行的任务对象。
提供参数
- *;
result
- 任务的返回值。
task_failure
在任务失败时调度。
Sender 是执行的任务对象。
提供参数:
task_id
任务ID。
exception
引发异常实例。
args
调用任务时使用的位置参数。
kwargs
调用任务时使用的关键字参数。
traceback
堆栈跟踪对象。
einfo
billiard.einfo.ExceptionInfo
实例。
task_internal_error
在执行任务时发生内部 Celery 错误时调度。
Sender 是执行的任务对象。
提供参数:
task_id
任务ID。
args
调用任务时使用的位置参数。
kwargs
调用任务时使用的关键字参数。
request
原始请求字典。 这是因为在引发异常时
task.request
可能还没有准备好。exception
引发异常实例。
traceback
堆栈跟踪对象。
einfo
billiard.einfo.ExceptionInfo
实例。
task_received
当从代理接收到任务并准备好执行时分派。
发件人是消费者对象。
提供参数:
request
这是一个
Request
实例,而不是task.request
。 使用 prefork 池时,此信号在父进程中分派,因此task.request
不可用且不应使用。 请改用此对象,因为它们共享许多相同的字段。
task_revoked
当工作人员撤销/终止任务时调度。
Sender 是被撤销/终止的任务对象。
提供参数:
request
这是一个
Request
实例,而不是task.request
。 使用 prefork 池时,此信号在父进程中分派,因此task.request
不可用且不应使用。 请改用此对象,因为它们共享许多相同的字段。terminated
如果任务终止,则设置为
True
。signum
用于终止任务的信号编号。 如果这是
None
并终止是True
那么应该假设 :sig:`TERM`。expired
如果任务过期,则设置为
True
。
task_unknown
当工作人员收到未注册任务的消息时调度。
发件人是工人 Consumer
。
提供参数:
name
在注册表中找不到任务的名称。
id
在消息中找到的任务 ID。
message
原始消息对象。
exc
发生的错误。
task_rejected
当工作器接收到未知类型的消息到其任务队列之一时调度。
发件人是工人 Consumer
。
提供参数:
message
原始消息对象。
exc
发生的错误(如果有)。
应用信号
import_modules
当程序(worker、beat、shell)等要求导入 :setting:`include` 和 :setting:`imports` 设置中的模块时发送此信号.
发件人是应用程序实例。
工人信号
celeryd_after_setup
这个信号是在 worker 实例设置之后,但在它调用 run 之前发送的。 这意味着来自 celery worker -Q
选项的任何队列都已启用,日志记录已设置等等。
它可用于添加应始终使用的自定义队列,而不管 celery worker -Q
选项。 这是为每个工作人员设置直接队列的示例,然后这些队列可用于将任务路由到任何特定工作人员:
from celery.signals import celeryd_after_setup
@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
queue_name = '{0}.dq'.format(sender) # sender is the nodename of the worker
instance.app.amqp.queues.select_add(queue_name)
提供参数:
sender
工人的节点名称。
instance
conf
当前应用程序的配置。
celeryd_init
这是 celery worker 启动时发送的第一个信号。 sender
是 worker 的主机名,因此该信号可用于设置 worker 特定配置:
from celery.signals import celeryd_init
@celeryd_init.connect(sender='worker12@example.com')
def configure_worker12(conf=None, **kwargs):
conf.task_default_rate_limit = '10/m'
或者要为多个工作人员设置配置,您可以在连接时省略指定发件人:
from celery.signals import celeryd_init
@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
if sender in ('worker1@example.com', 'worker2@example.com'):
conf.task_default_rate_limit = '10/m'
if sender == 'worker3@example.com':
conf.worker_prefetch_multiplier = 0
提供参数:
sender
工人的节点名。
instance
conf
当前应用程序的配置。
options
从命令行参数(包括默认值)传递给 worker 的选项。
worker_init
在工作器启动之前调度。
worker_ready
当工人准备好接受工作时调度。
heartbeat_sent
当 Celery 发送工作人员心跳时调度。
发件人是 celery.worker.heartbeat.Heart
实例。
worker_shutting_down
在工作程序开始关闭过程时调度。
提供参数:
sig
接收到的 POSIX 信号。
how
关机方法,暖或冷。
exitcode
主进程退出时将使用的退出代码。
worker_process_init
在所有池子进程启动时分派。
请注意,附加到此信号的处理程序不得阻塞超过 4 秒,否则该进程将在假设启动失败时被终止。
worker_process_shutdown
在所有池子进程退出之前分派。
注意:不能保证这个信号会被分派,类似于 finally
块,不能保证在关闭时调用处理程序,如果调用它可能会被中断。
提供参数:
pid
即将关闭的子进程的 pid。
exitcode
子进程退出时将使用的退出代码。
worker_shutdown
在工作器即将关闭时调度。
节拍信号
beat_init
在 celery beat 启动(独立或嵌入)时调度。
发件人是 celery.beat.Service
实例。
beat_embedded_init
当 celery beat 作为嵌入式进程启动时,除了 :signal:`beat_init` 信号外,还会分派。
发件人是 celery.beat.Service
实例。
Eventlet 信号
eventlet_pool_started
当 eventlet 池启动时发送。
发件人是 celery.concurrency.eventlet.TaskPool
实例。
eventlet_pool_preshutdown
在 worker 关闭时发送,就在 eventlet 池被请求等待剩余的 worker 之前发送。
发件人是 celery.concurrency.eventlet.TaskPool
实例。
eventlet_pool_postshutdown
当池已加入且工作程序准备关闭时发送。
发件人是 celery.concurrency.eventlet.TaskPool
实例。
eventlet_pool_apply
每当任务应用于池时发送。
发件人是 celery.concurrency.eventlet.TaskPool
实例。
提供参数:
target
目标函数。
args
位置参数。
kwargs
关键字参数。
记录信号
setup_logging
如果此信号已连接,Celery 将不会配置记录器,因此您可以使用它来完全覆盖您自己的日志记录配置。
如果你想增加 Celery 的日志配置设置,那么你可以使用 :signal:`after_setup_logger` 和 :signal:`after_setup_task_logger` 信号。
提供参数:
loglevel
日志对象的级别。
logfile
日志文件的名称。
format
日志格式字符串。
colorize
指定日志消息是否带有颜色。
after_setup_logger
在每个全局记录器(不是任务记录器)设置后发送。 用于扩充日志记录配置。
提供参数:
logger
记录器对象。
loglevel
日志对象的级别。
logfile
日志文件的名称。
format
日志格式字符串。
colorize
指定日志消息是否带有颜色。
after_setup_task_logger
在设置每个任务记录器后发送。 用于扩充日志记录配置。
提供参数:
logger
记录器对象。
loglevel
日志对象的级别。
logfile
日志文件的名称。
format
日志格式字符串。
colorize
指定日志消息是否带有颜色。
命令信号
user_preload_options
在任何 Celery 命令行程序完成解析用户预加载选项后发送此信号。
它可用于向 celery 伞形命令添加额外的命令行参数:
from celery import Celery
from celery import signals
from celery.bin.base import Option
app = Celery()
app.user_options['preload'].add(Option(
'--monitoring', action='store_true',
help='Enable our external monitoring utility, blahblah',
))
@signals.user_preload_options.connect
def handle_preload_options(options, **kwargs):
if options['monitoring']:
enable_monitoring()
发送者是 Command
实例,其值取决于被调用的程序(例如,对于伞命令,它将是一个 CeleryCommand
)对象)。
提供参数:
app
应用实例。
options
解析的用户预加载选项的映射(具有默认值)。
弃用的信号