工人指南 — Python 文档

来自菜鸟教程
Celery/docs/latest/userguide/workers
跳转至:导航、​搜索

工人指南

启动工人

守护进程

您可能希望使用守护程序工具在后台启动工作程序。 请参阅 Daemonization 以获取使用流行的服务管理器将 worker 作为守护进程启动的帮助。

您可以通过执行以下命令在前台启动 worker:

$ celery -A proj worker -l INFO

有关可用命令行选项的完整列表,请参阅 worker,或者只需执行以下操作:

$ celery worker --help

您可以在同一台机器上启动多个 worker,但一定要通过使用 --hostname 参数指定节点名称来命名每个单独的 worker:

$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h

hostname 参数可以扩展以下变量:

  • %h:主机名,包括域名。
  • %n:仅主机名。
  • %d:仅限域名。


如果当前主机名是 george.example.com,它们将扩展为:

变量 模板 结果
%h worker1@%h worker1@george.example.com
%n worker1@%n worker1@george
%d worker1@%d worker1@example.com

:pypi:`supervisor` 用户注意事项

% 符号必须通过添加第二个符号来转义:%%h。


停止工人

应该使用 :sig:`TERM` 信号完成关闭。

启动关闭时,worker 将在实际终止之前完成所有当前正在执行的任务。 如果这些任务很重要,你应该等待它完成,然后再做任何激烈的事情,比如发送 :sig:`KILL` 信号。

如果工作人员在经过一段时间后仍未关闭,因为陷入无限循环或类似情况,您可以使用 :sig:`KILL` 信号强制终止工作人员:但请注意,当前正在执行的任务将丢失(即,除非任务设置了 acks_late 选项)。

此外,由于进程无法覆盖 :sig:`KILL` 信号,worker 将无法获取其子进程; 确保手动执行此操作。 这个命令通常可以解决问题:

$ pkill -9 -f 'celery worker'

如果您的系统上没有 pkill 命令,您可以使用稍长的版本:

$ ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9

5.2 版更改: 在 Linux 系统上,Celery 现在支持在 worker 终止后向所有子进程发送 :sig:`KILL` 信号。 这是通过 prctl(2) 的 PR_SET_PDEATHSIG 选项完成的。


重新启动工人

要重新启动工作程序,您应该发送 TERM 信号并启动一个新实例。 管理开发人员的最简单方法是使用 celery multi:

$ celery multi start 1 -A proj -l INFO -c4 --pidfile=/var/run/celery/%n.pid
$ celery multi restart 1 --pidfile=/var/run/celery/%n.pid

对于生产部署,您应该使用 init-scripts 或进程监督系统(参见 Daemonization)。

除了停止,然后启动worker重新启动之外,您还可以使用:sig:`HUP`信号重新启动worker。 请注意,worker 将负责自行重启,因此这很容易出现问题,不建议在生产中使用:

$ kill -HUP $pid

笔记

通过 :sig:`HUP` 重新启动仅在 worker 作为守护进程在后台运行时才有效(它没有控制终端)。

:sig:`HUP` 在 macOS 上被禁用,因为该平台的限制。


过程信号

工作进程的主进程会覆盖以下信号:

:sig:`术语` 热关机,等待任务完成。
:sig:`退出` 冷关机,尽快终止
:sig:`USR1` 转储所有活动线程的回溯。
:sig:`USR2` 远程调试,见celery.contrib.rdb


文件路径中的变量

--logfile--pidfile--statedb 的文件路径参数可以包含工作程序将扩展的变量:

节点名称替换

  • %p:完整的节点名称。
  • %h:主机名,包括域名。
  • %n:仅主机名。
  • %d:仅限域名。
  • %i:Prefork 池进程索引或 0 如果 MainProcess。
  • %I:带分隔符的 Prefork 池进程索引。

例如,如果当前主机名是 george@foo.example.com 那么这些将扩展为:

  • --logfile=%p.log -> george@foo.example.com.log
  • --logfile=%h.log -> foo.example.com.log
  • --logfile=%n.log -> george.log
  • --logfile=%d.log -> example.com.log


Prefork 池进程索引

prefork 池进程索引说明符将根据最终需要打开文件的进程扩展为不同的文件名。

这可用于为每个子进程指定一个日志文件。

