监控和管理指南 — Python 文档
监控管理指南
简介
有多种工具可用于监视和检查 Celery 集群。
本文档描述了其中的一些,以及与监控相关的功能,如事件和广播命令。
工人
管理命令行实用程序 (inspect/control)
celery 也可用于检查和管理工作节点(以及某种程度的任务)。
要列出所有可用的命令,请执行以下操作:
$ celery --help
或获取特定命令的帮助,请执行以下操作:
$ celery <command> --help
命令
shell:放入 Python shell。
本地人将包括
celery
变量:这是当前的应用程序。 此外,所有已知任务都将自动添加到本地(除非设置了--without-tasks
标志)。如果已安装,则按 :pypi:`Ipython`、:pypi:`bpython` 或常规 python 使用。 您可以使用
--ipython
、--bpython
或--python
强制实现。status:列出该集群中的活动节点
$ celery -A proj status
result:显示一个任务的结果
$ celery -A proj result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577
请注意,只要任务不使用自定义结果后端,您就可以省略任务的名称。
purge:从所有配置的任务队列中清除消息。
此命令将从 :setting:`CELERY_QUEUES` 设置中配置的队列中删除所有消息:
警告
此操作无法撤消,消息将被永久删除!
$ celery -A proj purge
您还可以使用 -Q 选项指定要清除的队列:
$ celery -A proj purge -Q celery,foo,bar
并使用 -X 选项排除队列被清除:
$ celery -A proj purge -X celery
inspect active:列出活动任务
$ celery -A proj inspect active
这些都是当前正在执行的任务。
inspectcheduled:列出预定的ETA任务
$ celery -A proj inspect scheduled
这些是工作人员在拥有 eta[X64X] 或 countdown 参数集时保留的任务。
inspect reserved:列出保留任务
$ celery -A proj inspect reserved
这将列出所有已被工作程序预取的任务,并且当前正在等待执行(不包括具有 ETA 值集的任务)。
inspect revoked:列出已撤销任务的历史记录
$ celery -A proj inspect revoked
检查已注册:列出已注册的任务
$ celery -A proj inspect registered
inspect stats:显示工人统计信息(见 Statistics)
$ celery -A proj inspect stats
inspect query_task:通过id显示任务信息。
任何在这组 ids reserved/active 中拥有任务的 worker 都会以状态和信息进行响应。
$ celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8
您还可以查询有关多个任务的信息:
$ celery -A proj inspect query_task id1 id2 ... idN
control enable_events:启用事件
$ celery -A proj control enable_events
control disable_events:禁用事件
$ celery -A proj control disable_events
migrate:将任务从一个代理迁移到另一个代理(EXPERIMENTAL)。
$ celery -A proj migrate redis://localhost amqp://localhost
此命令会将一个代理上的所有任务迁移到另一个代理。 由于这个命令是新的和实验性的,你应该确保在继续之前备份数据。
笔记
所有 inspect
和 control
命令都支持 --timeout
参数,这是等待响应的秒数。 如果由于延迟而未收到响应,则可能需要增加此超时时间。
指定目标节点
默认情况下,检查和控制命令对所有工作人员起作用。 您可以使用 --destination
参数指定单个或一组工作人员:
$ celery -A proj inspect -d w1@e.com,w2@e.com reserved
$ celery -A proj control -d w1@e.com,w2@e.com enable_events
Flower:实时 Celery 网络监视器
Flower 是一个基于 Web 的实时监控和 Celery 管理工具。 它正在积极开发中,但已经是必不可少的工具。 作为 Celery 的推荐监视器,它淘汰了 Django-Admin 监视器、celerymon
和基于 ncurses
的监视器。
Flower 的发音类似于“flow”,但如果您愿意,也可以使用植物版本。
特点
使用 Celery Events 进行实时监控
任务进度和历史
能够显示任务详细信息(参数、开始时间、运行时间等)
图表和统计
遥控器
查看工作人员状态和统计信息
关闭并重新启动工作器实例
控制工作池大小和自动缩放设置
查看和修改工作实例使用的队列
查看当前运行的任务
查看计划任务(ETA/倒计时)
查看保留和撤销的任务
应用时间和速率限制
配置查看器
撤销或终止任务
HTTP API
列出工人
关闭一个工人
重启工作池
种植工人池
收缩工人池
自动缩放工作器池
从队列开始消费
停止从队列中消费
列出任务
列出(看到的)任务类型
获取任务信息
执行任务
按名称执行任务
获取任务结果
更改任务的软硬时间限制
更改任务的速率限制
撤销任务
OpenID 认证
截图
更多截图:
用法
您可以使用 pip 安装 Flower:
$ pip install flower
运行花命令将启动您可以访问的网络服务器:
$ celery -A proj flower
默认端口为 http://localhost:5555,但您可以使用 –port 参数更改此端口:
$ celery -A proj flower --port=5555
Broker URL 也可以通过 --broker
参数传递:
$ celery flower --broker=amqp://guest:guest@localhost:5672//
or
$ celery flower --broker=redis://guest:guest@localhost:6379/0
然后,您可以在您的网络浏览器中访问花:
$ open http://localhost:5555
Flower 具有比此处详述的更多功能,包括授权选项。 查看 官方文档 了解更多信息。
celery 事件:诅咒监视器
2.0 版中的新功能。
celery events 是一个简单的curses 监视器,显示任务和工人历史。 您可以检查任务的结果和回溯,它还支持一些管理命令,例如限速和关闭 worker。 此监视器是作为概念证明而启动的,您可能想改用 Flower。
开始:
$ celery -A proj events
您应该会看到如下所示的屏幕:
celery events 也用于启动快照相机(参见 Snapshots:
$ celery -A proj events --camera=<camera-class> --frequency=1.0
它包括一个将事件转储到 stdout
的工具:
$ celery -A proj events --dump
要获得完整的选项列表,请使用 --help
:
$ celery events --help
兔MQ
要管理 Celery 集群,了解如何监控 RabbitMQ 非常重要。
RabbitMQ 附带了 rabbitmqctl(1) 命令,您可以通过这个命令列出队列、交换器、绑定、队列长度、每个队列的内存使用情况,以及管理用户、虚拟主机及其权限。
笔记
这些示例中使用默认虚拟主机 ("/"
),如果您使用自定义虚拟主机,则必须在命令中添加 -p
参数,例如:rabbitmqctl list_queues -p my_vhost …
检查队列
查找队列中的任务数:
$ rabbitmqctl list_queues name messages messages_ready \
messages_unacknowledged
这里 messages_ready 是准备发送的消息数(已发送但未收到),messages_unacknowledged 是工作人员已收到但尚未确认的消息数(意味着它在进度,或已保留)。 messages 是就绪消息和未确认消息的总和。
从队列中查找当前消耗的工作线程数:
$ rabbitmqctl list_queues name consumers
查找分配给队列的内存量:
$ rabbitmqctl list_queues name memory
- 提示
- 将
-q
选项添加到 rabbitmqctl(1) 使输出更易于解析。
Redis
如果您使用 Redis 作为代理,则可以使用 redis-cli(1) 命令来监控 Celery 集群以列出队列的长度。
检查队列
查找队列中的任务数:
$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
默认队列名为 celery。 要获取所有可用队列,请调用:
$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER keys \*
笔记
队列键仅在其中有任务时才存在,因此如果键不存在,则仅意味着该队列中没有消息。 这是因为在 Redis 中,一个没有元素的列表会被自动删除,因此它不会出现在 keys 命令输出中,并且该列表的 llen 返回 0。
此外,如果您将 Redis 用于其他目的,keys 命令的输出将包括存储在数据库中的无关值。 推荐的解决方法是为 Celery 使用专用的 DATABASE_NUMBER,您也可以使用数据库编号将 Celery 应用程序彼此(虚拟主机)分开,但这不会影响 for 使用的监控事件例如 Flower 作为 Redis 发布/订阅命令是全局的,而不是基于数据库的。
穆宁
这是一个已知的 Munin 插件列表,它们在维护 Celery 集群时很有用。
rabbitmq-munin
:RabbitMQ 的 Munin 插件。celery_tasks
:监控每个任务类型已经执行的次数(需要celerymon)。celery_tasks_states
:监控每个状态的任务数(需要celerymon)。
活动
每当发生某些事件时,工作人员都有能力发送消息。 然后这些事件被诸如 Flower 和 celery events 之类的工具捕获以监控集群。
快照
2.1 版中的新功能。
即使是单个工作人员也可以产生大量事件,因此将所有事件的历史记录存储在磁盘上可能非常昂贵。
一系列事件描述了该时间段内的集群状态,通过定期拍摄该状态的快照,您可以保留所有历史记录,但仍只能定期将其写入磁盘。
要拍摄快照,您需要一个 Camera 类,通过它您可以定义每次捕获状态时应该发生的事情; 您可以将其写入数据库,通过电子邮件或其他方式发送。
celery events 然后用于使用相机拍摄快照,例如,如果您想使用相机 myapp.Camera
每 2 秒捕获一次状态,则使用 celery events 运行以下参数:
$ celery -A proj events -c myapp.Camera --frequency=2.0
定制相机
如果您需要捕获事件并每隔一段时间对这些事件进行处理,则相机会很有用。 对于实时事件处理,您应该直接使用 @events.Receiver
,就像在 Real-time processing 中一样。
这是一个示例相机,将快照转储到屏幕:
from pprint import pformat
from celery.events.snapshot import Polaroid
class DumpCam(Polaroid):
clear_after = True # clear after flush (incl, state.event_count).
def on_shutter(self, state):
if not state.event_count:
# No new events since last snapshot.
return
print('Workers: {0}'.format(pformat(state.workers, indent=4)))
print('Tasks: {0}'.format(pformat(state.tasks, indent=4)))
print('Total: {0.event_count} events, {0.task_count} tasks'.format(
state))
请参阅 celery.events.state
的 API 参考以了解有关状态对象的更多信息。
现在,您可以通过使用 -c
选项指定该凸轮与 celery events:
$ celery -A proj events -c myapp.DumpCam --frequency=2.0
或者您可以像这样以编程方式使用它:
from celery import Celery
from myapp import DumpCam
def main(app, freq=1.0):
state = app.events.State()
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={'*': state.event})
with DumpCam(state, freq=freq):
recv.capture(limit=None, timeout=None)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
main(app)
实时处理
要实时处理事件,您需要以下内容
一个事件消费者(这是
Receiver
)事件进入时调用的一组处理程序。
您可以为每种事件类型使用不同的处理程序,也可以使用全能处理程序 ('*')
状态(可选)
@events.State
是集群中任务和工作线程的一种方便的内存表示,随着事件的发生而更新。它封装了许多常见事情的解决方案,例如检查工作人员是否还活着(通过验证心跳),在事件到来时将事件字段合并在一起,确保时间戳同步,等等。
结合这些,您可以轻松地实时处理事件:
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(),))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
'*': state.event,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
my_monitor(app)
笔记
capture
的 wakeup
参数向所有工作人员发送信号以强制他们发送心跳。 这样,您可以在监视器启动时立即看到工作人员。
您可以通过指定处理程序来侦听特定事件:
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(),))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
my_monitor(app)
事件参考
此列表包含工作人员发送的事件及其参数。
任务事件
任务发送
- 签名
task-sent(uuid, name, args, kwargs, retries, eta, expires, queue, exchange, routing_key, root_id, parent_id)
在发布任务消息并且启用 :setting:`task_send_sent_event` 设置时发送。
接到任务
- 签名
task-received(uuid, name, args, kwargs, retries, eta, hostname, timestamp, root_id, parent_id)
当工作人员收到任务时发送。
任务启动
- 签名
task-started(uuid, hostname, timestamp, pid)
在 worker 执行任务之前发送。
任务成功
- 签名
task-succeeded(uuid, result, runtime, hostname, timestamp)
如果任务成功执行,则发送。
运行时间是使用池执行任务所花费的时间。 (从任务开始发送到工作池,并在调用池结果处理程序回调时结束)。
任务失败
- 签名
task-failed(uuid, exception, traceback, hostname, timestamp)
任务执行失败时发送。
任务被拒绝
- 签名
task-rejected(uuid, requeued)
该任务被工作人员拒绝,可能会重新排队或移至死信队列。
任务撤销
- 签名
task-revoked(uuid, terminated, signum, expired)
如果任务已被撤销,则发送(请注意,这很可能由多个工作人员发送)。
- *; 如果任务进程终止,
terminated
设置为真,signum
字段设置为使用的信号。
- 如果任务过期,
expired
设置为真。
重试任务
- 签名
task-retried(uuid, exception, traceback, hostname, timestamp)
如果任务失败则发送,但将来会重试。
工人事件
工人在线
- 签名
worker-online(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)
工作人员已连接到代理并在线。
- hostname:worker 的节点名。
- timestamp:事件时间戳。
- freq:以秒为单位的心跳频率(浮动)。
- sw_ident:工作软件的名称(例如,
py-celery
)。 - sw_ver:软件版本(例如 2.2.0)。
- sw_sys:操作系统(例如,Linux/Darwin)。
工人心跳
- 签名
worker-heartbeat(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys, active, processed)
每分钟发送一次,如果worker在2分钟内没有发送心跳,则认为是离线的。
- hostname:worker 的节点名。
- timestamp:事件时间戳。
- freq:以秒为单位的心跳频率(浮动)。
- sw_ident:工作软件的名称(例如,
py-celery
)。 - sw_ver:软件版本(例如 2.2.0)。
- sw_sys:操作系统(例如,Linux/Darwin)。
- active:当前正在执行的任务数。
- processed:这个worker处理的任务总数。
工人离线
- 签名
worker-offline(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)
工作人员已与代理断开连接。