信号 — Python 文档

来自菜鸟教程
Celery/docs/latest/userguide/signals
跳转至:导航、​搜索

信号

信号允许解耦的应用程序在应用程序的其他地方发生某些操作时接收通知。

Celery 附带了许多信号,您的应用程序可以将这些信号挂钩以增强某些操作的行为。

基础知识

几种事件触发信号,您可以连接到这些信号以在它们触发时执行操作。

连接到 :signal:`after_task_publish` 信号的示例:

from celery.signals import after_task_publish

@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about task are located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))

有些信号还有一个可以过滤的发送者。 例如 :signal:`after_task_publish` 信号使用任务名称作为发送者,因此通过向 connect 提供 sender 参数,您可以连接要调用的处理程序每次发布名为 “proj.tasks.add” 的任务时:

@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about task are located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))

信号使用与 django.core.dispatch 相同的实现。 因此,默认情况下,其他关键字参数(例如,信号)将传递给所有信号处理程序。

信号处理程序的最佳实践是接受任意关键字参数(即 **kwargs)。 这样,新的 Celery 版本可以在不破坏用户代码的情况下添加额外的参数。


信号

任务信号

before_task_publish

3.1 版中的新功能。


在发布任务之前调度。 请注意,这是在发送任务的过程中执行的。

Sender 是正在发送的任务的名称。