请注意,即使进程退出或使用自动缩放/maxtasksperchild/时间限制,数字也将保持在进程限制内。 也就是说,该数字是 进程索引 而不是进程计数或 pid。

  • %i - 池进程索引或 0 如果 MainProcess。

    其中 -n worker1@example.com -c2 -f %n-%i.log 将产生三个日志文件:

    • worker1-0.log(主进程)

    • worker1-1.log(池进程1)

    • worker1-2.log(池进程2)


  • %I - 带分隔符的池进程索引。

    其中 -n worker1@example.com -c2 -f %n%I.log 将产生三个日志文件:

    • worker1.log(主进程)

    • worker1-1.log(池进程1)

    • worker1-2.log(池进程2)



并发

默认情况下,multiprocessing 用于执行任务的并发执行,但您也可以使用 Eventlet。 可以使用 --concurrency 参数更改工作进程/线程的数量,默认为机器上可用的 CPU 数量。

进程数(多处理/预分叉池)

更多的池进程通常更好,但有一个分界点,即添加更多的池进程会对性能产生负面影响。 甚至有一些证据支持运行多个 worker 实例可能比运行单个 worker 性能更好。 例如 3 个工人,每个工人有 10 个池进程。 您需要尝试找到最适合您的数字,因为这会因应用程序、工作负载、任务运行时间和其他因素而异。


遥控器

2.0 版中的新功能。


celery 命令

celery 程序用于从命令行执行远程控制命令。 它支持下面列出的所有命令。 有关详细信息,请参阅 管理命令行实用程序(检查/控制)

池支持
prefork、eventlet、gevent、thread、阻塞:solo(见注释)
经纪人支持
amqp,redis

工作人员可以使用高优先级广播消息队列进行远程控制。 这些命令可以针对所有工作人员或特定的工作人员列表。

命令也可以有回复。 然后客户端可以等待并收集这些回复。 由于没有中央机构知道集群中有多少可用的工作线程,也无法估计有多少工作线程可以发送回复,因此客户端有一个可配置的超时时间——回复到达的截止时间(以秒为单位)。 此超时默认为一秒。 如果worker没有在deadline内回复,也不一定是worker没有回复,或者更糟的是已经死了,可能只是网络延迟或者worker处理命令慢,所以相应地调整超时时间.

除了超时之外,客户端还可以指定要等待的最大回复数。 如果指定了目标,则此限制设置为目标主机的数量。

笔记

solo 池支持远程控制命令,但是任何任务执行都会阻塞任何等待的控制命令,因此如果 worker 非常忙,它的用途有限。 在这种情况下,您必须增加等待客户端回复的超时时间。


broadcast() 功能

这是用于向工作人员发送命令的客户端功能。 一些远程控制命令也有在后台使用broadcast()的更高级别的接口,例如rate_limit()ping()

发送 :control:`rate_limit` 命令和关键字参数:

>>> app.control.broadcast('rate_limit',
...                          arguments={'task_name': 'myapp.mytask',
...                                     'rate_limit': '200/m'})

这将异步发送命令,而无需等待回复。 要请求回复,您必须使用 reply 参数:

>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask', 'rate_limit': '200/m'}, reply=True)
[{'worker1.example.com': 'New rate limit set successfully'},
 {'worker2.example.com': 'New rate limit set successfully'},
 {'worker3.example.com': 'New rate limit set successfully'}]

使用 destination 参数,您可以指定要接收命令的工作人员列表:

>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask',
...     'rate_limit': '200/m'}, reply=True,
...                             destination=['worker1@example.com'])
[{'worker1.example.com': 'New rate limit set successfully'}]

当然,使用更高级别的接口来设置速率限制要方便得多,但是有些命令只能使用 broadcast() 来请求。


命令

revoke:撤销任务

池支持
全部,终止仅由 prefork 和 eventlet 支持
经纪人支持
amqp,redis
命令
celery -A 项目控制撤销

所有工作节点都会保留已撤销任务 ID 的内存,无论是在内存中还是在磁盘上持久(请参阅 Persistent revokes)。

当工作人员收到撤销请求时,它将跳过执行任务,但不会终止已经执行的任务,除非设置了 terminate 选项。

笔记

当任务卡住时,终止选项是管理员的最后手段。 它不是用于终止任务,而是用于终止正在执行任务的进程,并且该进程可能在发送信号时已经开始处理另一个任务,因此出于这个原因,您绝不能以编程方式调用它。


如果设置了 terminate,则处理任务的工作子进程将被终止。 发送的默认信号是 TERM,但您可以使用 signal 参数指定它。 Signal 可以是 Python 标准库中 signal 模块中定义的任何信号的大写名称。

