监控和管理指南 — Python 文档

来自菜鸟教程
Celery/docs/latest/userguide/monitoring
跳转至:导航、​搜索

监控管理指南

简介

有多种工具可用于监视和检查 Celery 集群。

本文档描述了其中的一些,以及与监控相关的功能,如事件和广播命令。


工人

管理命令行实用程序 (inspect/control)

celery 也可用于检查和管理工作节点(以及某种程度的任务)。

要列出所有可用的命令,请执行以下操作:

$ celery --help

或获取特定命令的帮助,请执行以下操作:

$ celery <command> --help

命令

  • shell:放入 Python shell。

    本地人将包括 celery 变量:这是当前的应用程序。 此外,所有已知任务都将自动添加到本地(除非设置了 --without-tasks 标志)。

    如果已安装,则按 :pypi:`Ipython`:pypi:`bpython` 或常规 python 使用。 您可以使用 --ipython--bpython--python 强制实现。

  • status:列出该集群中的活动节点

    $ celery -A proj status
  • result:显示一个任务的结果

    $ celery -A proj result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577

    请注意,只要任务不使用自定义结果后端,您就可以省略任务的名称。

  • purge:从所有配置的任务队列中清除消息。

    此命令将从 :setting:`CELERY_QUEUES` 设置中配置的队列中删除所有消息:

    警告

    此操作无法撤消,消息将被永久删除!

    $ celery -A proj purge

    您还可以使用 -Q 选项指定要清除的队列:

    $ celery -A proj purge -Q celery,foo,bar

    并使用 -X 选项排除队列被清除:

    $ celery -A proj purge -X celery
  • inspect active:列出活动任务

    $ celery -A proj inspect active

    这些都是当前正在执行的任务。

  • inspectcheduled:列出预定的ETA任务

    $ celery -A proj inspect scheduled

    这些是工作人员在拥有 eta[X64X] 或 countdown 参数集时保留的任务。

  • inspect reserved:列出保留任务

    $ celery -A proj inspect reserved

    这将列出所有已被工作程序预取的任务,并且当前正在等待执行(不包括具有 ETA 值集的任务)。

  • inspect revoked:列出已撤销任务的历史记录

    $ celery -A proj inspect revoked
  • 检查已注册:列出已注册的任务

    $ celery -A proj inspect registered
  • inspect stats:显示工人统计信息(见 Statistics

    $ celery -A proj inspect stats
  • inspect query_task:通过id显示任务信息。

    任何在这组 ids reserved/active 中拥有任务的 worker 都会以状态和信息进行响应。

    $ celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8

    您还可以查询有关多个任务的信息:

    $ celery -A proj inspect query_task id1 id2 ... idN
  • control enable_events:启用事件

    $ celery -A proj control enable_events
  • control disable_events:禁用事件

    $ celery -A proj control disable_events
  • migrate:将任务从一个代理迁移到另一个代理(EXPERIMENTAL)。

    $ celery -A proj migrate redis://localhost amqp://localhost

    此命令会将一个代理上的所有任务迁移到另一个代理。 由于这个命令是新的和实验性的,你应该确保在继续之前备份数据。

笔记

所有 inspectcontrol 命令都支持 --timeout 参数,这是等待响应的秒数。 如果由于延迟而未收到响应,则可能需要增加此超时时间。


指定目标节点

默认情况下,检查和控制命令对所有工作人员起作用。 您可以使用 --destination 参数指定单个或一组工作人员:

$ celery -A proj inspect -d w1@e.com,w2@e.com reserved

$ celery -A proj control -d w1@e.com,w2@e.com enable_events

Flower:实时 Celery 网络监视器

Flower 是一个基于 Web 的实时监控和 Celery 管理工具。 它正在积极开发中,但已经是必不可少的工具。 作为 Celery 的推荐监视器,它淘汰了 Django-Admin 监视器、celerymon 和基于 ncurses 的监视器。

Flower 的发音类似于“flow”,但如果您愿意,也可以使用植物版本。

特点

  • 使用 Celery Events 进行实时监控

    • 任务进度和历史

    • 能够显示任务详细信息(参数、开始时间、运行时间等)

    • 图表和统计


  • 遥控器

    • 查看工作人员状态和统计信息

    • 关闭并重新启动工作器实例

    • 控制工作池大小和自动缩放设置

    • 查看和修改工作实例使用的队列

    • 查看当前运行的任务

    • 查看计划任务(ETA/倒计时)

    • 查看保留和撤销的任务

    • 应用时间和速率限制

    • 配置查看器

    • 撤销或终止任务


  • HTTP API

    • 列出工人

    • 关闭一个工人

    • 重启工作池

    • 种植工人池

    • 收缩工人池

    • 自动缩放工作器池

    • 从队列开始消费

    • 停止从队列中消费

    • 列出任务

    • 列出(看到的)任务类型

    • 获取任务信息

    • 执行任务

    • 按名称执行任务

    • 获取任务结果

    • 更改任务的软硬时间限制

    • 更改任务的速率限制

    • 撤销任务


  • OpenID 认证

截图

thumb|none

thumb|none

更多截图


用法

您可以使用 pip 安装 Flower:

$ pip install flower

运行花命令将启动您可以访问的网络服务器:

$ celery -A proj flower

默认端口为 http://localhost:5555,但您可以使用 –port 参数更改此端口:

$ celery -A proj flower --port=5555

Broker URL 也可以通过 --broker 参数传递:

$ celery flower --broker=amqp://guest:guest@localhost:5672//
or
$ celery flower --broker=redis://guest:guest@localhost:6379/0

然后,您可以在您的网络浏览器中访问花:

$ open http://localhost:5555

Flower 具有比此处详述的更多功能,包括授权选项。 查看 官方文档 了解更多信息。


celery 事件:诅咒监视器

2.0 版中的新功能。


celery events 是一个简单的curses 监视器,显示任务和工人历史。 您可以检查任务的结果和回溯,它还支持一些管理命令,例如限速和关闭 worker。 此监视器是作为概念证明而启动的,您可能想改用 Flower。

开始:

$ celery -A proj events

您应该会看到如下所示的屏幕:

thumb|none

celery events 也用于启动快照相机(参见 Snapshots

$ celery -A proj events --camera=<camera-class> --frequency=1.0

它包括一个将事件转储到 stdout 的工具:

$ celery -A proj events --dump

要获得完整的选项列表,请使用 --help

$ celery events --help

兔MQ

要管理 Celery 集群,了解如何监控 RabbitMQ 非常重要。

RabbitMQ 附带了 rabbitmqctl(1) 命令,您可以通过这个命令列出队列、交换器、绑定、队列长度、每个队列的内存使用情况,以及管理用户、虚拟主机及其权限。

笔记

这些示例中使用默认虚拟主机 ("/"),如果您使用自定义虚拟主机,则必须在命令中添加 -p 参数,例如:rabbitmqctl list_queues -p my_vhost …


检查队列

查找队列中的任务数:

$ rabbitmqctl list_queues name messages messages_ready \
                          messages_unacknowledged

这里 messages_ready 是准备发送的消息数(已发送但未收到),messages_unacknowledged 是工作人员已收到但尚未确认的消息数(意味着它在进度,或已保留)。 messages 是就绪消息和未确认消息的总和。

从队列中查找当前消耗的工作线程数:

$ rabbitmqctl list_queues name consumers

查找分配给队列的内存量:

$ rabbitmqctl list_queues name memory
提示
-q 选项添加到 rabbitmqctl(1) 使输出更易于解析。


Redis

如果您使用 Redis 作为代理,则可以使用 redis-cli(1) 命令来监控 Celery 集群以列出队列的长度。

检查队列

查找队列中的任务数:

$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

默认队列名为 celery。 要获取所有可用队列,请调用:

$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER keys \*

笔记

队列键仅在其中有任务时才存在,因此如果键不存在,则仅意味着该队列中没有消息。 这是因为在 Redis 中,一个没有元素的列表会被自动删除,因此它不会出现在 keys 命令输出中,并且该列表的 llen 返回 0。

此外,如果您将 Redis 用于其他目的,keys 命令的输出将包括存储在数据库中的无关值。 推荐的解决方法是为 Celery 使用专用的 DATABASE_NUMBER,您也可以使用数据库编号将 Celery 应用程序彼此(虚拟主机)分开,但这不会影响 for 使用的监控事件例如 Flower 作为 Redis 发布/订阅命令是全局的,而不是基于数据库的。


穆宁

这是一个已知的 Munin 插件列表,它们在维护 Celery 集群时很有用。


活动

每当发生某些事件时,工作人员都有能力发送消息。 然后这些事件被诸如 Flower 和 celery events 之类的工具捕获以监控集群。

快照

2.1 版中的新功能。


即使是单个工作人员也可以产生大量事件,因此将所有事件的历史记录存储在磁盘上可能非常昂贵。

一系列事件描述了该时间段内的集群状态,通过定期拍摄该状态的快照,您可以保留所有历史记录,但仍只能定期将其写入磁盘。

要拍摄快照,您需要一个 Camera 类,通过它您可以定义每次捕获状态时应该发生的事情; 您可以将其写入数据库,通过电子邮件或其他方式发送。

celery events 然后用于使用相机拍摄快照,例如,如果您想使用相机 myapp.Camera 每 2 秒捕获一次状态,则使用 celery events 运行以下参数:

$ celery -A proj events -c myapp.Camera --frequency=2.0

定制相机

如果您需要捕获事件并每隔一段时间对这些事件进行处理,则相机会很有用。 对于实时事件处理,您应该直接使用 @events.Receiver,就像在 Real-time processing 中一样。

这是一个示例相机,将快照转储到屏幕:

from pprint import pformat

from celery.events.snapshot import Polaroid

class DumpCam(Polaroid):
    clear_after = True  # clear after flush (incl, state.event_count).

    def on_shutter(self, state):
        if not state.event_count:
            # No new events since last snapshot.
            return
        print('Workers: {0}'.format(pformat(state.workers, indent=4)))
        print('Tasks: {0}'.format(pformat(state.tasks, indent=4)))
        print('Total: {0.event_count} events, {0.task_count} tasks'.format(
            state))

请参阅 celery.events.state 的 API 参考以了解有关状态对象的更多信息。

现在,您可以通过使用 -c 选项指定该凸轮与 celery events

$ celery -A proj events -c myapp.DumpCam --frequency=2.0

或者您可以像这样以编程方式使用它:

from celery import Celery
from myapp import DumpCam

def main(app, freq=1.0):
    state = app.events.State()
    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={'*': state.event})
        with DumpCam(state, freq=freq):
            recv.capture(limit=None, timeout=None)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    main(app)

实时处理

要实时处理事件,您需要以下内容

  • 一个事件消费者(这是 Receiver

  • 事件进入时调用的一组处理程序。

    您可以为每种事件类型使用不同的处理程序,也可以使用全能处理程序 ('*')

  • 状态(可选)

    @events.State 是集群中任务和工作线程的一种方便的内存表示,随着事件的发生而更新。

    它封装了许多常见事情的解决方案,例如检查工作人员是否还活着(通过验证心跳),在事件到来时将事件字段合并在一起,确保时间戳同步,等等。

结合这些,您可以轻松地实时处理事件:

from celery import Celery


def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

笔记

capturewakeup 参数向所有工作人员发送信号以强制他们发送心跳。 这样,您可以在监视器启动时立即看到工作人员。


您可以通过指定处理程序来侦听特定事件:

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

事件参考

此列表包含工作人员发送的事件及其参数。

任务事件

任务发送

签名
task-sent(uuid, name, args, kwargs, retries, eta, expires, queue, exchange, routing_key, root_id, parent_id)

在发布任务消息并且启用 :setting:`task_send_sent_event` 设置时发送。


接到任务

签名
task-received(uuid, name, args, kwargs, retries, eta, hostname, timestamp, root_id, parent_id)

当工作人员收到任务时发送。


任务启动

签名
task-started(uuid, hostname, timestamp, pid)

在 worker 执行任务之前发送。


任务成功

签名
task-succeeded(uuid, result, runtime, hostname, timestamp)

如果任务成功执行,则发送。

运行时间是使用池执行任务所花费的时间。 (从任务开始发送到工作池,并在调用池结果处理程序回调时结束)。


任务失败

签名
task-failed(uuid, exception, traceback, hostname, timestamp)

任务执行失败时发送。


任务被拒绝

签名
task-rejected(uuid, requeued)

该任务被工作人员拒绝,可能会重新排队或移至死信队列。


任务撤销

签名
task-revoked(uuid, terminated, signum, expired)

如果任务已被撤销,则发送(请注意,这很可能由多个工作人员发送)。

  • *; 如果任务进程终止,terminated 设置为真,
    signum 字段设置为使用的信号。
  • 如果任务过期,expired 设置为真。


重试任务

签名
task-retried(uuid, exception, traceback, hostname, timestamp)

如果任务失败则发送,但将来会重试。


工人事件

工人在线

签名
worker-online(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)

工作人员已连接到代理并在线。

  • hostname:worker 的节点名。
  • timestamp:事件时间戳。
  • freq:以秒为单位的心跳频率(浮动)。
  • sw_ident:工作软件的名称(例如,py-celery)。
  • sw_ver:软件版本(例如 2.2.0)。
  • sw_sys:操作系统(例如,Linux/Darwin)。


工人心跳

签名
worker-heartbeat(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys, active, processed)

每分钟发送一次,如果worker在2分钟内没有发送心跳,则认为是离线的。

  • hostname:worker 的节点名。
  • timestamp:事件时间戳。
  • freq:以秒为单位的心跳频率(浮动)。
  • sw_ident:工作软件的名称(例如,py-celery)。
  • sw_ver:软件版本(例如 2.2.0)。
  • sw_sys:操作系统(例如,Linux/Darwin)。
  • active:当前正在执行的任务数。
  • processed:这个worker处理的任务总数。


工人离线

签名
worker-offline(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)

工作人员已与代理断开连接。