Celery 3.0 的新特性(Chiastic Slide) — Python 文档
Celery 3.0 的新特性(Chiastic Slide)
Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。
它是一个专注于实时处理的任务队列,同时也支持任务调度。
Celery 拥有庞大而多样化的用户和贡献者社区,您应该在 IRC 或 我们的邮件列表 上加入我们 。
要阅读有关 Celery 的更多信息,您应该阅读 介绍 。
虽然此版本向后兼容以前的版本,但阅读以下部分很重要。
如果您将 Celery 与 Django 结合使用,您还必须阅读 django-celery 更新日志并升级到 :pypi:`django-celery 3.0 ` .
此版本在 CPython 2.5、2.6、2.7、3.2 和 3.3 以及 PyPy 和 Jython 上得到官方支持。
亮点
概述
一个新的和改进的 API,它既简单又强大。
每个人都必须阅读新的 使用 Celery 的第一步 教程,以及新的 后续步骤 教程。 哦,为什么不在使用时重新阅读用户指南:)
目前没有弃用旧 API 的计划,因此您不必急于移植您的应用程序。
工作线程现在是无线程的,从而极大地提高了性能。
新的“画布”可以轻松定义复杂的工作流程。
曾经想将任务链接在一起吗? 这是可能的,但不仅如此,现在您甚至可以将组和和弦链接在一起,甚至组合多个链。
在 Canvas 用户指南中阅读更多内容。
Celery 的所有命令行程序现在都可以通过一个 celery 伞形命令获得。
这是支持 Python 2.5 的最后一个版本。
从 Celery 3.1 开始,需要 Python 2.6 或更高版本。
支持新的 :pypi:`librabbitmq` C 客户端。
如果安装了 Celery 将自动使用 :pypi:`librabbitmq` 模块,这是对 :pypi:`amqp` 模块的非常快速和内存优化的替代。
Redis 支持通过改进的 ack 模拟更加可靠。
Celery 现在总是使用 UTC
超过 600 次提交,30k 次添加/36k 次删除。
相比之下,1.0➝ 2.0 有 18k 个添加/8k 个删除。
重要说明
广播交易所更名
工人远程控制命令交换已被重命名(一个新的 pidbox 名称),这是因为交换上的 auto_delete
标志已被删除,这使得它与早期版本不兼容。
如果需要,您可以使用 celery amqp 命令(以前称为 camqadm
)手动删除旧的交换:
$ celery amqp exchange.delete celeryd.pidbox
$ celery amqp exchange.delete reply.celeryd.pidbox
事件循环
当与 RabbitMQ (AMQP) 或 Redis 作为代理一起使用时,worker 现在运行 而不使用线程 ,结果:
- 整体性能要好得多。
- 修复了几个边缘情况竞争条件。
- 亚毫秒级计时器精度。
- 更快的关机时间。
支持的传输是:py-amqp
librabbitmq
、redis
和 amqplib
。 希望这可以扩展到将来包括其他代理传输。
为了提高可靠性,如果不使用事件循环,则默认启用 :setting:`CELERY_FORCE_EXECV` 设置。
新的 celery 伞命令
所有 Celery 的命令行程序现在都可以通过一个 celery 伞形命令获得。
您可以通过运行以下命令查看子命令和选项列表:
$ celery help
命令包括:
celery worker
(以前是celeryd
)。celery beat
(以前是celerybeat
)。celery amqp
(以前是camqadm
)。
旧程序仍然可用(celeryd
、celerybeat
等),但不鼓励您使用它们。
现在取决于 :pypi:`billiard`
Billiard 是 multiprocessing 的分支,包含 sbt
(http://bugs.python.org/issue8713) 的 no-execv 补丁,并且还包含先前位于 Celery 中的池改进.
这个分支是必要的,因为 no-execv 补丁需要更改 C 扩展代码才能工作。
- 问题 #625
- 问题 #627
- 问题 #640
- django 芹菜 #122
- django 芹菜 #124
celery.app.task 不再是一个包
celery.app.task
模块现在是一个模块而不是一个包。
setup.py
安装脚本将尝试删除旧包,但如果由于某种原因不起作用,您必须手动删除它。 此命令有助于:
$ rm -r $(dirname $(python -c 'import celery;print(celery.__file__)'))/app/task/
如果您遇到类似 ImportError: cannot import name _unpickle_task
的错误,您只需删除旧包,一切都很好。
支持 Python 2.5 的最新版本
3.0 系列将是支持 Python 2.5 的最后一个版本,从 3.1 开始需要 Python 2.6 及更高版本。
随着其他几个发行版采取措施停止对 Python 2.5 的支持,我们觉得也是时候了。
此时 Python 2.6 应该可以广泛使用,我们敦促您升级,但如果不可能,您仍然可以选择继续使用 Celery 3.0,并且 Celery 3.1 中引入的重要错误修复将向后移植到 Celery 3.0根据要求。
现在使用 UTC 时区
这意味着消息中的 ETA/倒计时与 2.5 之前的 Celery 版本不兼容。
您可以通过设置 :setting:`CELERY_ENABLE_UTC` 设置来禁用 UTC 并恢复到旧的本地时间。
Redis:Ack 仿真改进
减少数据丢失的可能性。
现在通过在消息被消费时存储消息的副本来实现确认。 在消费者确认或拒绝之前,副本不会被删除。
这意味着未确认的消息将在连接关闭或超过可见性超时时重新传递。
可见性超时
这是确认超时,因此如果消费者在此时间限制内未确认消息,则消息将重新传递给另一个消费者。
默认情况下超时设置为一小时,但可以通过配置传输选项进行更改:
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 18000} # 5 hours
笔记
如果超过可见性超时,则尚未确认的消息将被重新传递,对于 Celery 用户而言,这意味着计划在超过可见性超时的时间内执行的 ETA/倒计时任务将被执行两次(或更多)。 如果您计划使用长 ETA/倒计时,您应该相应地调整可见性超时。
设置较长的超时意味着在发生电源故障时重新传送消息需要很长时间,但如果发生这种情况,您可以暂时将可见性超时设置得较低,以便在再次启动系统时清除消息。
新闻
链接任务
任务现在可以有回调和 errbacks,并记录依赖项
任务消息格式已更新为两个新的扩展键
两个键都可以为空/未定义或子任务列表。
callbacks
如果任务成功退出,则应用任务的结果作为参数。
errbacks
在执行任务时发生错误时应用,以任务的 uuid 作为参数。 由于可能无法序列化异常实例,因此它会传递任务的 uuid。 然后可以使用 uuid 从结果后端检索任务的异常和回溯。
link
和link_error
关键字参数已添加到apply_async
。这些向任务添加了回调和错误反馈,您可以在 链接(回调/错误反馈) 中阅读有关它们的更多信息。
我们现在跟踪任务发送的子任务,并且一些结果后端支持检索此信息。
task.request.children
包含当前正在执行的任务已应用的子任务的结果实例。
AsyncResult.children
返回任务依赖项,作为
AsyncResult
/ResultSet
实例的列表。异步结果文件
递归迭代任务依赖项,产生 (parent, node) 元组。
如果任何依赖项尚未返回,则引发 IncompleteStream。
异步结果图
任务依赖项的
DependencyGraph
。 有了这个,您还可以转换为点格式:with open('graph.dot') as fh: result.graph.to_dot(fh)
然后生成图形的图像:
$ dot -Tpng graph.dot -o graph.png
还包括一个名为
chain
的新特殊子任务:>>> from celery import chain # (2 + 2) * 8 / 2 >>> res = chain(add.subtask((2, 2)), mul.subtask((8,)), div.subtask((2,))).apply_async() >>> res.get() == 16 >>> res.parent.get() == 32 >>> res.parent.parent.get() == 4
添加
AsyncResult.get_leaf()
等待并返回叶子任务的结果。 这是遍历图形时找到的最后一个节点,但这意味着图形只能是一维的(实际上是一个列表)。
添加
subtask.link(subtask)
+subtask.link_error(subtask)
s.options.setdefault('link', []).append(subtask)
的快捷方式添加
subtask.flatten_links()
返回所有依赖项的扁平列表(递归)
Redis:优先支持
消息的 priority
字段现在受到 Redis 传输的尊重,每个命名队列都有多个列表。 然后按优先级顺序使用队列。
优先级字段是 0 - 9 范围内的数字,其中 0 是默认和最高优先级。
默认情况下,优先级范围折叠为四个步骤,因为九个步骤不太可能比使用四个步骤产生更多的好处。 步数可以通过设置priority_steps
传输选项来配置,它必须是排序顺序的数字列表:
>>> BROKER_TRANSPORT_OPTIONS = {
... 'priority_steps': [0, 2, 4, 6, 8, 9],
... }
以这种方式实现的优先级不如服务器端的优先级可靠,这就是为什么该功能被称为“准优先级”的原因; 使用路由仍然是确保服务质量的建议方法,因为客户端实现的优先级在很多方面都不足,例如,如果工作人员忙于长时间运行的任务,预取了许多消息,或者队列很拥挤。
尽管如此,将优先级与路由结合使用可能比单独使用路由或优先级更有益。 应该使用实验和监测来证明这一点。
由 Germán M. 提供 太棒了。
Redis:现在循环队列以便消费是公平的
这确保了一个非常繁忙的队列不会阻塞来自其他队列的消息,并确保所有队列都有平等的机会被消费。
以前也是这种情况,但是在切换到使用阻塞 pop 时意外改变了行为。
group/chord/chain现在是子任务
group 不再是
TaskSet
的别名,而是全新的,因为将TaskSet
类迁移为子任务非常困难。任务中添加了一个新的快捷方式:
>>> task.s(arg1, arg2, kw=1)
作为快捷方式:
>>> task.subtask((arg1, arg2), {'kw': 1})
可以使用
|
运算符链接任务:>>> (add.s(2, 2), pow.s(2)).apply_async()
可以使用
~
运算符“评估”子任务:>>> ~add.s(2, 2) 4 >>> ~(add.s(2, 2) | pow.s(2))
是一样的:
>>> chain(add.s(2, 2), pow.s(2)).apply_async().get()
新的 subtask_type 键已添加到子任务字典中。
这可以是字符串
"chord"
、"group"
、"chain"
、"chunks"
、"xmap"
或"xstarmap"
。maybe_subtask 现在使用 subtask_type 来重建对象,在使用非 pickle 序列化程序时使用。
这些操作的逻辑已移至专用任务 celery.chord、celery.chain 和 celery.group。
子任务不再继承自 AttributeDict。
它现在是一个纯 dict 子类,具有用于访问相关键的属性的属性。
repr 现在输出序列的命令式:
>>> from celery import chord >>> (chord([add.s(i, i) for i in xrange(10)], xsum.s()) | pow.s(2)) tasks.xsum([tasks.add(0, 0), tasks.add(1, 1), tasks.add(2, 2), tasks.add(3, 3), tasks.add(4, 4), tasks.add(5, 5), tasks.add(6, 6), tasks.add(7, 7), tasks.add(8, 8), tasks.add(9, 9)]) | tasks.pow(2)
新的远程控制命令
这些命令以前是实验性的,但它们已被证明是稳定的,现在作为官方 API 的一部分进行了记录。
:control:`add_consumer`/:control:`cancel_consumer`
告诉工作人员从新队列消费,或取消从队列消费。 此命令也已更改,以便工作人员记住添加的队列,这样即使重新连接连接,更改也会持续存在。
这些命令可作为
@control.add_consumer()
/@control.cancel_consumer()
以编程方式使用:>>> celery.control.add_consumer(queue_name, ... destination=['w1.example.com']) >>> celery.control.cancel_consumer(queue_name, ... destination=['w1.example.com'])
或使用 celery control 命令:
$ celery control -d w1.example.com add_consumer queue $ celery control -d w1.example.com cancel_consumer queue
笔记
请记住,没有 destination 的控制命令将发送到 所有工人 。
-
告诉启用
--autoscale
的工作人员更改自动缩放最大/最小并发设置。此命令可作为
@control.autoscale()
以编程方式使用:>>> celery.control.autoscale(max=10, min=5, ... destination=['w1.example.com'])
或使用 celery control 命令:
$ celery control -d w1.example.com autoscale 10 5
:control:`pool_grow`/:control:`pool_shrink`
告诉工作人员添加或删除池进程。
这些命令可作为
@control.pool_grow()
/@control.pool_shrink()
以编程方式使用:>>> celery.control.pool_grow(2, destination=['w1.example.com']) >>> celery.control.pool_shrink(2, destination=['w1.example.com'])
或使用 celery control 命令:
$ celery control -d w1.example.com pool_grow 2 $ celery control -d w1.example.com pool_shrink 2
celery control 现在支持 [X37X]:control:`rate_limit` 和 :control:`time_limit` 命令。
有关详细信息,请参阅
celery control --help
。
不可变的子任务
subtask
现在可以是不可变的,这意味着在调用回调时不会修改参数:
>>> chain(add.s(2, 2), clear_static_electricity.si())
意味着它不会接收父任务的参数,而 .si()
是一个快捷方式:
>>> clear_static_electricity.subtask(immutable=True)
记录改进
日志支持现在更符合最佳实践。
worker 使用的类不再使用 app.get_default_logger,而是使用 celery.utils.log.get_logger,它只是让记录器不设置级别,并添加一个 NullHandler。
不再传递记录器,而是每个使用日志记录的模块都定义了一个始终使用的模块全局记录器。
所有记录器都继承自一个名为“celery”的通用记录器。
之前
task.get_logger
会为每个任务设置一个新的记录器,甚至设置日志级别。 这已不再是这种情况。相反,所有任务记录器现在都继承自一个通用的“celery.task”记录器,该记录器在程序调用 setup_logging_subsystem 时设置。
任务基础记录器现在使用特殊的格式化程序在运行时从当前正在执行的任务中添加这些值,而不是使用 LoggerAdapter 来增加带有 task_id 和 task_name 字段的格式化程序。
事实上,
task.get_logger
已经不再推荐了,最好在你的任务模块中添加一个模块级的记录器。例如,像这样:
from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @celery.task def add(x, y): logger.debug('Adding %r + %r' % (x, y)) return x + y
生成的记录器将继承
"celery.task"
记录器,以便当前任务名称和 ID 包含在日志输出中。来自 stdout/stderr 的重定向输出现在记录到“celery.redirected”记录器。
此外,一些warnings.warn 已替换为logger.warn。
现在避免了“记录器多处理没有处理程序”警告
任务注册表不再是全局的
每个 Celery 实例现在都有自己的任务注册表。
您可以通过指定来使应用程序共享注册表:
>>> app1 = Celery()
>>> app2 = Celery(tasks=app1.tasks)
请注意,默认情况下,任务在注册表之间共享,因此任务将添加到每个后续创建的任务注册表中。 作为替代任务,可以通过将 shared
参数设置为 @task
装饰器来对特定任务注册表私有:
@celery.task(shared=False)
def add(x, y):
return x + y
抽象任务现在被延迟绑定
Task
类默认不再绑定到应用程序,它会在创建具体子类时首先绑定(和配置)。
这意味着您可以安全地导入和创建任务基类,而无需初始化应用程序环境:
from celery.task import Task
class DebugTask(Task):
abstract = True
def __call__(self, *args, **kwargs):
print('CALLING %r' % (self,))
return self.run(*args, **kwargs)
>>> DebugTask
<unbound DebugTask>
>>> @celery1.task(base=DebugTask)
... def add(x, y):
... return x + y
>>> add.__class__
<class add of <Celery default:0x101510d10>>
懒惰的任务装饰器
@task
装饰器现在在与自定义应用程序一起使用时是惰性的。
也就是说,如果 accept_magic_kwargs
被启用(她称之为“compat 模式”),任务装饰器像以前一样内联执行,但是对于自定义应用程序,@task 装饰器现在返回一个特殊的 PromiseProxy 对象,该对象仅在访问时进行评估。
当 @finalize()
被调用时,或者在首次使用任务注册表时隐式地评估所有承诺。
智能-app选项
--app
选项现在“自动检测”
- 如果提供的路径是一个模块,它会尝试获取名为“celery”的属性。
- 如果提供的路径是一个包,它会尝试导入一个名为 celery' 的子模块,并从该模块中获取 celery 属性。
例如,如果您有一个名为 proj
的项目,其中 celery 应用程序位于 from proj.celery import app
,那么以下内容将等效:
$ celery worker --app=proj
$ celery worker --app=proj.celery:
$ celery worker --app=proj.celery:app
在其他新闻中
新的 :setting:`CELERYD_WORKER_LOST_WAIT` 用于在无法通知工作人员时控制在
billiard.WorkerLostError
之前的秒数超时(问题#595)。由布伦登克劳福德提供。
Redis 事件监视器队列现在会自动删除(问题 #436)。
应用实例工厂方法已转换为缓存描述符,可在访问时创建新子类。
例如,这意味着
app.Worker
是一个实际的类,并且在以下情况下会按预期工作:class Worker(app.Worker): ...
现在只有在设置了
MP_LOG
环境变量时才会发出多处理日志。现在可以使用代理 URL 创建 Celery 实例
app = Celery(broker='redis://')
现在可以使用 URL 设置结果后端
目前只支持redis。 使用示例:
CELERY_RESULT_BACKEND = 'redis://localhost/1'
心跳频率现在每 5 秒一次,并且频率随事件一起发送
心跳频率现在在工作程序事件消息中可用,以便客户端可以根据此值决定何时考虑工作程序离线。
模块 celery.actors 已被删除,并将成为 cl 的一部分。
引入新的
celery
命令,它是所有其他命令的入口点。可以通过调用
celery.start()
来运行此命令的主要内容。如果键以“@”开头,则注释现在支持装饰器。
例如:
def debug_args(fun): @wraps(fun) def _inner(*args, **kwargs): print('ARGS: %r' % (args,)) return _inner CELERY_ANNOTATIONS = { 'tasks.add': {'@__call__': debug_args}, }
此外,任务现在总是受类约束,因此注释方法最终会被绑定。
错误报告现在可用作命令和广播命令
从 Python REPL 获取它:
>>> import celery >>> print(celery.bugreport())
使用
celery
命令行程序:$ celery report
从远程工作者那里获取:
$ celery inspect report
模块
celery.log
移至celery.app.log
。模块
celery.task.control
移至celery.app.control
。-
当任务被撤销或终止时在主进程中发送。
AsyncResult.task_id
重命名为AsyncResult.id
TasksetResult.taskset_id
重命名为.id
xmap(task, sequence)
和xstarmap(task, sequence)
返回将任务函数应用于序列中每个项目的结果列表。
示例:
>>> from celery import xstarmap >>> xstarmap(add, zip(range(10), range(10)).apply_async() [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
chunks(task, sequence, chunksize)
group.skew(start=, stop=, step=)
Skew 将倾斜组中单个任务的倒计时 - 例如对于这个组:
>>> g = group(add.s(i, i) for i in xrange(10))
将任务从 0 秒倾斜到 10 秒:
>>> g.skew(stop=10)
将在 0 秒内执行第一个任务,在 1 秒内执行第二个任务,在 2 秒内执行第三个任务,依此类推。
99% 的测试覆盖率
:setting:`CELERY_QUEUES` 现在可以是
Queue
实例的列表/元组。在内部
@amqp.queues
现在是名称/队列实例的映射,而不是即时转换。现在可以为
@control.inspect
指定连接。from kombu import Connection i = celery.control.inspect(connection=Connection('redis://')) i.active_queues()
:setting:`CELERY_FORCE_EXECV` 现在默认启用。
如果需要旧行为,则可以将设置设置为 False,或者将新的 –no-execv 选项设置为 celery worker。
已弃用的模块
celery.conf
已被移除。:setting:`CELERY_TIMEZONE` 现在总是需要安装 :pypi:`pytz` 库(除非时区设置为 UTC)。
东京暴君后端已被删除,不再受支持。
现在使用
maybe_declare()
来缓存队列声明。:setting:`CELERYBEAT_MAX_LOOP_INTERVAL` 设置不再有全局默认值,而是由各个调度程序设置。
Worker:现在会在错误报告中截断很长的消息正文。
尝试序列化错误时不再深度复制异常。
CELERY_BENCH
环境变量,现在还会在工作线程关闭时列出内存使用情况统计信息。Worker:现在只使用一个计时器来满足所有计时需求,而是设置不同的优先级。
现在可以安全地腌制异常参数
由马特龙贡献。
Worker/Beat 不再记录启动横幅。
以前它会记录严重性警告,现在它只写入标准输出。
发行版中的
contrib/
目录已重命名为extra/
。celery.contrib.migrate
:许多改进,包括; 过滤、队列迁移和支持从代理迁移的消息。由约翰·沃森贡献。
Worker:预取计数增量现在已优化并组合在一起。
Worker:不再调用远程控制命令队列中的
consume
两次。可能没有造成任何问题,但没有必要。
内件
app.broker_connection
现在是app.connection
这两个名字仍然有效。
兼容性模块现在在使用时动态生成。
这些模块是
celery.messaging
、celery.log
、celery.decorators
和celery.registry
。celery.utils
重构为多个模块:celery.utils.text
celery.utils.imports
celery.utils.functional
现在使用
kombu.utils.encoding
而不是celery.utils.encoding
。重命名模块
celery.routes
->celery.app.routes
。重命名包
celery.db
->celery.backends.database
。重命名模块
celery.abstract
->celery.worker.bootsteps
。命令行文档现在从模块文档字符串中解析出来。
测试套件目录已重新组织。
setup.py 现在从
requirements/
目录读取文档。Celery 命令不再包装输出(问题 #700)。
由托马斯·约翰逊提供。
实验性的
celery.contrib.methods:方法的任务装饰器
这是一个包含任务装饰器和任务装饰器过滤器的实验性模块,可用于从方法中创建任务:
from celery.contrib.methods import task_method
class Counter(object):
def __init__(self):
self.value = 1
@celery.task(name='Counter.increment', filter=task_method)
def increment(self, n=1):
self.value += 1
return self.value
有关详细信息,请参阅 celery.contrib.methods
。
计划外搬迁
通常我们不会进行向后不兼容的移除,但这些移除应该不会产生重大影响。
以下设置已重命名:
CELERYD_ETA_SCHEDULER
->CELERYD_TIMER
CELERYD_ETA_SCHEDULER_PRECISION
->CELERYD_TIMER_PRECISION
弃用时间线更改
请参阅 Celery 弃用时间表 。
celery.backends.pyredis
兼容模块已被删除。请改用
celery.backends.redis
!以下未记录的 API 已被移动:
control.inspect.add_consumer
->@control.add_consumer()
。control.inspect.cancel_consumer
->@control.cancel_consumer()
。control.inspect.enable_events
->@control.enable_events()
。control.inspect.disable_events
->@control.disable_events()
。
这种方式
inspect()
只用于不修改任何东西的命令,而进行更改的幂等控制命令在控制对象上。
修复
在 DatabaseError/OperationalError 上重试 SQLAlchemy 后端操作(问题 #634)
如果启用了延迟确认,则不会确认调用
retry
的任务修复由大卫马基贡献。
消息优先级参数没有正确传播到 Kombu(问题 #708)。
修复由 Eran Rundstein 贡献