终止任务也会撤销它。

例子

>>> result.revoke()

>>> AsyncResult(id).revoke()

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True)

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True, signal='SIGKILL')

撤销多个任务

3.1 版中的新功能。


revoke 方法还接受一个列表参数,它会同时撤销多个任务。

例子

>>> app.control.revoke([
...    '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',
...    'f565793e-b041-4b2b-9ca4-dca22762a55d',
...    'd9d35e03-2997-42d0-a13e-64a66b88a618',
])

GroupResult.revoke 方法从 3.1 版开始利用这一点。


永久撤销

撤销任务的工作原理是向所有工作人员发送广播消息,然后工作人员在内存中保留一份已撤销任务的列表。 当一个 worker 启动时,它将与集群中的其他 worker 同步撤销的任务。

已撤销的任务列表在内存中,因此如果所有工作人员重新启动,已撤销的 id 列表也将消失。 如果要在重新启动之间保留此列表,则需要使用 –statedb 参数为 celery worker 指定要存储这些列表的文件:

$ celery -A proj worker -l INFO --statedb=/var/run/celery/worker.state

或者如果你使用 celery multi 你想为每个工作实例创建一个文件,所以使用 %n 格式来扩展当前节点名称:

celery multi start 2 -l INFO --statedb=/var/run/celery/%n.state

另见 文件路径中的变量

请注意,远程控制命令必须有效才能使撤销生效。 目前只有 RabbitMQ (amqp) 和 Redis 支持远程控制命令。


时间限制

2.0 版中的新功能。


池支持
prefork/gevent(见下面的注释)

软的,还是硬的?

时间限制设置为两个值,soft 和 hard。 软时间限制允许任务在被杀死之前捕获异常以进行清理:硬超时无法捕获并且强制终止任务。

单个任务可能会永远运行,如果您有很多任务等待某个永远不会发生的事件,您将无限期地阻止工作人员处理新任务。 防止这种情况发生的最佳方法是启用时间限制。

时间限制 (–time-limit) 是任务在执行它的进程终止并被新进程替换之前可以运行的最大秒数。 您还可以启用软时间限制(–soft-time-limit),这会引发一个异常,任务可以在硬时间限制杀死它之前捕获并清理它:

from myapp import app
from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        do_work()
    except SoftTimeLimitExceeded:
        clean_up_in_a_hurry()

也可以使用 :setting:`task_time_limit` / :setting:`task_soft_time_limit` 设置来设置时间限制。

笔记

时间限制目前不适用于不支持 :sig:`SIGUSR1` 信号的平台。


笔记

gevent 池没有实现软时间限制。 此外,如果任务被阻塞,它不会强制执行硬时间限制。


在运行时更改时间限制

2.3 版中的新功能。


经纪人支持
amqp,redis

有一个远程控制命令可让您更改任务的软和硬时间限制 — 名为 time_limit

tasks.crawl_the_web 任务的时间限制更改为软时间限制为一分钟,硬时间限制为两分钟的示例:

>>> app.control.time_limit('tasks.crawl_the_web',
                           soft=60, hard=120, reply=True)
[{'worker1.example.com': {'ok': 'time limits set successfully'}}]

只有在时间限制更改后开始执行的任务才会受到影响。


速率限制

在运行时更改速率限制

将 myapp.mytask 任务的速率限制更改为每分钟最多执行 200 个该类型任务的示例:

>>> app.control.rate_limit('myapp.mytask', '200/m')

上面没有指定目的地,因此更改请求将影响集群中的所有工作实例。 如果您只想影响特定的工作人员列表,您可以包含 destination 参数:

>>> app.control.rate_limit('myapp.mytask', '200/m',
...            destination=['celery@worker1.example.com'])

警告

这不会影响启用了 :setting:`worker_disable_rate_limits` 设置的工人。


每个子项设置的最大任务数

2.0 版中的新功能。


池支持
分叉前

使用此选项,您可以配置工作人员在被新进程替换之前可以执行的最大任务数。

如果您无法控制例如来自封闭源 C 扩展的内存泄漏,这将很有用。

可以使用 worker --max-tasks-per-child 参数或使用 :setting:`worker_max_tasks_per_child` 设置来设置该选项。


每个孩子设置的最大内存

4.0 版中的新功能。


池支持
分叉前

使用此选项,您可以配置工作人员在被新进程替换之前可以执行的最大驻留内存量。

