Canvas:设计工作流 — Python 文档
Canvas:设计工作流程
签名
2.0 版中的新功能。
您刚刚在 calling 指南中学习了如何使用 tasks delay
方法调用任务,这通常是您所需要的,但有时您可能想要传递任务调用的签名到另一个进程或作为另一个函数的参数。
signature()
以某种方式包装单个任务调用的参数、关键字参数和执行选项,以便它可以传递给函数,甚至可以序列化并通过网络发送。
您可以使用其名称为
add
任务创建签名,如下所示:>>> from celery import signature >>> signature('tasks.add', args=(2, 2), countdown=10) tasks.add(2, 2)
此任务具有 arity 2(两个参数)的签名:
(2, 2)
,并将倒计时执行选项设置为 10。或者您可以使用任务的
signature
方法创建一个:>>> add.signature((2, 2), countdown=10) tasks.add(2, 2)
还有一个使用星形参数的快捷方式:
>>> add.s(2, 2) tasks.add(2, 2)
还支持关键字参数:
>>> add.s(2, 2, debug=True) tasks.add(2, 2, debug=True)
从任何签名实例,您可以检查不同的字段:
>>> s = add.signature((2, 2), {'debug': True}, countdown=10) >>> s.args (2, 2) >>> s.kwargs {'debug': True} >>> s.options {'countdown': 10}
支持
delay
、apply_async
等的“调用API”,包括直接调用(__call__
)。调用签名将在当前进程中内联执行任务:
>>> add(2, 2) 4 >>> add.s(2, 2)() 4
delay
是我们钟爱的apply_async
取星参数的捷径:>>> result = add.delay(2, 2) >>> result.get() 4
apply_async
采用与Task.apply_async
方法相同的参数:>>> add.apply_async(args, kwargs, **options) >>> add.signature(args, kwargs, **options).apply_async() >>> add.apply_async((2, 2), countdown=1) >>> add.signature((2, 2), countdown=1).apply_async()
你不能用
s()
定义选项,但是一个链接的set
调用会处理这个:>>> add.s(2, 2).set(countdown=1) proj.tasks.add(2, 2)
部分
有了签名,你就可以在一个worker中执行任务:
>>> add.s(2, 2).delay()
>>> add.s(2, 2).apply_async(countdown=1)
或者你可以在当前进程中直接调用它:
>>> add.s(2, 2)()
4
为 apply_async
/delay
指定额外的 args、kwargs 或选项会创建部分:
添加的任何参数都将添加到签名中的 args 之前:
>>> partial = add.s(2) # incomplete signature >>> partial.delay(4) # 4 + 2 >>> partial.apply_async((4,)) # same
添加的任何关键字参数都将与签名中的 kwargs 合并,新的关键字参数优先:
>>> s = add.s(2, 2) >>> s.delay(debug=True) # -> add(2, 2, debug=True) >>> s.apply_async(kwargs={'debug': True}) # same
添加的任何选项都将与签名中的选项合并,新选项优先:
>>> s = add.signature((2, 2), countdown=10) >>> s.apply_async(countdown=1) # countdown is now 1
您还可以克隆签名以创建衍生产品:
>>> s = add.s(2)
proj.tasks.add(2)
>>> s.clone(args=(4,), kwargs={'debug': True})
proj.tasks.add(4, 2, debug=True)
不变性
3.0 版中的新功能。
Partials 旨在与回调一起使用,任何链接的任务或和弦回调都将与父任务的结果一起应用。 有时您想指定一个不接受额外参数的回调,在这种情况下,您可以将签名设置为不可变:
>>> add.apply_async((2, 2), link=reset_buffers.signature(immutable=True))
.si()
快捷方式也可用于创建不可变签名:
>>> add.apply_async((2, 2), link=reset_buffers.si())
当签名不可变时,只能设置执行选项,因此无法使用部分 args/kwargs 调用签名。
笔记
在本教程中,我有时会使用前缀运算符 ~ 来签名。 您可能不应该在您的生产代码中使用它,但在 Python shell 中进行实验时,它是一个方便的快捷方式:
>>> ~sig
>>> # is the same as
>>> sig.delay().get()
回调
3.0 版中的新功能。
可以使用 apply_async
的 link
参数将回调添加到任何任务:
add.apply_async((2, 2), link=other_task.s())
只有当任务成功退出时才会应用回调,并且它将以父任务的返回值作为参数应用。
正如我之前提到的,您添加到签名的任何参数都将添加到签名本身指定的参数之前!
如果你有签名:
>>> sig = add.s(10)
然后 sig.delay(result) 变成:
>>> add.apply_async(args=(result, 10))
…
现在让我们使用部分参数通过回调调用我们的 add
任务:
>>> add.apply_async((2, 2), link=add.s(8))
正如预期的那样,这将首先启动一个计算 \(2 + 2\) 的任务,然后是另一个计算 \(4 + 8\) 的任务。
原始人
3.0 版中的新功能。
概述
group
group 原语是一个签名,它采用应该并行应用的任务列表。
chain
链原语让我们将签名链接在一起,以便一个接一个调用,本质上形成一个 chain 回调。
chord
和弦就像一个组,但带有回调。 和弦由标题组和正文组成,其中正文是在标题中的所有任务完成后应执行的任务。
map
map 原语的工作原理类似于内置的
map
函数,但会创建一个临时任务,其中将参数列表应用于任务。 例如,task.map([1, 2])
- 导致调用单个任务,将参数按顺序应用于任务函数,结果为:res = [task(1), task(2)]
starmap
除了参数被应用为
*args
之外,它的工作方式与 map 完全一样。 例如add.starmap([(2, 2), (4, 4)])
导致单个任务调用:res = [add(2, 2), add(4, 4)]
chunks
分块将一长串参数拆分为多个部分,例如操作:
>>> items = zip(range(1000), range(1000)) # 1000 items >>> add.chunks(items, 10)
将项目列表拆分为 10 个块,从而产生 100 个任务(每个任务依次处理 10 个项目)。
原语本身也是签名对象,因此它们可以以多种方式组合以组成复杂的工作流。
下面是一些例子:
简单链
这是一个简单的链,第一个任务执行时将其返回值传递给链中的下一个任务,依此类推。
>>> from celery import chain >>> # 2 + 2 + 4 + 8 >>> res = chain(add.s(2, 2), add.s(4), add.s(8))() >>> res.get() 16
这也可以使用管道编写:
>>> (add.s(2, 2) | add.s(4) | add.s(8))().get() 16
不可变签名
签名可以是部分的,因此可以将参数添加到现有参数中,但您可能并不总是想要这样,例如,如果您不想要链中前一个任务的结果。
在这种情况下,您可以将签名标记为不可变,以便无法更改参数:
>>> add.signature((2, 2), immutable=True)
还有一个
.si()
快捷方式,这是创建签名的首选方式:>>> add.si(2, 2)
现在您可以创建一系列独立的任务:
>>> res = (add.si(2, 2) | add.si(4, 4) | add.si(8, 8))() >>> res.get() 16 >>> res.parent.get() 8 >>> res.parent.parent.get() 4
简单组
您可以轻松创建一组并行执行的任务:
>>> from celery import group >>> res = group(add.s(i, i) for i in range(10))() >>> res.get(timeout=1) [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
简单的和弦
chord 原语使我们能够添加一个回调,当一个组中的所有任务都完成执行时,将调用该回调。 对于不是 令人尴尬地并行 的算法,这通常是必需的:
>>> from celery import chord >>> res = chord((add.s(i, i) for i in range(10)), xsum.s())() >>> res.get() 90
上面的例子创建了 10 个并行启动的任务,当所有任务都完成时,返回值组合成一个列表并发送到
xsum
任务。和弦的主体也可以是不可变的,因此组的返回值不会传递给回调:
>>> chord((import_contact.s(c) for c in contacts), ... notify_complete.si(import_id)).apply_async()
注意上面
.si
的使用; 这会创建一个不可变的签名,这意味着传递的任何新参数(包括前一个任务的返回值)都将被忽略。组合让你大开眼界
链也可以是部分的:
>>> c1 = (add.s(4) | mul.s(8)) # (16 + 4) * 8 >>> res = c1(16) >>> res.get() 160
这意味着您可以组合链:
# ((4 + 16) * 2 + 4) * 8 >>> c2 = (add.s(4, 16) | mul.s(2) | (add.s(4) | mul.s(8))) >>> res = c2() >>> res.get() 352
将一个组与另一个任务链接在一起将自动将其升级为一个和弦:
>>> c3 = (group(add.s(i, i) for i in range(10)) | xsum.s()) >>> res = c3() >>> res.get() 90
Groups 和 chords 也接受部分参数,所以在一个链中,前一个任务的返回值被转发到组中的所有任务:
>>> new_user_workflow = (create_user.s() | group( ... import_contacts.s(), ... send_welcome_email.s())) ... new_user_workflow.delay(username='artv', ... first='Art', ... last='Vandelay', ... email='art@vandelay.com')
如果您不想将参数转发给组,则可以使组中的签名不可变:
>>> res = (add.s(4, 4) | group(add.si(i, i) for i in range(10)))() >>> res.get() <GroupResult: de44df8c-821d-4c84-9a6a-44769c738f98 [ bc01831b-9486-4e51-b046-480d7c9b78de, 2650a1b8-32bf-4771-a645-b0a35dcc791b, dcbee2a5-e92d-4b03-b6eb-7aec60fd30cf, 59f92e0a-23ea-41ce-9fad-8645a0e7759c, 26e1e707-eccf-4bf4-bbd8-1e1729c3cce3, 2d10a5f4-37f0-41b2-96ac-a973b1df024d, e13d3bdb-7ae3-4101-81a4-6f17ee21df2d, 104b2be0-7b75-44eb-ac8e-f9220bdfa140, c5c551a5-0386-4973-aa37-b65cbeb2624b, 83f72d71-4b71-428e-b604-6f16599a9f37]> >>> res.parent.get() 8
链条
3.0 版中的新功能。
任务可以链接在一起:当任务成功返回时调用链接的任务:
>>> res = add.apply_async((2, 2), link=mul.s(16))
>>> res.get()
4
链接的任务将以其父任务的结果作为第一个参数来应用。 在上述结果为 4 的情况下,这将导致 mul(4, 16)
。
结果将跟踪原始任务调用的任何子任务,这可以从结果实例中访问:
>>> res.children
[<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>]
>>> res.children[0].get()
64
结果实例还有一个 collect()
方法,将结果视为图形,使您能够迭代结果:
>>> list(res.collect())
[(<AsyncResult: 7b720856-dc5f-4415-9134-5c89def5664e>, 4),
(<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>, 64)]
默认情况下,collect()
将在图形未完全形成(其中一项任务尚未完成)时引发 @IncompleteStream
异常,但您也可以获得图形的中间表示:
>>> for result, value in res.collect(intermediate=True):
....
您可以根据需要将任意数量的任务链接在一起,也可以链接签名:
>>> s = add.s(2, 2)
>>> s.link(mul.s(4))
>>> s.link(log_result.s())
您还可以使用 on_error 方法添加 错误回调 :
>>> add.s(2, 2).on_error(log_error.s()).delay()
应用签名时,这将导致以下 .apply_async
调用:
>>> add.apply_async((2, 2), link_error=log_error.s())
工作人员实际上不会将 errback 作为任务调用,而是直接调用 errback 函数,以便可以将原始请求、异常和回溯对象传递给它。
这是一个错误返回示例:
from __future__ import print_function
import os
from proj.celery import app
@app.task
def log_error(request, exc, traceback):
with open(os.path.join('/var/errors', request.id), 'a') as fh:
print('--\n\n{0} {1} {2}'.format(
request.id, exc, traceback), file=fh)
为了更轻松地将任务链接在一起,有一个名为 chain
的特殊签名,可让您将任务链接在一起:
>>> from celery import chain
>>> from proj.tasks import add, mul
>>> # (4 + 4) * 8 * 10
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))
proj.tasks.add(4, 4) | proj.tasks.mul(8) | proj.tasks.mul(10)
调用链会调用当前进程中的任务,并返回链中最后一个任务的结果:
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
>>> res.get()
640
它还设置 parent
属性,以便您可以沿着链向上工作以获得中间结果:
>>> res.parent.get()
64
>>> res.parent.parent.get()
8
>>> res.parent.parent
<AsyncResult: eeaad925-6778-4ad1-88c8-b2a63d017933>
也可以使用 |
(管道)运算符制作链:
>>> (add.s(2, 2) | mul.s(8) | mul.s(10)).apply_async()
图表
此外,您可以将结果图用作 DependencyGraph
:
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
>>> res.parent.parent.graph
285fa253-fcf8-42ef-8b95-0078897e83e6(1)
463afec2-5ed4-4036-b22d-ba067ec64f52(0)
872c3995-6fa0-46ca-98c2-5a19155afcf0(2)
285fa253-fcf8-42ef-8b95-0078897e83e6(1)
463afec2-5ed4-4036-b22d-ba067ec64f52(0)
您甚至可以将这些图形转换为 dot 格式:
>>> with open('graph.dot', 'w') as fh:
... res.parent.parent.graph.to_dot(fh)
并创建图像:
$ dot -Tpng graph.dot -o graph.png
团体
3.0 版中的新功能。
一个组可用于并行执行多个任务。
group
函数接受一个签名列表:
>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(2, 2), add.s(4, 4))
(proj.tasks.add(2, 2), proj.tasks.add(4, 4))
如果你调用组,任务会在当前进程中一个接一个地应用,并返回一个GroupResult
实例,可以用来跟踪结果,或者告诉如何许多任务已准备就绪,等等:
>>> g = group(add.s(2, 2), add.s(4, 4))
>>> res = g()
>>> res.get()
[4, 8]
Group 还支持迭代器:
>>> group(add.s(i, i) for i in range(100))()
组是签名对象,因此可以与其他签名组合使用。
组回调和错误处理
组也可以有关联的回调和错误返回签名,但是由于组不是真正的任务并且只是将链接的任务向下传递给它们封装的签名,因此行为可能有点令人惊讶。 这意味着不会收集组的返回值以传递给链接的回调签名。 例如,以下使用简单 add(a, b) 任务的代码片段是错误的,因为链接的 add.s() 签名不会像人们那样收到最终的分组结果预计。
>>> g = group(add.s(2, 2), add.s(4, 4))
>>> g.link(add.s())
>>> res = g()
[4, 8]
请注意,将返回前两个任务的最终结果,但回调签名将在后台运行并引发异常,因为它没有收到预期的两个参数。
组 errback 也会传递给封装的签名,这使得如果组中的多个任务失败,则只链接一次的 errback 可能会被多次调用。 例如,以下使用 fail() 任务引发异常的代码片段可以预期为组中运行的每个失败任务调用一次 log_error() 签名.
>>> g = group(fail.s(), fail.s())
>>> g.link_error(log_error.s())
>>> res = g()
考虑到这一点,通常建议创建幂等或计数任务,这些任务可以容忍重复调用以用作 errbacks。
某些后端实现支持的 chord
类可以更好地解决这些用例。
分组结果
group 任务也返回一个特殊的结果,这个结果和普通任务结果一样,只是它对整个 group 起作用:
>>> from celery import group
>>> from tasks import add
>>> job = group([
... add.s(2, 2),
... add.s(4, 4),
... add.s(8, 8),
... add.s(16, 16),
... add.s(32, 32),
... ])
>>> result = job.apply_async()
>>> result.ready() # have all subtasks completed?
True
>>> result.successful() # were all subtasks successful?
True
>>> result.get()
[4, 8, 16, 32, 64]
GroupResult
获取一个 AsyncResult
实例列表,并对它们进行操作,就好像它是单个任务一样。
它支持以下操作:
successful()
如果所有子任务都成功完成(例如,没有引发异常),则返回
True
。failed()
如果任何子任务失败,则返回
True
。waiting()
如果有任何子任务尚未准备好,则返回
True
。ready()
如果所有子任务都准备好了,则返回
True
。completed_count()
返回已完成子任务的数量。
revoke()
撤销所有子任务。
join()
收集所有子任务的结果并按照调用它们的顺序返回它们(作为列表)。
和弦
2.3 版中的新功能。
和弦是仅在组中的所有任务执行完毕后才执行的任务。
让我们计算表达式的总和 \(1 + 1 + 2 + 2 + 3 + 3 ... n + n\) 最多一百位数。
首先你需要两个任务,add()
和 tsum()
(sum()
已经是标准函数了):
@app.task
def add(x, y):
return x + y
@app.task
def tsum(numbers):
return sum(numbers)
现在您可以使用和弦并行计算每个加法步骤,然后获得结果数字的总和:
>>> from celery import chord
>>> from tasks import add, tsum
>>> chord(add.s(i, i)
... for i in range(100))(tsum.s()).get()
9900
这显然是一个非常人为的例子,消息传递和同步的开销使它比 Python 对应的要慢很多:
>>> sum(i + i for i in range(100))
同步步骤成本很高,因此您应该尽可能避免使用和弦。 尽管如此,和弦仍然是您工具箱中一个强大的原语,因为同步是许多并行算法的必需步骤。
让我们分解和弦表达:
>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()
9900
请记住,只有在标头中的所有任务都返回后才能执行回调。 标头中的每个步骤都作为一个任务并行执行,可能在不同的节点上。 然后使用标头中每个任务的返回值应用回调。 chord()
返回的任务id就是回调的id,所以你可以等待它完成并得到最终的返回值(但要记住永远不要有任务等待其他任务 )
错误处理
那么如果其中一项任务引发异常会发生什么?
和弦回调结果会转为失败状态,错误设置为@ChordError
异常:
>>> c = chord([add.s(4, 4), raising_task.s(), add.s(8, 8)])
>>> result = c()
>>> result.get()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "*/celery/result.py", line 120, in get
interval=interval)
File "*/celery/backends/amqp.py", line 150, in wait_for
raise meta['result']
celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
raised ValueError('something something',)
虽然回溯可能因所使用的结果后端而异,但您可以看到错误描述包括失败任务的 id 和原始异常的字符串表示。 您还可以在 result.traceback
中找到原始回溯。
请注意,其余任务仍将执行,因此即使中间任务失败,仍会执行第三个任务 (add.s(8, 8)
)。 此外,@ChordError
仅显示首先失败的任务(及时):它不遵守标题组的顺序。
要在和弦失败时执行操作,您可以将 errback 附加到和弦回调:
@app.task
def on_chord_error(request, exc, traceback):
print('Task {0!r} raised error: {1!r}'.format(request.id, exc))
>>> c = (group(add.s(i, i) for i in range(10)) |
... xsum.s().on_error(on_chord_error.s())).delay()
和弦可能有回调和 errback 签名链接到它们,这解决了将签名链接到组的一些问题。 这样做会将提供的签名链接到和弦的主体,可以期望在主体完成后优雅地调用一次回调,或者如果和弦标题或主体中的任何任务失败,则只调用一次。
重要说明
在和弦中使用的任务必须 不 忽略它们的结果。 实际上,这意味着您必须启用 result_backend
才能使用和弦。 此外,如果在您的配置中将 task_ignore_result
设置为 True
,请确保使用 ignore_result=False
定义和弦中要使用的各个任务。 这适用于 Task 子类和装饰任务。
示例任务子类:
class MyTask(Task):
ignore_result = False
示例装饰任务:
@app.task(ignore_result=False)
def another_task(project):
do_something()
默认情况下,同步步骤是通过让重复任务每秒轮询组的完成情况来实现的,并在准备好时调用签名。
示例实现:
from celery import maybe_signature
@app.task(bind=True)
def unlock_chord(self, group, callback, interval=1, max_retries=None):
if group.ready():
return maybe_signature(callback).delay(group.join())
raise self.retry(countdown=interval, max_retries=max_retries)
这被除 Redis 和 Memcached 之外的所有结果后端使用:它们在标头中的每个任务之后增加一个计数器,然后当计数器超过集合中的任务数时应用回调。
Redis 和 Memcached 方法是一个更好的解决方案,但不容易在其他后端实现(欢迎提出建议!)。
笔记
在 2.2 版本之前,和弦不能与 Redis 正常工作; 您至少需要升级到 redis-server 2.2 才能使用它们。
笔记
如果您在 Redis 结果后端使用和弦并覆盖 Task.after_return()
方法,则需要确保调用 super 方法,否则将不会应用和弦回调。
def after_return(self, *args, **kwargs):
do_something()
super().after_return(*args, **kwargs)
地图和星图
map
和 starmap
是内置任务,它们为序列中的每个元素调用提供的调用任务。
它们与 group
的不同之处在于:
- 只发送一个任务消息。
- 操作是顺序的。
例如使用 map
:
>>> from proj.tasks import add
>>> ~xsum.map([range(10), range(100)])
[45, 4950]
与执行以下任务相同:
@app.task
def temp():
return [xsum(range(10)), xsum(range(100))]
并使用 starmap
:
>>> ~add.starmap(zip(range(10), range(10)))
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
与执行以下任务相同:
@app.task
def temp():
return [add(i, i) for i in range(10)]
map
和 starmap
都是签名对象,因此可以作为其他签名使用,组合成组等,例如10秒后调用星图:
>>> add.starmap(zip(range(10), range(10))).apply_async(countdown=10)
大块
分块可以让你将一个可迭代的工作分成几部分,这样如果你有 100 万个对象,你就可以创建 10 个任务,每个任务有 10 万个对象。
有些人可能会担心分块任务会导致并行性下降,但对于繁忙的集群来说,这很少是真的,而且在实践中,因为您避免了消息传递的开销,它可能会显着提高性能。
要创建块签名,您可以使用 @Task.chunks()
:
>>> add.chunks(zip(range(100), range(100)), 10)
与 group
一样,为块发送消息的行为将在调用时发生在当前进程中:
>>> from proj.tasks import add
>>> res = add.chunks(zip(range(100), range(100)), 10)()
>>> res.get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
[20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
[40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
[60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
[80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
[100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
[120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
[140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
[160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
[180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]
在调用 .apply_async
时将创建一个专用任务,以便将各个任务应用到工作线程中:
>>> add.chunks(zip(range(100), range(100)), 10).apply_async()
您还可以将块转换为组:
>>> group = add.chunks(zip(range(100), range(100)), 10).group()
并且小组将每个任务的倒计时以 1 为增量倾斜:
>>> group.skew(start=1, stop=10)()
这意味着第一个任务将倒计时一秒,第二个任务倒计时两秒,依此类推。