后续步骤 — Python 文档
下一步
使用 Celery 的第一步 指南是有意最小化的。 在本指南中,我将更详细地演示 Celery 提供的功能,包括如何为您的应用程序和库添加 Celery 支持。
本文档并未记录 Celery 的所有功能和最佳实践,因此建议您同时阅读 用户指南
在您的应用程序中使用 Celery
我们的项目
项目布局:
proj/__init__.py
/celery.py
/tasks.py
proj/celery.py
from celery import Celery
app = Celery('proj',
broker='amqp://',
backend='rpc://',
include=['proj.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
在本模块中,您创建了我们的 @Celery
实例(有时称为 app)。 要在您的项目中使用 Celery,您只需导入此实例。
broker
参数指定要使用的代理的 URL。有关更多信息,请参阅 选择经纪人 。
backend
参数指定要使用的结果后端。它用于跟踪任务状态和结果。 虽然默认情况下结果是禁用的,但我在这里使用 RPC 结果后端,因为我稍后演示了检索结果的工作方式。 您可能希望为您的应用程序使用不同的后端。 他们都有不同的优点和缺点。 如果您不需要结果,最好禁用它们。 还可以通过设置
@task(ignore_result=True)
选项来禁用单个任务的结果。有关详细信息,请参阅 保留结果 。
include
参数是工作程序启动时要导入的模块列表。 您需要在此处添加我们的任务模块,以便工作人员能够找到我们的任务。
proj/tasks.py
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
启动工人
celery程序可以用来启动worker(需要在proj上面的目录下运行worker):
$ celery -A proj worker -l INFO
当工作程序启动时,您应该看到一个横幅和一些消息:
--------------- celery@halcyon.local v4.0 (latentcall)
--- ***** -----
-- ******* ---- [Configuration]
- *** --- * --- . broker: amqp://guest@localhost:5672//
- ** ---------- . app: __main__:0x1012d8590
- ** ---------- . concurrency: 8 (processes)
- ** ---------- . events: OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery: exchange:celery(direct) binding:celery
--- ***** -----
[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.
– broker 是您在我们的 celery
模块中的 broker 参数中指定的 URL。 您还可以使用 -b
选项在命令行上指定不同的代理。
– Concurrency 是用于并发处理您的任务的 prefork 工作进程的数量。 当所有这些都忙于工作时,新任务将不得不等待其中一项任务完成才能进行处理。
默认并发数是该机器上的 CPU 数(包括内核)。 您可以使用 celery worker -c
选项指定自定义编号。 没有推荐值,因为最佳数量取决于许多因素,但如果您的任务主要受 I/O 限制,那么您可以尝试增加它。 实验表明,增加两倍以上的 CPU 数量很少有效,反而可能会降低性能。
包括默认的 prefork 池,Celery 还支持使用 Eventlet、Gevent 和单线程运行(参见 Concurrency)。
– Events 是一个选项,它使 Celery 发送监控消息(事件)以监控工作线程中发生的操作。 这些可以由 celery events
和 Flower – 实时 Celery 监视器等监控程序使用,您可以在 监控和管理指南 中阅读。
– Queues 是工作人员将从中消费任务的队列列表。 可以告诉工作人员一次从多个队列中消费,这用于将消息路由到特定工作人员,作为服务质量、关注点分离和优先级排序的一种手段,所有这些都在 路由指南 中进行了描述.
您可以通过传入 --help
标志来获取命令行参数的完整列表:
$ celery worker --help
这些选项在 工人指南 中有更详细的描述。
在后台
在生产环境中,您需要在后台运行 worker,详细描述请参见 守护程序教程 。
守护进程脚本使用 celery multi 命令在后台启动一个或多个 worker:
$ celery multi start w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
> Starting nodes...
> w1.halcyon.local: OK
您也可以重新启动它:
$ celery multi restart w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
> w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64052
或停止它:
$ celery multi stop w1 -A proj -l INFO
stop
命令是异步的,所以它不会等待工作程序关闭。 您可能想改用 stopwait
命令,它确保所有当前正在执行的任务在退出之前都已完成:
$ celery multi stopwait w1 -A proj -l INFO
笔记
celery multi 不存储有关 worker 的信息,因此您需要在重新启动时使用相同的命令行参数。 停止时只能使用相同的 pidfile 和 logfile 参数。
默认情况下,它将在当前目录中创建 pid 和日志文件。 为了防止多个工作人员在彼此之上启动,我们鼓励您将它们放在一个专用目录中:
$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery
$ celery multi start w1 -A proj -l INFO --pidfile=/var/run/celery/%n.pid \
--logfile=/var/log/celery/%n%I.log
使用 multi 命令你可以启动多个 worker,还有一个强大的命令行语法可以为不同的 worker 指定参数,例如:
$ celery multi start 10 -A proj -l INFO -Q:1-3 images,video -Q:4,5 data \
-Q default -L:4,5 debug
有关更多示例,请参阅 API 参考中的 multi
模块。
关于 --app 参数
--app
参数指定要使用的 Celery 应用程序实例,格式为 module.path:attribute
但它也支持快捷方式。 如果仅指定了包名称,它将尝试按以下顺序搜索应用程序实例:
使用 --app=proj
:
- 名为
proj.app
的属性,或 - 名为
proj.celery
的属性,或 - 模块
proj
中值为 Celery 应用程序的任何属性,或
如果这些都没有找到,它会尝试一个名为 proj.celery
的子模块:
名为
proj.celery.app
的属性,或名为
proj.celery.celery
的属性,或模块
proj.celery
中值为 Celery 应用程序的任何属性。
这个方案模仿了文档中使用的实践——即,proj:app
用于单个包含的模块,而 proj.celery:app
用于更大的项目。
调用任务
您可以使用 delay()
方法调用任务:
>>> from proj.tasks import add
>>> add.delay(2, 2)
这个方法实际上是另一个名为 apply_async()
的方法的星形参数快捷方式:
>>> add.apply_async((2, 2))
后者使您能够指定执行选项,例如运行时间(倒计时)、应将其发送到的队列等:
>>> add.apply_async((2, 2), queue='lopri', countdown=10)
在上面的例子中,任务将被发送到一个名为 lopri
的队列,任务将在消息发送后最早 10 秒执行。
直接应用任务会执行当前进程中的任务,这样就不会发送消息:
>>> add(2, 2)
4
这三个方法 - delay()
、apply_async()
和应用 (__call__
) 组成了 Celery 调用 API,该 API 也用于签名。
可以在 调用用户指南 中找到调用 API 的更详细概述。
每个任务调用都会被赋予一个唯一标识符(UUID)——这就是任务 ID。
delay
和 apply_async
方法返回一个 @AsyncResult
实例,可用于跟踪任务执行状态。 但是为此您需要启用 result backend 以便可以将状态存储在某处。
默认情况下禁用结果,因为没有适合每个应用程序的结果后端; 要选择一个,您需要考虑每个后端的缺点。 对于许多任务来说,保留返回值甚至不是很有用,所以它是一个明智的默认设置。 另请注意,结果后端不用于监控任务和工作人员:因为 Celery 使用专用的事件消息(请参阅 监控和管理指南 )。
如果您配置了结果后端,您可以检索任务的返回值:
>>> res = add.delay(2, 2)
>>> res.get(timeout=1)
4
您可以通过查看 id
属性来找到任务的 id:
>>> res.id
d6b3aea2-fb9b-4ebc-8da4-848818db9114
如果任务引发异常,您还可以检查异常和回溯,实际上 result.get()
将默认传播任何错误:
>>> res = add.delay(2, '2')
>>> res.get(timeout=1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/result.py", line 221, in get
return self.backend.wait_for_pending(
File "celery/backends/asynchronous.py", line 195, in wait_for_pending
return result.maybe_throw(callback=callback, propagate=propagate)
File "celery/result.py", line 333, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "celery/result.py", line 326, in throw
self.on_ready.throw(*args, **kwargs)
File "vine/promises.py", line 244, in throw
reraise(type(exc), exc, tb)
File "vine/five.py", line 195, in reraise
raise value
TypeError: unsupported operand type(s) for +: 'int' and 'str'
如果您不希望错误传播,您可以通过传递 propagate
来禁用它:
>>> res.get(propagate=False)
TypeError("unsupported operand type(s) for +: 'int' and 'str'")
在这种情况下,它将返回引发的异常实例 - 因此要检查任务是成功还是失败,您必须在结果实例上使用相应的方法:
>>> res.failed()
True
>>> res.successful()
False
那么它如何知道任务是否失败呢? 它可以通过查看任务 state 来找出:
>>> res.state
'FAILURE'
一个任务只能处于一种状态,但它可以通过多个状态进行。 典型任务的阶段可以是:
PENDING -> STARTED -> SUCCESS
启动状态是一种特殊状态,只有在启用 :setting:`task_track_started` 设置,或者为任务设置了 @task(track_started=True)
选项时才会记录。
挂起状态实际上不是记录状态,而是任何未知任务 ID 的默认状态:您可以从这个示例中看到:
>>> from proj.celery import app
>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.state
'PENDING'
如果重试任务,阶段会变得更加复杂。 为了演示,对于重试两次的任务,阶段将是:
PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
要阅读有关任务状态的更多信息,您应该查看任务用户指南中的 States 部分。
调用任务在调用指南中有详细描述。
Canvas:设计工作流程
您刚刚学习了如何使用 tasks delay
方法调用任务,这通常就是您所需要的。 但有时您可能希望将任务调用的签名传递给另一个进程或作为另一个函数的参数,为此 Celery 使用称为 signatures 的东西。
签名包装了单个任务调用的参数和执行选项,这样它就可以传递给函数,甚至可以序列化并通过网络发送。
您可以使用参数 (2, 2)
和 10 秒倒计时为 add
任务创建签名,如下所示:
>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)
还有一个使用星形参数的快捷方式:
>>> add.s(2, 2)
tasks.add(2, 2)
还有那个再次调用API...…
签名实例还支持调用 API,这意味着它们具有 delay
和 apply_async
方法。
但区别在于签名可能已经指定了参数签名。 add
任务接受两个参数,因此指定两个参数的签名将构成一个完整的签名:
>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4
但是,您也可以制作不完整的签名来创建我们所说的 部分 :
# incomplete partial: add(?, 2)
>>> s2 = add.s(2)
s2
现在是一个部分签名,需要另一个参数才能完成,这可以在调用签名时解决:
# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
10
在这里,您将参数 8 添加到现有参数 2 之前,形成完整的 add(8, 2)
签名。
也可以稍后添加关键字参数; 然后将它们与任何现有的关键字参数合并,但新参数优先:
>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False) # debug is now False.
如前所述,签名支持调用 API:这意味着
sig.apply_async(args=(), kwargs={}, **options)
使用可选的部分参数和部分关键字参数调用签名。 还支持部分执行选项。
sig.delay(*args, **kwargs)
apply_async
的星形参数版本。 任何参数都将被添加到签名中的参数之前,并且关键字参数与任何现有的键合并。
所以这一切看起来都非常有用,但你实际上可以用这些做什么? 为此,我必须介绍画布原语...…
原始人
这些原语本身就是签名对象,因此它们可以以多种方式组合以组成复杂的工作流。
笔记
这些示例检索结果,因此要尝试它们,您需要配置一个结果后端。 上面的示例项目已经做到了(请参阅 Celery
的后端参数)。
让我们看一些例子:
团体
group
并行调用任务列表,它返回一个特殊的结果实例,让您可以将结果作为一个组进行检查,并按顺序检索返回值。
>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(i, i) for i in range(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
- 部分组
>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
链条
任务可以链接在一起,以便在一个任务返回后调用另一个:
>>> from celery import chain
>>> from proj.tasks import add, mul
# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64
或部分链:
>>> # (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64
链也可以这样写:
>>> (add.s(4, 4) | mul.s(8))().get()
64
和弦
和弦是带有回调的组:
>>> from celery import chord
>>> from proj.tasks import add, xsum
>>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
90
链接到另一个任务的组将自动转换为和弦:
>>> (group(add.s(i, i) for i in range(10)) | xsum.s())().get()
90
由于这些原语都是签名类型,因此它们几乎可以随心所欲地组合,例如:
>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)
请务必在 Canvas 用户指南中阅读有关工作流程的更多信息。
路由
Celery 支持 AMQP 提供的所有路由工具,但它也支持将消息发送到命名队列的简单路由。
:setting:`task_routes` 设置使您可以按名称路由任务并将所有内容集中在一个位置:
app.conf.update(
task_routes = {
'proj.tasks.add': {'queue': 'hipri'},
},
)
您还可以在运行时使用 apply_async
的 queue
参数指定队列:
>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')
然后,您可以通过指定 celery worker -Q
选项使工作人员从此队列中消费:
$ celery -A proj worker -Q hipri
您可以使用逗号分隔的列表指定多个队列。 例如,您可以让 worker 从默认队列和 hipri
队列中消费,其中由于历史原因,默认队列命名为 celery
:
$ celery -A proj worker -Q hipri,celery
队列的顺序无关紧要,因为工作人员会给队列赋予相同的权重。
要了解有关路由的更多信息,包括充分利用 AMQP 路由的功能,请参阅 路由指南 。
遥控器
如果您使用 RabbitMQ (AMQP)、Redis 或 Qpid 作为代理,那么您可以在运行时控制和检查工作线程。
例如,您可以查看工作人员当前正在处理的任务:
$ celery -A proj inspect active
这是通过使用广播消息实现的,因此集群中的每个工作人员都会收到所有远程控制命令。
您还可以使用 --destination
选项指定一名或多名工作人员对请求采取行动。 这是一个以逗号分隔的工作主机名列表:
$ celery -A proj inspect active --destination=celery@example.com
如果没有提供目的地,那么每个工作人员都会采取行动并回复请求。
celery inspect 命令包含不会改变 worker 中任何东西的命令; 它只返回有关工作人员内部发生的事情的信息和统计信息。 对于可以执行的检查命令列表:
$ celery -A proj inspect --help
然后是 celery control 命令,其中包含在运行时实际更改工作程序中的内容的命令:
$ celery -A proj control --help
例如,您可以强制工作人员启用事件消息(用于监控任务和工作人员):
$ celery -A proj control enable_events
启用事件后,您可以启动事件转储程序以查看工作人员正在做什么:
$ celery -A proj events --dump
或者你可以启动curses界面:
$ celery -A proj events
完成监控后,您可以再次禁用事件:
$ celery -A proj control disable_events
celery status 命令也使用远程控制命令并显示集群中的在线工作者列表:
$ celery -A proj status
您可以在 监控指南 中阅读有关 celery 命令和监控的更多信息。
时区
内部和消息中的所有时间和日期都使用 UTC 时区。
当工作人员收到一条消息时,例如设置了倒计时,它会将 UTC 时间转换为本地时间。 如果您希望使用与系统时区不同的时区,则必须使用 :setting:`timezone` 设置进行配置:
app.conf.timezone = 'Europe/London'
优化
默认配置未针对吞吐量进行优化。 默认情况下,它尝试在许多短任务和较少长任务之间走中间路线,这是吞吐量和公平调度之间的折衷。
如果您有严格的公平调度要求,或者想要优化吞吐量,那么您应该阅读 优化指南 。