如果您无法控制例如来自封闭源 C 扩展的内存泄漏,这将很有用。

可以使用 worker --max-memory-per-child 参数或使用 :setting:`worker_max_memory_per_child` 设置来设置该选项。


自动缩放

2.2 版中的新功能。


池支持
预叉gevent

autoscaler 组件用于根据负载动态调整池大小:

  • *; 当有工作要做时,自动缩放器会添加更多池进程,
    *;* 并在工作负载较低时开始删除进程。

它由 --autoscale 选项启用,它需要两个数字:池进程的最大和最小数量:

--autoscale=AUTOSCALE
     Enable autoscaling by providing
     max_concurrency,min_concurrency.  Example:
       --autoscale=10,3 (always keep 3 processes, but grow to
      10 if necessary).

您还可以通过子类化 Autoscaler 来为自动缩放器定义自己的规则。 指标的一些想法包括平均负载或可用内存量。 您可以使用 :setting:`worker_autoscaler` 设置指定自定义自动缩放器。


队列

一个工作实例可以从任意数量的队列中消费。 默认情况下,它将从 :setting:`task_queues` 设置中定义的所有队列中使用(如果未指定,则回退到名为 celery 的默认队列)。

您可以通过为 -Q 选项提供以逗号分隔的队列列表来指定启动时要使用的队列:

$ celery -A proj worker -l INFO -Q foo,bar,baz

如果队列名称在 :setting:`task_queues` 中定义,它将使用该配置,但如果它未在队列列表中定义,Celery 将自动为您生成一个新队列(取决于 [X215X ]:setting:`task_create_missing_queues` 选项)。

您还可以使用远程控制命令 :control:`add_consumer`:control:`cancel_consumer` 告诉工作线程在运行时开始和停止从队列中消费。

队列:添加消费者

:control:`add_consumer` 控制命令将告诉一个或多个 worker 从队列开始消费。 这个操作是幂等的。

要告诉集群中的所有工作线程从名为“foo”的队列开始消费,您可以使用 celery control 程序:

$ celery -A proj control add_consumer foo
-> worker1.local: OK
    started consuming from u'foo'

如果你想指定一个特定的工人,你可以使用 --destination 参数:

$ celery -A proj control add_consumer foo -d celery@worker1.local

同样可以使用 @control.add_consumer() 方法动态完成:

>>> app.control.add_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]

>>> app.control.add_consumer('foo', reply=True,
...                          destination=['worker1@example.com'])
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]

到目前为止,我们只展示了使用自动队列的示例,如果您需要更多控制,您还可以指定 exchange、routing_key 甚至其他选项:

>>> app.control.add_consumer(
...     queue='baz',
...     exchange='ex',
...     exchange_type='topic',
...     routing_key='media.*',
...     options={
...         'queue_durable': False,
...         'exchange_durable': False,
...     },
...     reply=True,
...     destination=['w1@example.com', 'w2@example.com'])

队列:取消消费者

您可以使用 :control:`cancel_consumer` 控制命令按队列名称取消消费者。

要强制集群中的所有工作人员取消队列中的消费,您可以使用 celery control 程序:

$ celery -A proj control cancel_consumer foo

--destination 参数可用于指定工作人员或工作人员列表,以对命令执行操作:

$ celery -A proj control cancel_consumer foo -d celery@worker1.local

您还可以使用 @control.cancel_consumer() 方法以编程方式取消消费者:

>>> app.control.cancel_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"no longer consuming from u'foo'"}}]

队列:活动队列列表

您可以通过使用 :control:`active_queues` 控制命令来获取工作线程消耗的队列列表:

$ celery -A proj inspect active_queues
[...]

与所有其他远程控制命令一样,这也支持 --destination 参数,用于指定应回复请求的工作人员:

$ celery -A proj inspect active_queues -d celery@worker1.local
[...]

这也可以通过使用 active_queues() 方法以编程方式完成:

>>> app.control.inspect().active_queues()
[...]

>>> app.control.inspect(['worker1.local']).active_queues()
[...]

检查工人

@control.inspect 可以让你检查正在运行的工人。 它在引擎盖下使用远程控制命令。

您也可以使用 celery 命令来检查工人,它支持与 @control 接口相同的命令。

>>> # Inspect all nodes.
>>> i = app.control.inspect()

>>> # Specify multiple nodes to inspect.
>>> i = app.control.inspect(['worker1.example.com',
                            'worker2.example.com'])