提供参数:

  • body

    任务消息正文。

    这是一个包含任务消息字段的映射,请参阅 版本 2版本 1 以获取可定义的可能字段的参考。

  • exchange

    要发送到的交换的名称或 Exchange 对象。

  • routing_key

    发送消息时使用的路由键。

  • headers

    应用程序头映射(可以修改)。

  • properties

    消息属性(可修改)

  • declare

    在发布消息之前要声明的实体列表(ExchangeQueuebinding。 可以修改。

  • retry_policy

    重试选项的映射。 可以是 kombu.Connection.ensure() 的任何参数并且可以修改。


after_task_publish

当任务已发送到代理时调度。 请注意,这是在发送任务的进程中执行的。

Sender 是正在发送的任务的名称。

提供参数:

  • headers

    任务消息头,请参阅 Version 2Version 1 以获取可定义的可能字段的参考。

  • body

    任务消息正文,请参阅 Version 2Version 1 以获取可定义的可能字段的参考。

  • exchange

    交换的名称或使用的 Exchange 对象。

  • routing_key

    使用的路由密钥。


task_prerun

在执行任务之前调度。

Sender 是正在执行的任务对象。

提供参数:

  • task_id

    要执行的任务的 ID。

  • task

    正在执行的任务。

  • args

    任务位置参数。

  • kwargs

    任务关键字参数。


task_postrun

在执行任务后调度。

Sender 是执行的任务对象。

提供参数:

  • task_id

    要执行的任务的 ID。

  • task

    正在执行的任务。

  • args

    任务位置参数。

  • kwargs

    任务关键字参数。

  • retval

    任务的返回值。

  • state

    结果状态的名称。


task_retry

在重试任务时调度。

发送者是任务对象。

提供参数:

  • request

    当前任务请求。

  • reason

    重试原因(通常是异常实例,但总是可以强制为 str)。

  • einfo

    详细的异常信息,包括回溯(billiard.einfo.ExceptionInfo 对象)。


task_success

任务成功时调度。

Sender 是执行的任务对象。

提供参数

  • *; result
    任务的返回值。


task_failure

在任务失败时调度。

Sender 是执行的任务对象。

提供参数:

  • task_id

    任务ID。

  • exception

    引发异常实例。

  • args

    调用任务时使用的位置参数。

  • kwargs

    调用任务时使用的关键字参数。

  • traceback

    堆栈跟踪对象。

  • einfo

    billiard.einfo.ExceptionInfo 实例。


task_internal_error

在执行任务时发生内部 Celery 错误时调度。

Sender 是执行的任务对象。

提供参数:

  • task_id

    任务ID。

  • args

    调用任务时使用的位置参数。

  • kwargs

    调用任务时使用的关键字参数。

  • request

    原始请求字典。 这是因为在引发异常时 task.request 可能还没有准备好。

  • exception

    引发异常实例。

  • traceback

    堆栈跟踪对象。

  • einfo

    billiard.einfo.ExceptionInfo 实例。


task_received

当从代理接收到任务并准备好执行时分派。

发件人是消费者对象。

提供参数:

  • request

    这是一个 Request 实例,而不是 task.request。 使用 prefork 池时,此信号在父进程中分派,因此 task.request 不可用且不应使用。 请改用此对象,因为它们共享许多相同的字段。


task_revoked

当工作人员撤销/终止任务时调度。

Sender 是被撤销/终止的任务对象。

提供参数:

  • request

    这是一个 Request 实例,而不是 task.request。 使用 prefork 池时,此信号在父进程中分派,因此 task.request 不可用且不应使用。 请改用此对象,因为它们共享许多相同的字段。

  • terminated

    如果任务终止,则设置为 True

  • signum

    用于终止任务的信号编号。 如果这是 None 并终止是 True 那么应该假设 :sig:`TERM`

  • expired

    如果任务过期,则设置为 True


task_unknown

当工作人员收到未注册任务的消息时调度。

发件人是工人 Consumer

提供参数:

  • name

    在注册表中找不到任务的名称。

  • id

    在消息中找到的任务 ID。

  • message

    原始消息对象。

  • exc

    发生的错误。


task_rejected

当工作器接收到未知类型的消息到其任务队列之一时调度。

发件人是工人 Consumer

提供参数:

  • message

    原始消息对象。

  • exc

    发生的错误(如果有)。


应用信号

import_modules

当程序(worker、beat、shell)等要求导入 :setting:`include`:setting:`imports` 设置中的模块时发送此信号.

发件人是应用程序实例。


工人信号

celeryd_after_setup

这个信号是在 worker 实例设置之后,但在它调用 run 之前发送的。 这意味着来自 celery worker -Q 选项的任何队列都已启用,日志记录已设置等等。

它可用于添加应始终使用的自定义队列,而不管 celery worker -Q 选项。 这是为每个工作人员设置直接队列的示例,然后这些队列可用于将任务路由到任何特定工作人员:

from celery.signals import celeryd_after_setup

@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
    queue_name = '{0}.dq'.format(sender)  # sender is the nodename of the worker
    instance.app.amqp.queues.select_add(queue_name)

提供参数:

  • sender

    工人的节点名称。

  • instance

    这是要初始化的 celery.apps.worker.Worker 实例。 请注意,到目前为止仅设置了 apphostname(节点名)属性,其余的 __init__ 尚未执行。

  • conf

    当前应用程序的配置。


celeryd_init

这是 celery worker 启动时发送的第一个信号。 sender 是 worker 的主机名,因此该信号可用于设置 worker 特定配置:

from celery.signals import celeryd_init

@celeryd_init.connect(sender='worker12@example.com')
def configure_worker12(conf=None, **kwargs):
    conf.task_default_rate_limit = '10/m'

或者要为多个工作人员设置配置,您可以在连接时省略指定发件人:

from celery.signals import celeryd_init

@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    if sender in ('worker1@example.com', 'worker2@example.com'):
        conf.task_default_rate_limit = '10/m'
    if sender == 'worker3@example.com':
        conf.worker_prefetch_multiplier = 0

提供参数:

  • sender

    工人的节点名。

  • instance

    这是要初始化的 celery.apps.worker.Worker 实例。 请注意,到目前为止仅设置了 apphostname(节点名)属性,其余的 __init__ 尚未执行。

  • conf

    当前应用程序的配置。

  • options

    从命令行参数(包括默认值)传递给 worker 的选项。


worker_init

在工作器启动之前调度。


worker_ready

当工人准备好接受工作时调度。


heartbeat_sent

当 Celery 发送工作人员心跳时调度。

发件人是 celery.worker.heartbeat.Heart 实例。


worker_shutting_down

在工作程序开始关闭过程时调度。

提供参数:

  • sig

    接收到的 POSIX 信号。

  • how

    关机方法,暖或冷。

  • exitcode

    主进程退出时将使用的退出代码。


worker_process_init

在所有池子进程启动时分派。

请注意,附加到此信号的处理程序不得阻塞超过 4 秒,否则该进程将在假设启动失败时被终止。


worker_process_shutdown

在所有池子进程退出之前分派。

注意:不能保证这个信号会被分派,类似于 finally 块,不能保证在关闭时调用处理程序,如果调用它可能会被中断。

提供参数:

  • pid

    即将关闭的子进程的 pid。

  • exitcode

    子进程退出时将使用的退出代码。


worker_shutdown

在工作器即将关闭时调度。


节拍信号

beat_init

celery beat 启动(独立或嵌入)时调度。

发件人是 celery.beat.Service 实例。


beat_embedded_init

celery beat 作为嵌入式进程启动时,除了 :signal:`beat_init` 信号外,还会分派。

发件人是 celery.beat.Service 实例。


Eventlet 信号

eventlet_pool_started

当 eventlet 池启动时发送。

发件人是 celery.concurrency.eventlet.TaskPool 实例。


eventlet_pool_preshutdown

在 worker 关闭时发送,就在 eventlet 池被请求等待剩余的 worker 之前发送。

发件人是 celery.concurrency.eventlet.TaskPool 实例。


eventlet_pool_postshutdown

当池已加入且工作程序准备关闭时发送。

发件人是 celery.concurrency.eventlet.TaskPool 实例。


eventlet_pool_apply

每当任务应用于池时发送。

发件人是 celery.concurrency.eventlet.TaskPool 实例。

提供参数:

  • target

    目标函数。

  • args

    位置参数。

  • kwargs

    关键字参数。


记录信号

setup_logging

如果此信号已连接,Celery 将不会配置记录器,因此您可以使用它来完全覆盖您自己的日志记录配置。

如果你想增加 Celery 的日志配置设置,那么你可以使用 :signal:`after_setup_logger`:signal:`after_setup_task_logger` 信号。

提供参数:

  • loglevel

    日志对象的级别。

  • logfile

    日志文件的名称。

  • format

    日志格式字符串。

  • colorize

    指定日志消息是否带有颜色。


after_setup_logger

在每个全局记录器(不是任务记录器)设置后发送。 用于扩充日志记录配置。

提供参数:

  • logger

    记录器对象。

  • loglevel

    日志对象的级别。

  • logfile

    日志文件的名称。

  • format

    日志格式字符串。

  • colorize

    指定日志消息是否带有颜色。


after_setup_task_logger

在设置每个任务记录器后发送。 用于扩充日志记录配置。

提供参数:

  • logger

    记录器对象。

  • loglevel

    日志对象的级别。

  • logfile

    日志文件的名称。

  • format

    日志格式字符串。

  • colorize

    指定日志消息是否带有颜色。


命令信号

user_preload_options

在任何 Celery 命令行程序完成解析用户预加载选项后发送此信号。

它可用于向 celery 伞形命令添加额外的命令行参数:

from celery import Celery
from celery import signals
from celery.bin.base import Option

app = Celery()
app.user_options['preload'].add(Option(
    '--monitoring', action='store_true',
    help='Enable our external monitoring utility, blahblah',
))

@signals.user_preload_options.connect
def handle_preload_options(options, **kwargs):
    if options['monitoring']:
        enable_monitoring()

发送者是 Command 实例,其值取决于被调用的程序(例如,对于伞命令,它将是一个 CeleryCommand)对象)。

提供参数:

  • app

    应用实例。

  • options

    解析的用户预加载选项的映射(具有默认值)。


弃用的信号

task_sent

此信号已弃用,请改用 :signal:`after_task_publish`