>>> # Specify a single node to inspect.
>>> i = app.control.inspect('worker1.example.com')

转储已注册的任务

您可以使用 registered() 获取在 worker 中注册的任务列表:

>>> i.registered()
[{'worker1.example.com': ['tasks.add',
                          'tasks.sleeptask']}]

转储当前正在执行的任务

您可以使用 active() 获取活动任务列表:

>>> i.active()
[{'worker1.example.com':
    [{'name': 'tasks.sleeptask',
      'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
      'args': '(8,)',
      'kwargs': '{}'}]}]

计划 (ETA) 任务的转储

您可以使用 scheduled() 获取等待调度的任务列表:

>>> i.scheduled()
[{'worker1.example.com':
    [{'eta': '2010-06-07 09:07:52', 'priority': 0,
      'request': {
        'name': 'tasks.sleeptask',
        'id': '1a7980ea-8b19-413e-91d2-0b74f3844c4d',
        'args': '[1]',
        'kwargs': '{}'}},
     {'eta': '2010-06-07 09:07:53', 'priority': 0,
      'request': {
        'name': 'tasks.sleeptask',
        'id': '49661b9a-aa22-4120-94b7-9ee8031d219d',
        'args': '[2]',
        'kwargs': '{}'}}]}]

笔记

这些是带有 ETA/倒计时参数的任务,而不是周期性任务。


保留任务的转储

保留任务是已收到但仍在等待执行的任务。

您可以使用 reserved() 获取这些列表:

>>> i.reserved()
[{'worker1.example.com':
    [{'name': 'tasks.sleeptask',
      'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
      'args': '(8,)',
      'kwargs': '{}'}]}]

统计数据

远程控制命令 inspect stats(或 stats())会给你一长串有用的(或不太有用的)关于工人的统计信息:

$ celery -A proj inspect stats

输出详情请参考stats()的参考文档。


附加命令

远程关机

此命令将优雅地远程关闭 worker:

>>> app.control.broadcast('shutdown') # shutdown all workers
>>> app.control.broadcast('shutdown', destination='worker1@example.com')

此命令请求来自活着的工作人员的 ping。 工人用字符串 'pong' 回复,仅此而已。 除非您指定自定义超时,否则它将使用默认的一秒超时进行回复:

>>> app.control.ping(timeout=0.5)
[{'worker1.example.com': 'pong'},
 {'worker2.example.com': 'pong'},
 {'worker3.example.com': 'pong'}]

ping() 还支持 destination 参数,因此您可以指定要 ping 的工作人员:

>>> ping(['worker2.example.com', 'worker3.example.com'])
[{'worker2.example.com': 'pong'},
 {'worker3.example.com': 'pong'}]

启用/禁用事件

您可以使用 enable_events、disable_events 命令启用/禁用事件。 这对于使用 celery events/celerymon 临时监视 worker 很有用。

>>> app.control.enable_events()
>>> app.control.disable_events()

编写自己的远程控制命令

有两种类型的远程控制命令:

  • 检查命令

    没有副作用,通常只会返回在 worker 中找到的一些值,例如当前注册的任务列表、活动任务列表等。

  • 控制命令

    执行副作用,比如添加一个新的队列来消费。

远程控制命令在控制面板中注册,它们采用单个参数:当前 ControlDispatch 实例。 如果需要,您可以从那里访问活动的 Consumer

这是一个增加任务预取计数的示例控制命令:

from celery.worker.control import control_command

@control_command(
    args=[('n', int)],
    signature='[N=1]',  # <- used for help on the command-line.
)
def increase_prefetch_count(state, n=1):
    state.consumer.qos.increment_eventually(n)
    return {'ok': 'prefetch count incremented'}

确保将此代码添加到工作人员导入的模块中:这可能与定义 Celery 应用程序的模块相同,或者您可以将模块添加到 :setting:`imports`环境。

重新启动 worker 以便注册控制命令,现在您可以使用 celery control 实用程序调用您的命令:

$ celery -A proj control increase_prefetch_count 3

您还可以向 celery inspect 程序添加操作,例如读取当前预取计数的操作:

from celery.worker.control import inspect_command

@inspect_command()
def current_prefetch_count(state):
    return {'prefetch_count': state.consumer.qos.value}

重新启动工作程序后,您现在可以使用 celery inspect 程序查询此值:

$ celery -A proj inspect current_prefetch_count