Celery 4.0 的新特性(latentcall) — Python 文档

来自菜鸟教程
Celery/docs/latest/history/whatsnew-4.0
跳转至:导航、​搜索

Celery 4.0 中的新功能(latentcall)

作者
问索伦 (ask at celeryproject.org)

更改历史记录

What's new 文档描述了主要版本的变化,我们还有一个 Change history 列出了错误修复版本 (0.0.x) 的变化,而旧的系列存档在 History 部分.

Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。

它是一个专注于实时处理的任务队列,同时也支持任务调度。

Celery 拥有庞大而多样化的用户和贡献者社区,您应该在 IRC 我们的邮件列表 上加入我们 。

要阅读有关 Celery 的更多信息,您应该阅读 介绍

虽然此版本向后兼容以前的版本,但阅读以下部分很重要。

CPython 2.7、3.4 和 3.5 正式支持此版本。 并且也支持 PyPy。

目录

在升级到此版本之前,请务必阅读重要说明。



前言

欢迎来到芹菜 4!

这是一个包含超过两年变化的大规模版本。 它不仅带来了许多新功能,而且还修复了大量错误,因此在很多方面您都可以称其为我们的“雪豹”版本。

Celery 的下一个主要版本将仅支持 Python 3.5,我们计划在其中利用新的 asyncio 库。

如果没有我的雇主 Robinhood(我们正在招聘!)的支持,这个版本是不可能实现的。

  • 问庄严

献给 Sebastian “Zeb” Bjørnerud (RIP),特别感谢 Ty Wilkins 设计我们的新标志、所有帮助实现这一目标的贡献者,以及我在 Robinhood 的同事.

贡献者墙

亚伦·麦克米林、亚当·查因茨、亚当·伦伯格、阿德里亚诺·马丁斯·德·赫苏斯、阿德里安·吉内、艾哈迈德·德米尔、Aitor Gómez-Goiri、艾伦·贾斯蒂诺、阿尔伯特·王、亚历克斯·科舍列夫、亚历克斯·拉特雷、亚历克斯·威廉姆斯、亚历山大·科舍列夫、亚历山大·列别杰夫、亚历山大·奥洛瓦特尼、阿列克谢科特利亚罗夫、阿里·博佐尔汗、爱丽丝·佐伊·贝文-麦格雷戈、阿拉德·霍夫、阿尔曼一号、阿米尔·鲁斯塔姆扎德、安德烈·拉巴列蒂、安德烈·罗莎、安德烈·福考、安德鲁·罗迪奥诺夫、安德鲁·斯图尔特、安德烈·尤尔丘克、阿内尔·马拉瓦拉普、阿姆拉克·巴鲁亚德Artyom Koval, Asif Saifuddin Auvi, Ask Solem, Balthazar Rouberol, Batiste Bieler, Berker Peksag, Bert Vanderbauwhede, Brendan Smithyman, Brian Bouterse, Bryce Groff, Cameron Will, ChangBo Guo, Chris Clark, Chris Duryee, Chris Erway, Chris Harris, Chris马丁、奇拉尔·阿南德、科林·麦金托什、康拉德·克莱默、科里·法威尔、克雷格·杰利克、卡伦·罗德斯、达拉斯·马洛、丹尼尔·迪瓦恩、丹尼尔·华莱士、达尼洛·巴根、达瓦努姆·斯里尼瓦斯、戴夫·史密斯、大卫·鲍姆戈尔德、大卫·哈里根、大卫·P 拉韦茨、丹尼斯·布拉克恩、德里克·安德森、德米特里·迪加洛、德米特里·马利诺夫斯基、董伟明、杜达斯·阿达姆、达斯汀·J. Mitchell, Ed Morley, Edward Betts, Éloi Rivard, Emmanuel Cazenave, Fahad Siddiqui, Fatih Sucu, Feanil Patel, Federico Ficarelli, Felix Schwarz, Felix Yan, Fernando Rocha, Flavio Grossi, Frantisek Holop, 高江淼, George Whewell, Geral吉尔斯·达蒂格朗格、吉诺·莱德斯马、格雷格·威尔伯、纪尧姆·塞甘、汉克·约翰、霍尼·吉尔法森、伊利亚·格奥尔基耶夫斯基、伊奥内尔·克里斯蒂安·梅里埃斯、伊万·拉林、詹姆斯·普莱克、贾里德·刘易斯、杰森·威奇、贾斯珀·布莱恩特-格林、杰夫·威德曼、杰瑞米·蒂尔、乔斯林·德拉兰德、乔·杰夫尼克、乔·桑福德、约翰·安德森、约翰·巴勒姆、约翰·柯克汉姆、约翰·惠特洛克、乔纳森·瓦纳斯科、约书亚·哈洛、若昂·里卡多、胡安·卡洛斯·费雷尔、胡安·罗西、贾斯汀·帕特林、凯·格罗纳、凯文·哈维、凯文·理查森、 Komu Wairagu、Konstantinos Koukopoulos、Kouhei Maeda、Kracekumar Ramaraju、Krzysztof Bujniewicz、Latitia M. 哈斯金斯、伦·巴肯斯、列夫·伯曼、利东明、洛伦佐·曼奇尼、卢卡斯·维曼、卢克·庞弗雷、谢鲁云、马切伊·奥布乔夫斯基、曼努埃尔·考夫曼、马拉特·沙拉夫迪诺夫、马克·西布森、马西奥·里贝罗、马林·阿塔纳索夫·尼科洛夫、马修·帕尼科、马鲁克马克西姆·博切明、马克西姆·维德、Mher Movsisyan、迈克尔·阿奎利纳、迈克尔·杜安·莫林、迈克尔·佩马纳、米凯尔·彭哈德、迈克·阿特伍德、米切尔·汉弗瑞斯、穆罕默德·阿布萨乌德、莫里斯·特威德、莫顿·福克斯、莫舍·范德斯特尔、纳特·威廉姆斯、内森·范格赫姆、 Unravel, Nik Nyby, Omer Katz, Omer Korner, Ori Hoch, Paul Pearce, Paulo Bu, Pavlo Kapyshin, Philip Garnero, Pierre Fersing, Piotr Kilczuk, Piotr Maślanka, Quentin Pradet, Radek Czajka, Raghuram Srinlowphael, Ran雷米·莱昂、罗伯特·库普、罗伯特·科尔巴、Rockallite Wulf、鲁道夫·卡瓦略、罗杰·胡、罗穆尔德·布鲁内、朱容泽、罗斯·迪恩、瑞恩·卢基、雷米·格雷因霍夫、塞缪尔·吉法德、塞缪尔·贾耶特、谢尔盖·阿佐夫斯科夫、谢尔盖·季霍诺夫、Seungha Kim、西蒙 rs,斯宾塞 E。 奥尔森、斯里尼瓦斯·加拉帕蒂、斯蒂芬·米尔纳、史蒂夫·皮克、史蒂文·斯克拉尔、斯图尔特·阿克森、苏克里特·凯拉、塔德杰·珍尼兹、塔哈·贾汉吉尔、金本武、泰芬森、Tewfik Sadaoui、托马斯·弗伦奇、托马斯·格兰杰、托马斯·马查莱克、托比亚斯·肖特多夫、托乔·托切夫、 Valentyn Klindukh, Vic Kumar, Vladimir Bolshakov, Vladimir Gorbunov, Wayne Chang, Wieland Hoffmann, Wido den Hollander, Wil Langford, Will Thompson, William King, Yury Selivanov, Vytis Banaitis, Zoran Pavlovic, Xin Li, 邱志翔, :github_user:`allenling`, :github_user:`alzeih`, :github_user:`bastb`, :github_user:`bee-keeper` , :github_user:`feast`, :github_user:`firefly4268`, :github_user:`flyingfoxlee`, :github_user:`dw X739X], :github_user:`gitaarik`, :github_user:`hankjin`, :github_user:`lvh`, :github_user:`m -vdb`, :github_user:`kindule`, :github_user:`mdk`[X93 5X]:, :github_user:`michael-k`, :github_user:`mozillazg`, :github_user:`nokrik`, : :`ocean1`, :github_user:`orlo666`, :github_user:`raducc`, :github_user:`wanglei`], :github_user:`worldexception`, :github_user:`xBeAsTx`

笔记

这堵墙是根据 git 历史自动生成的,因此遗憾的是,它不包括帮助解决更重要的事情(例如回答邮件列表问题)的人。


从 Celery 3.1 升级

第 1 步:升级到 Celery 3.1.25

如果您还没有,第一步是升级到 Celery 3.1.25。

此版本增加了对新消息协议的前向兼容性,以便您可以从 3.1 增量升级到 4.0。

首先通过升级到 3.1.25 来部署工作人员,这意味着这些工作人员可以使用 3.1 和 4.0 处理客户端发送的消息。

升级工作人员后,您可以升级客户端(例如 网络服务器)。


第 2 步:使用新的设置名称更新您的配置

此版本从根本上更改了配置设置名称,使其更加一致。

这些更改完全向后兼容,因此您可以选择等到旧设置名称被弃用,但为了简化转换,我们包含了一个命令行实用程序,可以自动重写您的设置。

有关详细信息,请参阅 小写设置名称


第 3 步:阅读本文档中的重要说明

确保您不受下一节中提到的任何重要升级说明的影响。

一个特别重要的注意事项是 Celery 现在通过将它与签名匹配来检查您发送给任务的参数(任务参数检查)。


第 4 步:升级到 Celery 4.0

此时,您可以使用新版本升级您的工作人员和客户端。


重要说明

不再支持 Python 2.6

Celery 现在需要 Python 2.7 或更高版本,并且不再支持 Python 3.3,因此支持的版本是:

  • CPython 2.7
  • CPython 3.4
  • CPython 3.5
  • PyPy 5.4 (pypy2)
  • PyPy 5.5-alpha (pypy3)


最后一个支持 Python 2 的主要版本

从 Celery 5.0 开始,仅支持 Python 3.5+。

为确保您不受此更改的影响,您应该将 Celery 版本固定在您的需求文件中,要么是特定版本:celery==4.0.0,要么是范围:celery>=4.0,<5.0

放弃对 Python 2 的支持将使我们能够删除大量的兼容性代码,而使用 Python 3.5 使我们能够利用键入、async/await、asyncio 和类似概念,这些概念在旧版本中无可替代。

Celery 4.x 将继续在 Python 2.7、3.4、3.5 上工作; 就像 Celery 3.x 仍然适用于 Python 2.6 一样。


Django 支持

Celery 4.x 需要 Django 1.8 或更高版本,但我们真的建议至少使用 Django 1.9 来实现新的 transaction.on_commit 功能。

从 Django 调用任务时的一个常见问题是当任务与模型更改相关时,您希望在事务回滚时取消任务,或者确保仅在更改写入数据库后才执行任务。

transaction.atomic 可以通过将任务添加为仅在事务提交时调用的回调来解决此问题。

用法示例:

from functools import partial
from django.db import transaction

from .models import Article, Log
from .tasks import send_article_created_notification

def create_article(request):
    with transaction.atomic():
        article = Article.objects.create(**request.POST)
        # send this task only if the rest of the transaction succeeds.
        transaction.on_commit(partial(
            send_article_created_notification.delay, article_id=article.pk))
        Log.objects.create(type=Log.ARTICLE_CREATED, object_pk=article.pk)

删除的功能

  • 不再支持 Microsoft Windows。

    测试套件正在通过,Celery 似乎可以与 Windows 一起使用,但我们不做任何保证,因为我们无法在此平台上诊断问题。 如果您是需要本平台支持的公司,请与我们联系。

  • 不再支持 Jython。

为简单起见删除了功能

  • Webhook 任务机制 (celery.task.http) 已被移除。

    现在很容易使用 :pypi:`requests` 模块手动编写 webhook 任务。 我们很想使用请求,但我们根本无法使用,因为 Python 社区中有一个非常强烈的“反依赖”暴徒

    如果您需要向后兼容,您可以简单地复制并粘贴模块的 3.1 版本并确保它是由工作人员导入的:https://github.com/celery/celery/blob/3.1/celery/task/http .py

  • 任务不再发送错误电子邮件。

    这也取消了对 app.mail_admins 以及与发送电子邮件相关的任何功能的支持。

  • celery.contrib.batches 已被移除。

    这是一项实验性功能,因此不在我们的弃用时间表保证范围内。

    您可以复制并粘贴现有的批处理代码以在您的项目中使用:https://github.com/celery/celery/blob/3.1/celery/contrib/batches.py


由于缺乏资金而删除的功能

我们在 3.1 版本中宣布,一些传输被转移到实验状态,并且没有对传输的官方支持。

由于这种对资金需求的微妙暗示失败了,我们已经完全删除了它们,从而破坏了向后兼容性。

  • 不再支持使用 Django ORM 作为代理。

    您仍然可以使用 Django ORM 作为结果后端:有关更多信息,请参阅 django-celery-results - Using the Django ORM/Cache as a result backend 部分。

  • 不再支持使用 SQLAlchemy 作为代理。

    您仍然可以使用 SQLAlchemy 作为结果后端。

  • 不再支持使用 CouchDB 作为代理。

    您仍然可以使用 CouchDB 作为结果后端。

  • 不再支持使用 IronMQ 作为代理。

  • 不再支持使用 Beanstalk 作为代理。

此外,一些功能已被完全删除,因此尝试使用它们会引发异常:

  • --autoreload 功能已被删除。

    这是一项实验性功能,不在我们的弃用时间表保证范围内。 该标志被完全删除,因此工作人员将在启动时崩溃。 幸运的是,此标志未在生产系统中使用。

  • 不再支持实验性 threads 池并已删除。

  • 不再支持 force_execv 功能。

    celery worker 命令现在忽略 --no-execv--force-execvCELERYD_FORCE_EXECV 设置。

    此标志将在 5.0 中完全删除,工作人员将引发错误。

  • 旧的“amqp”结果后端已被弃用,并将在 Celery 5.0 中删除。

    请使用 rpc 结果后端进行 RPC 样式调用,并使用持久结果后端进行多消费者结果。

我们认为其中的大部分功能都可以轻松解决,因此如果您有兴趣恢复这些功能中的任何功能,请与我们联系。

好消息


新任务消息协议

此版本引入了全新的任务消息协议,这是自项目开始以来对协议的第一次重大更改。

此版本默认启用新协议,由于新版本不向后兼容,因此升级时必须小心。

发布 3.1.25 版本是为了增加与新协议的兼容性,因此最简单的升级方法是先升级到该版本,然后在第二次部署中升级到 4.0。

如果您希望继续使用旧协议,您还可以配置使用的协议版本号:

app = Celery()
app.conf.task_protocol = 1

在本文档后面的新闻部分阅读有关新协议中可用功能的更多信息。


小写设置名称

为了追求美观,所有设置现在都重命名为全部小写,并且一些设置名称已重命名以保持一致。

此更改完全向后兼容,因此您仍然可以使用大写设置名称,但我们希望您尽快升级,您可以使用 celery upgrade settings 命令自动执行此操作:

$ celery upgrade settings proj/settings.py

此命令将就地修改您的模块以使用新的小写名称(如果您想要带有“CELERY”前缀的大写,请参见下面的块),并在 proj/settings.py.orig 中保存备份。

对于想要保留大写名称的 Django 用户和其他人

如果您从 Django 设置模块加载 Celery 配置,那么您将希望继续使用大写名称。

您还希望使用 CELERY_ 前缀,以便 Celery 设置不会与其他应用程序使用的 Django 设置发生冲突。

为此,您首先需要将设置文件转换为使用新的一致命名方案,并将前缀添加到所有与 Celery 相关的设置中:

$ celery upgrade settings proj/settings.py --django

升级设置文件后,您需要在 proj/celery.py 模块中明确设置前缀:

app.config_from_object('django.conf:settings', namespace='CELERY')

您可以在此处找到最新的 Django Celery 集成示例: Django 的第一步

笔记

这也会为之前没有的设置添加一个前缀,例如 BROKER_URL 应该写为 CELERY_BROKER_URL,命名空间为 CELERY CELERY_BROKER_URL


幸运的是,您不必手动更改文件,因为 celery 升级设置 --django 程序应该做正确的事情。


加载程序将尝试检测您的配置是否使用新格式,并相应地采取行动,但这也意味着您不允许混合和匹配新旧设置名称,除非您为这两种选择都提供了值。

除了小写名称之外,以前版本之间的主要区别是一些前缀的重命名,例如 celerybeat_beat_celeryd_worker_

celery_ 前缀也已删除,此命名空间中与任务相关的设置现在以 task_ 为前缀,与工作人员相关的设置为 worker_

除此之外,除了一些特殊的设置外,大多数设置都将采用小写形式:

设置名称 用。。。来代替
CELERY_MAX_CACHED_RESULTS :设置:`result_cache_max`
CELERY_MESSAGE_COMPRESSION :setting:`result_compression`/:setting:`task_compression`
CELERY_TASK_RESULT_EXPIRES :设置:`result_expires`
CELERY_RESULT_DBURI :设置:`result_backend`
CELERY_RESULT_ENGINE_OPTIONS :设置:`database_engine_options`
-*-_DB_SHORT_LIVED_SESSIONS :设置:`database_short_lived_sessions`
CELERY_RESULT_DB_TABLE_NAMES :设置:`database_db_names`
CELERY_ACKS_LATE :设置:`task_acks_late`
CELERY_ALWAYS_EAGER :设置:`task_always_eager`
CELERY_ANNOTATIONS :设置:`task_annotations`
CELERY_MESSAGE_COMPRESSION :设置:`task_compression`
CELERY_CREATE_MISSING_QUEUES :设置:`task_create_missing_queues`
CELERY_DEFAULT_DELIVERY_MODE :设置:`task_default_delivery_mode`
CELERY_DEFAULT_EXCHANGE :设置:`task_default_exchange`
CELERY_DEFAULT_EXCHANGE_TYPE :设置:`task_default_exchange_type`
CELERY_DEFAULT_QUEUE :设置:`task_default_queue`
CELERY_DEFAULT_RATE_LIMIT :设置:`task_default_rate_limit`
CELERY_DEFAULT_ROUTING_KEY :设置:`task_default_routing_key`
-"-_EAGER_PROPAGATES_EXCEPTIONS :设置:`task_eager_propagates`
CELERY_IGNORE_RESULT :设置:`task_ignore_result`
CELERY_TASK_PUBLISH_RETRY :设置:`task_publish_retry`
CELERY_TASK_PUBLISH_RETRY_POLICY :设置:`task_publish_retry_policy`
CELERY_QUEUES :设置:`task_queues`
CELERY_ROUTES :设置:`task_routes`
CELERY_SEND_TASK_SENT_EVENT :设置:`task_send_sent_event`
CELERY_TASK_SERIALIZER :设置:`task_serializer`
CELERYD_TASK_SOFT_TIME_LIMIT :设置:`task_soft_time_limit`
CELERYD_TASK_TIME_LIMIT :设置:`task_time_limit`
CELERY_TRACK_STARTED :设置:`task_track_started`
CELERY_DISABLE_RATE_LIMITS :设置:`worker_disable_rate_limits`
CELERY_ENABLE_REMOTE_CONTROL :设置:`worker_enable_remote_control`
CELERYD_SEND_EVENTS :设置:`worker_send_task_events`

您可以在 新小写设置 中查看完整的更改表。


Json 现在是默认的序列化器

终于到了结束 pickle 作为默认序列化机制的统治的时候了,从这个版本开始,json 是默认的序列化器。

随着 Celery 3.1 的发布,此更改 宣布。

如果您仍然依赖 pickle 作为默认序列化程序,那么您必须在升级到 4.0 之前配置您的应用程序:

task_serializer = 'pickle'
result_serializer = 'pickle'
accept_content = {'pickle'}

Json 序列化器现在还支持一些其他类型:

  • datetimetimedate

    转换为 json 文本,采用 ISO-8601 格式。

  • Decimal

    转换为 json 文本。

  • django.utils.functional.Promise

    仅限 Django:用于翻译等的惰性字符串被评估并尝试转换为 json 类型。

  • uuid.UUID

    转换为 json 文本。

您还可以在自定义类上定义 __json__ 方法以支持 JSON 序列化(必须返回与 json 兼容的类型):

class Person:
    first_name = None
    last_name = None
    address = None

    def __json__(self):
        return {
            'first_name': self.first_name,
            'last_name': self.last_name,
            'address': self.address,
        }

Task 基类不再自动注册任务

@Task 类不再使用自动在任务注册表中注册任务的特殊元类。

相反,这现在由 @task 装饰器处理。

如果您仍在使用基于类的任务,那么您需要手动注册这些:

class CustomTask(Task):
    def run(self):
        print('running')
CustomTask = app.register_task(CustomTask())

最佳实践是使用自定义任务类仅用于覆盖一般行为,然后使用任务装饰器来实现任务:

@app.task(bind=True, base=CustomTask)
def custom(self):
    print('running')

此更改也意味着任务的 abstract 属性不再有任何影响。


任务参数检查

现在在调用任务时验证任务的参数,即使是异步的:

>>> @app.task
... def add(x, y):
...     return x + y

>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>

>>> add.delay(8)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/app/task.py", line 376, in delay
    return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 485, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)

您可以通过将其 typing 属性设置为 False 来禁用任何任务的参数检查:

>>> @app.task(typing=False)
... def add(x, y):
...     return x + y

或者,如果您想为所有任务完全禁用此功能,您可以在创建应用程序时传递 strict_typing=False

app = Celery(..., strict_typing=False)

Redis 事件不向后兼容

Redis fanout_patternsfanout_prefix 传输选项现在默认启用。

没有启用这些标志的工作人员/监视器将无法看到禁用此标志的工作人员。 他们仍然可以执行任务,但他们无法接收彼此的监控消息。

在最终升级到 4.0 之前,您可以通过首先配置 3.1 工作器和监视器以启用设置,以向后兼容的方式升级:

BROKER_TRANSPORT_OPTIONS = {
    'fanout_patterns': True,
    'fanout_prefix': True,
}

Redis 优先级颠倒

优先级 0 现在最低,9 最高。

进行此更改是为了使优先支持与其在 AMQP 中的工作方式一致。

亚历克斯·科舍列夫 提供。


Django:自动发现现在支持 Django 应用程序配置

autodiscover_tasks() 函数现在可以不带参数调用,Django 处理程序会自动找到你安装的应用程序:

app.autodiscover_tasks()

文档 中的 Django 集成 示例已更新为使用无参数调用。

这也确保了与最近 Django 版本中引入的新的、ehm、AppConfig 东西的兼容性。


工作人员直接队列不再使用自动删除

运行 4.0 的工作人员/客户端将不再能够向运行旧版本的工作人员发送工作人员直接消息,反之亦然。

如果您依赖于工作人员直接消息,您应该首先升级您的 3.x 工作人员和客户端以使用新的路由设置,方法是将 celery.utils.worker_direct() 替换为以下实现:

from kombu import Exchange, Queue

worker_direct_exchange = Exchange('C.dq2')

def worker_direct(hostname):
    return Queue(
        '{hostname}.dq2'.format(hostname),
        exchange=worker_direct_exchange,
        routing_key=hostname,
    )

此功能关闭了问题 #2492。


删除旧的命令行程序

安装 Celery 将不再安装 celerydcelerybeatceleryd-multi 程序。

这是随着 Celery 3.1 的发布而宣布的,但您可能仍然有指向旧名称的脚本,因此请确保更新它们以使用新的伞命令:

程序 新状态 更换
celeryd 已移除 芹菜工人
celerybeat 已移除 芹菜节拍
celeryd-multi 已移除 芹菜多


新闻

新协议亮点

新协议修复了旧协议的许多问题,并启用了一些期待已久的功能:

  • 大多数数据现在作为消息头发送,而不是与消息正文一起序列化。

    在协议的版本 1 中,worker 总是必须反序列化消息才能读取任务元数据,如任务 ID、名称等。 这也意味着工作人员被迫对数据进行双重解码,首先反序列化接收到的消息,再次序列化消息以发送到子进程,最后子进程再次反序列化消息。

    将元数据字段保留在消息头中意味着工作人员实际上不必在将任务交付给子进程之前解码有效负载,而且工作人员现在可以重新路由用不同于以下语言的语言编写的任务Python 给不同的工人。

  • 新的 lang 消息头可用于指定编写任务的编程语言。

  • Worker 存储内部错误(如 ContentDisallowed)和其他反序列化错误的结果。

  • Worker 存储结果并发送未注册任务错误的监控事件。

  • 即使结果是由父进程发送的,Worker 也会调用回调/errbacks(例如,当子进程终止时,WorkerLostError,反序列化错误,未注册的任务)。

  • 新的 origin 标头包含有关发送任务的进程的信息(工作节点名称,或 PID 和主机名信息)。

  • 新的 shadow 标头允许您修改日志中使用的任务名称。

    这对于类似模式的调度很有用,比如使用 pickle 调用任何函数的任务(不要在家里这样做):

    from celery import Task
    from celery.utils.imports import qualname
    
    class call_as_task(Task):
    
        def shadow_name(self, args, kwargs, options):
            return 'call_as_task:{0}'.format(qualname(args[0]))
    
        def run(self, fun, *args, **kwargs):
            return fun(*args, **kwargs)
    call_as_task = app.register_task(call_as_task())
  • 新的 argsreprkwargsrepr 字段包含用于日志、监视器等的任务参数的文本表示(可能被截断)。

    这意味着工作人员不必反序列化消息有效负载以显示任务参数以供参考。

  • 链现在使用专用的 chain 字段,支持数千个和更多任务的链。

  • 新的 parent_idroot_id 标头添加了有关与其他任务的任务关系的信息。

    • parent_id 是调用这个任务的任务的任务id

    • root_id 是工作流中的第一个任务。

    这些字段可用于改进像花这样的监视器,以将相关消息分组在一起(如链、组、和弦、完整的工作流程等)。

  • app.TaskProducer 替换为 @amqp.create_task_message()@amqp.send_task_message()

    将职责划分为创建和发送意味着想要直接使用 Python AMQP 客户端发送消息的人不必实现该协议。

    @amqp.create_task_message() 方法根据配置的任务协议调用 @amqp.as_task_v2()@amqp.as_task_v1(),并返回一个特殊的 task_message 元组,其中包含任务消息。

也可以看看

新的任务协议在这里有完整的文档:Version 2


Prefork 池改进

任务现在从子进程记录

任务成功/失败的记录现在发生在执行任务的子进程中。 因此,日志实用程序(如 Sentry)可以获得有关任务的完整信息,包括回溯堆栈中的变量。


-Ofair 现在是默认调度策略

要重新启用 3.1 中的默认行为,请使用 -Ofast 命令行选项。

关于 -Ofair 命令行选项的作用存在很多混淆,考虑到这个术语在 AMQP 中的混淆程度,在解释中使用术语“预取”可能无济于事。

当使用 prefork 池的 Celery Worker 收到任务时,它需要将该任务委托给子进程执行。

prefork 池具有可配置数量的子进程(--concurrency)可用于执行任务,每个子进程使用管道/套接字与父进程通信:

  • inqueue (pipe/socket):父进程向子进程发送任务
  • outqueue(管道/套接字):子进程将结果/返回值发送给父进程。

在 Celery 3.1 中,默认的调度机制只是将任务发送到第一个可写的 inqueue,并使用一些启发式方法来确保我们在它们之间循环以确保每个子进程都会收到相同数量的任务.

这意味着在默认调度策略中,worker 可能会将任务发送到已经在执行任务的同一个子进程。 如果那个任务长时间运行,它可能会阻塞等待的任务很长时间。 更糟糕的是,即使有空闲的子进程可以工作,数百个短期运行的任务也可能被困在一个长时间运行的任务后面。

添加了 -Ofair 调度策略以避免这种情况,并且在启用时它添加了不应向已经执行任务的子进程发送任何任务的规则。

如果您只有短期运行的任务,公平调度策略的性能可能会稍差一些。


限制子进程常驻内存大小

您现在可以通过设置 worker --max-memory-per-child 选项或 :setting:`worker_max_memory_per_child` 设置来限制每个 prefork 池子进程分配的最大内存量。

该限制适用于 RSS/常驻内存大小,以千字节为单位指定。

在当前执行的任务返回后,超过限制的子进程将被终止并替换为新进程。

有关详细信息,请参阅 每个子项的最大内存设置

戴夫·史密斯 提供。


每个子进程一个日志文件

Init-scrips 和 celery multi 现在使用 %I 日志文件格式选项(例如,/var/log/celery/%n%I.log)。

此更改是必要的,以确保在将任务日志记录移动到子进程后,每个子进程都有一个单独的日志文件,因为多个进程写入同一个日志文件可能会导致损坏。

我们鼓励您升级 init-scripts 和 celery multi 参数以使用这个新选项。


运输

RabbitMQ 优先队列支持

有关更多信息,请参阅 RabbitMQ 消息优先级

Gerald Manipon 提供。


分别为读/写配置代理 URL

添加了新的 :setting:`broker_read_url`:setting:`broker_write_url` 设置,以便可以为用于消费/发布的连接提供单独的代理 URL。

除了配置选项之外,应用程序 API 还添加了两个新方法:

  • app.connection_for_read()
  • app.connection_for_write()


现在应该使用这些来代替 app.connection() 来指定所需连接的意图。

笔记

有两个连接池可用:app.pool(读)和 app.producer_pool(写)。 后者实际上并不提供连接,而是提供完整的 kombu.Producer 实例。

def publish_some_message(app, producer=None):
    with app.producer_or_acquire(producer) as producer:
        ...

def consume_messages(app, connection=None):
    with app.connection_or_acquire(connection) as connection:
        ...

RabbitMQ 队列扩展支持

队列声明现在可以通过使用 message_ttlexpires 参数直接设置消息 TTL 和队列到期时间

Queue 中添加了新参数,可让您直接方便地在队列声明中配置 RabbitMQ 队列扩展:

  • Queue(expires=20.0)

    以浮点秒为单位设置队列到期时间。

    参见 kombu.Queue.expires

  • Queue(message_ttl=30.0)

    设置队列消息生存时间浮点秒数。

    参见 kombu.Queue.message_ttl

  • Queue(max_length=1000)

    将队列最大长度(消息数)设置为 int。

    参见 kombu.Queue.max_length

  • Queue(max_length_bytes=1000)

    将队列最大长度(以字节为单位的消息大小总计)设置为 int。

    参见 kombu.Queue.max_length_bytes

  • Queue(max_priority=10)

    将 queue 声明为优先级队列,根据消息的 priority 字段路由消息。

    参见 kombu.Queue.max_priority


现在正式支持 Amazon SQS 传输

SQS 代理传输已被重写为使用异步 I/O,因此加入 RabbitMQ、Redis 和 QPid 作为官方支持的传输。

新实现还利用了长轮询,并解决了与使用 SQS 作为代理相关的几个问题。

这项工作由 Nextdoor 赞助。


现在正式支持 Apache QPid 传输

布赖恩·鲍特斯 提供。


Redis:支持哨兵

您可以将连接指向哨兵 URL 列表,例如:

sentinel://0.0.0.0:26379;sentinel://0.0.0.0:26380/...

其中每个哨兵由 ; 分隔。 多个哨兵由 kombu.Connection 构造函数处理,并放置在连接失败的备用服务器列表中。

Sergey AzovskovLorenzo Mancini 提供。


任务

任务自动重试装饰器

为异常事件编写自定义重试处理是如此普遍,以至于我们现在已经内置了对它的支持。

为此,任务装饰器现在支持新的 autoretry_for 参数,您可以在其中指定一组异常以自动重试:

from twitter.exceptions import FailWhaleError

@app.task(autoretry_for=(FailWhaleError,))
def refresh_timeline(user):
    return twitter.refresh_timeline(user)

有关更多信息,请参阅 已知异常的自动重试

德米特里·马利诺夫斯基 提供。


Task.replace 改进

  • self.replace(signature)现在可以替换任何任务、和弦或组,替换的签名可以是和弦、组或任何其他类型的签名。

  • 不再继承现有任务的回调和错误反馈。

    如果替换树中的节点,则不会期望新节点继承旧节点的子节点。

  • Task.replace_in_chord 已被移除,使用 .replace 代替。

  • 如果替换的是一个组,该组将自动转换为一个和弦,其中回调“累积”组任务的结果。

    一个新的内置任务(为此添加了 celery.accumulate)

斯蒂夫·莫林询问 Solem 提供。


远程任务回溯

新的 :setting:`task_remote_tracebacks` 将通过注入远程工作者的堆栈使任务回溯更有用。

此功能需要额外的 :pypi:`tblib` 库。

Ionel Cristian Mărieș 提供。


处理任务连接错误

发送任务时发生的连接相关错误现在重新引发为 kombu.exceptions.OperationalError 错误:

>>> try:
...     add.delay(2, 2)
... except add.OperationalError as exc:
...     print('Could not send task %r: %r' % (add, exc))

有关详细信息,请参阅 连接错误处理


Gevent/Eventlet:消费结果的专用线程

当使用 :pypi:`gevent`:pypi:`eventlet` 时,现在有一个线程负责消费事件。

这意味着如果您有许多调用检索结果,则会有一个专用线程来使用它们:

result = add.delay(2, 2)

# this call will delegate to the result consumer thread:
#   once the consumer thread has received the result this greenlet can
# continue.
value = result.get(timeout=3)

这使得在使用 gevent/eventlet 时执行 RPC 调用的性能要好得多。


AsyncResult.then(on_success, on_error)

AsyncResult API 已扩展为支持 promise 协议。

这目前仅适用于 RPC (amqp) 和 Redis 结果后端,但允许您在任务完成时附加回调:

import gevent.monkey
monkey.patch_all()

import time
from celery import Celery

app = Celery(broker='amqp://', backend='rpc')

@app.task
def add(x, y):
    return x + y

def on_result_ready(result):
    print('Received result for id %r: %r' % (result.id, result.result,))

add.delay(2, 2).then(on_result_ready)

time.sleep(3)  # run gevent event loop for a while.

此处使用 :pypi:`gevent` 进行演示,但实际上这是一个 API,在基于回调的事件循环中更有用,例如 :pypi:`twisted`: pypi:`龙卷风`


新的任务路由器 API

:setting:`task_routes` 设置现在可以保存函数,地图路由现在支持 glob 模式和正则表达式。

您现在可以简单地定义一个函数,而不是使用路由器类:

def route_for_task(name, args, kwargs, options, task=None, **kwargs):
    from proj import tasks

    if name == tasks.add.name:
        return {'queue': 'hipri'}

如果您不需要参数,您可以使用 start 参数,只需确保您始终也接受星号参数,以便我们将来能够添加更多功能:

def route_for_task(name, *args, **kwargs):
    from proj import tasks
    if name == tasks.add.name:
        return {'queue': 'hipri', 'priority': 9}

options 参数和新的 task 关键字参数都是函数式路由器的新参数,并且可以更轻松地根据执行选项或任务属性编写路由器。

如果使用 @send_task() 按名称调用任务,则不会设置可选的 task 关键字参数。

有关更多示例,包括在路由器中使用 glob/regex,请参阅 :setting:`task_routes`Automatic routing


画布重构

画布/工作流实现已经过大量重构以修复一些长期未解决的问题。

  • 错误回调现在可以采用真正的异常和回溯实例(问题 #2538)。

    >>> add.s(2, 2).on_error(log_error.s()).delay()

    其中 log_error 可以定义为:

    @app.task
    def log_error(request, exc, traceback):
        with open(os.path.join('/var/errors', request.id), 'a') as fh:
            print('--\n\n{0} {1} {2}'.format(
                task_id, exc, traceback), file=fh)

    有关更多示例,请参阅 Canvas:设计工作流

  • chain(a, b, c) 现在与 a | b | c 的作用相同。

    这意味着 chain 可能不再返回 chain 的实例,而是可以优化工作流程,例如 链接在一起的两组成为一组。

  • 现在将组内的组展开为一个组(问题 #1509)。

  • 块/地图/星图任务现在基于目标任务路由

  • 和弦和链现在可以是不可变的。

  • 修复了序列化签名未转换回签名的错误(问题 #2078)

    修复由 罗斯迪恩 贡献。

  • 修复了使用 JSON 序列化时链和组不起作用的问题(问题 #2076)。

    修复由 罗斯迪恩 贡献。

  • 创建和弦不再导致关键字参数“task_id”的多个值(问题 #2225)。

    Aneil Mallavarapu 贡献的修复。

  • 修复了当链包含和弦作为倒数第二个任务时返回错误结果的问题。

    Aneil Mallavarapu 贡献的修复。

  • group(A.s() | group(B.s() | C.s())) 的特例现在有效。

  • 链:修复了当子任务也是链时设置不正确 id 的错误。

  • group | group 现在被扁平化为一个组(问题 #2573)。

  • 修复了 group | task 没有正确升级到和弦的问题(问题 #2922)。

  • 和弦现在可以正确设置 result.parent 链接。

  • chunks/map/starmap 现在基于目标任务路由。

  • Signature.link 现在在参数为标量(不是列表)时有效

    (第 2019 期)。

  • group() 现在可以正确转发关键字参数(问题 #3426)。

    Samuel Giffard 贡献的修复。

  • 一个 chord 头部组只包含一个任务,现在变成了一个简单的链。

  • link 参数传递给 group.apply_async() 现在会引发错误(问题 #3508)。

  • chord | sig 现在附加到和弦回调(问题 #3356)。


周期性任务

用于配置周期性任务的新 API

这个新的 API 使您能够在定义周期性任务时使用签名,从而消除错误输入任务名称的可能性。

新 API 的一个例子是 here


优化的 Beat 实现

celery beat 实现已通过使用堆调度条目针对数百万个周期性任务进行了优化。

Ask SolemAlexander Koshelev 提供。


根据日出、日落、黎明和黄昏安排任务

有关详细信息,请参阅 太阳能计划

Mark Parncutt 提供。


结果后端

RPC 结果后端成熟

先前实验性 RPC 结果后端中的许多错误已得到修复,现在可以考虑用于生产。

贡献者 Ask SolemMorris Tweed


Redis:结果后端优化

result.get() 现在使用 pub/sub 来流式传输任务结果

在使用 Redis 结果后端时调用 result.get() 过去非常昂贵,因为它使用轮询来等待结果可用。 0.5 秒的默认轮询间隔对性能没有帮助,但对于避免自旋循环是必要的。

新的实现是使用 Redis Pub/Sub 机制立即发布和检索结果,大大缩短了任务往返时间。

Yaroslav ZhavoronkovAsk Solem 提供。


新优化的和弦连接实现

这是 Celery 3.1 中引入的实验性功能,只能通过将 ?new_join=1 添加到结果后端 URL 配置中来启用。

我们认为该实现已经过彻底的测试,可以被认为是稳定的并且默认启用。

新的实现大大减少了和弦的开销,尤其是对于更大的和弦,性能优势可能是巨大的。


引入了新的 Riak 结果后端

有关更多信息,请参阅 conf-riak-result-backend

Gilles DartiguelongueAlman OneNoKriK 提供。


引入了新的 CouchDB 结果后端

有关更多信息,请参阅 CouchDB 后端设置

内森·范·吉姆 提供。


引入了新的 Consul 结果后端

使用 Consul 的键/值存储添加对 Consul 作为后端的支持。

Consul 有一个 HTTP API,您可以通过它存储键及其值。

后端扩展了 KeyValueStoreBackend 并实现了大部分方法。

主要是设置、获取和删除对象。

这允许 Celery 将 Task 结果存储在 Consul 的 K/V 存储中。

Consul 还允许使用来自 Consul 的会话在密钥上设置 TTL。 这样,后端支持任务结果的自动过期。

有关领事的更多信息,请访问 https://consul.io/

后端使用 :pypi:`python-consul` 与 HTTP API 通信。 这个包完全符合 Python 3,就像这个后端一样:

$ pip install python-consul

这将安装所需的包,以便从 Python 与 Consul 的 HTTP API 通信。

您还可以将 consul 指定为对 Celery 的依赖的扩展:

$ pip install celery[consul]

有关详细信息,请参阅 捆绑包

Wido den Hollander 提供。


全新的 Cassandra 结果后端

使用新的 :pypi:`cassandra-driver` 库的全新 Cassandra 后端正在使用旧的 :pypi:`pycassa` 库替换旧的结果后端。

有关更多信息,请参阅 Cassandra 后端设置

要依赖 Celery 和 Cassandra 作为结果后端,请使用:

$ pip install celery[cassandra]

您还可以组合多个扩展需求,更多信息请参见 Bundles


引入了新的 Elasticsearch 结果后端

有关更多信息,请参阅 Elasticsearch 后端设置

要依赖 Celery 和 Elasticsearch 作为结果,请使用:

$ pip install celery[elasticsearch]

您还可以组合多个扩展需求,更多信息请参见 Bundles

艾哈迈德·德米尔 提供。


引入了新的文件系统结果后端

有关更多信息,请参阅 文件系统后端设置

Môshe van der Sterre 提供。


事件批处理

事件现在缓存在工作程序中并作为列表发送,减少了发送监控事件所需的开销。

对于自定义事件监视器的作者,只要您使用 Python Celery 助手 (Receiver) 来实现您的监视器,就不需要任何操作。

但是,如果您正在解析原始事件消息,您现在必须考虑批处理事件消息,因为它们在以下方面与普通事件消息不同:

  • 一批事件消息的路由键将设置为 <event-group>.multi,其中当前唯一的批处理事件组是 task(给出路由键 task.multi)。
  • 消息正文将是一个序列化的字典列表而不是字典。 列表中的每一项都可以看作是一个普通的事件消息体。


在其他新闻…

要求


任务

  • “anon-exchange”现在用于简单的名称-名称直接路由。

    这提高了性能,因为它完全绕过了路由表,此外它还提高了 Redis 代理传输的可靠性。

  • 空的 ResultSet 现在评估为 True。

    Colin McIntosh 贡献的修复。

  • 默认路由密钥 (:setting:`task_default_routing_key`) 和交换名称 (:setting:`task_default_exchange`) 现在取自 :setting:`task_default_queue`[ X188X] 设置。

    这意味着要更改默认队列的名称,您现在只需设置一个设置。

  • 新的 :setting:`task_reject_on_worker_lost` 设置和 reject_on_worker_lost 任务属性决定了执行延迟 ack 任务的子工作进程终止时会发生什么。

    迈克尔·佩马纳 提供。

  • Task.subtask 以别名重命名为 Task.signature

  • Task.subtask_from_request 以别名重命名为 Task.signature_from_request

  • kombu.Queuedelivery_mode 属性现在受到尊重(问题 #1953)。

  • :setting:`task-routes` 中的路由现在可以直接指定一个 Queue 实例。

    示例:

    task_routes = {'proj.tasks.add': {'queue': Queue('add')}}
  • AsyncResult 现在在 task_id 为 None 时引发 ValueError。 (问题#1996)。

  • 重试任务未转发过期设置(问题 #3297)。

  • result.get() 现在支持 on_message 参数来设置为收到的每条消息调用的回调。

  • 添加了新的抽象类:

    • CallableTask

      看起来像是一个任务。

    • CallableSignature

      看起来像一个任务签名。


  • Task.replace 现在可以正确转发回调(问题 #2722)。

    Nicolas Unravel 贡献的修复。

  • Task.replace:附加到链/和弦(关闭 #3232)

    修复了问题 #3232,将签名添加到链中(如果有的话)。 如果给定的签名包含一个,则修复了和弦抑制。

    :github_user:`honux` 贡献的修复。

  • 任务重试现在也会进入急切模式。

    Feanil Patel 贡献的修复。


节拍

  • 修复了具有无效日期的 crontab 无限循环。

    当事件永远无法到达时(例如,4 月 31 日),尝试到达下一个事件将触发无限循环。

    尝试通过在 2,000 次迭代后提高 RuntimeError 来解决这个问题

    (过程中还加了crontab闰年的测试)

    修复由 Romuald Brunet 贡献。

  • 现在确保程序在异常终止服务时以非零退出代码退出。

    Simon Peeters 提供的修复。


应用程序

  • 即使 :setting:`enable_utc` 被禁用(问题 #943),日期现在总是时区感知。

    Omer Katz 贡献的修复。

  • Config:应用程序预配置现在也与配置一起腌制。

    Jeremy Zafran 贡献的修复。

  • 应用程序现在可以更改任务名称的生成方式

    @gen_task_name() 方法。

    德米特里·马利诺夫斯基 提供。

  • 应用程序具有新的 app.current_worker_task 属性,可返回当前正在处理的任务(或 None)。 (问题#2100)。


日志记录

  • get_task_logger() 现在在尝试使用名称“celery”或“celery.task”时会引发异常(问题 #3475)。


执行池

  • Eventlet/Gevent:现在启用 AMQP 心跳(问题 #3338)。

  • Eventlet/Gevent:修复了导致“同时读取”错误的竞争条件(问题 #2755)。

  • Prefork:Prefork 池现在使用 poll 而不是可用的 select(问题 #2373)。

  • Prefork:修复了矿池拒绝关闭工作线程的错误(问题 #2606)。

  • Eventlet:现在在 celery inspect stats 命令中返回池大小。

    Alexander Oblovatniy 提供。


测试

运输

  • amqps:// 现在可以指定为需要 SSL。

  • Redis Transport:Redis 传输现在支持 :setting:`broker_use_ssl` 选项。

    罗伯特·科尔巴 提供。

  • JSON 序列化程序现在为不受支持的类型调用 obj.__json__

    这意味着您现在可以为自定义类型定义 __json__ 方法,该方法可以简化为内置的 json 类型。

    示例:

    class Person:
        first_name = None
        last_name = None
        address = None
    
        def __json__(self):
            return {
                'first_name': self.first_name,
                'last_name': self.last_name,
                'address': self.address,
            }
  • JSON 序列化程序现在处理日期时间、Django 承诺、UUID 和十进制。

  • 新的 Queue.consumer_arguments 可用于通过 x-priority 设置消费者优先级的能力。

    见https://www.rabbitmq.com/consumer-priority.html

    示例:

    consumer = Consumer(channel, consumer_arguments={'x-priority': 3})
  • 队列/交换:添加了 no_declare 选项(也为内部 amq. 交流)。


节目

  • Celery 现在使用 argparse,而不是 optparse

  • 如果控制终端不是 TTY,所有程序现在都禁用颜色。

  • celery worker-q 参数现在禁用启动横幅。

  • celery worker:现在使用严重性信息而不是警告记录“worker ready”消息。

  • celery multi: %n 格式现在是 %N 的同义词,以与 celery worker 保持一致。

  • celery inspect/celery control:现在支持一个新的 --json 选项以提供 json 格式的输出。

  • 芹菜检查注册:现在忽略内置任务。

  • celery purge 现在采用 -Q-X 选项来指定要在清除中包含和排除的队列。

  • 新的 celery logtool:过滤和解析 celery 工作日志文件的实用程序

  • celery multi:现在通过 %i 和 %I 日志文件格式。

  • 常规:%p 现在可用于在日志文件/pid 文件参数中扩展到完整的工作节点名称。

  • 一个新的命令行选项

    --executable 现在可用于守护程序(celery workercelery beat)。

    贡献者 Bert Vanderbauwhede

  • celery worker:支持新的 --prefetch-multiplier 选项。

    Mickaël Penhard 提供。

  • 即使设置了 app 参数,--loader 参数现在也始终有效(问题 #3405)。

  • 检查/控制现在从注册表中获取命令

    这意味着也可以从命令行使用用户远程控制命令。

    请注意,您需要为要在命令行上正确传递的参数指定参数/和参数类型。

    现在有两个装饰器,它们的使用取决于命令的类型:@inspect_command + @control_command:

    from celery.worker.control import control_command
    
    @control_command(
        args=[('n', int)]
        signature='[N=1]',
    )
    def something(state, n=1, **kwargs):
        ...

    这里 args 是命令支持的参数列表。 该列表必须包含 (argument_name, type) 的元组。

    signature 只是在 eg 中使用的命令行帮助 celery -A proj control --help

    命令还支持 variadic 参数,这意味着任何剩余的参数都将添加到单个变量中。 这里由 terminate 命令演示,该命令采用信号参数和可变数量的 task_id:

    from celery.worker.control import control_command
    
    @control_command(
        args=[('signal', str)],
        signature='<signal> [id1, [id2, [..., [idN]]]]',
        variadic='ids',
    )
    def terminate(state, signal, ids, **kwargs):
        ...

    现在可以使用以下命令调用此命令:

    $ celery -A proj control terminate SIGKILL id1 id2 id3`

    有关详细信息,请参阅 编写自己的远程控制命令


工人

  • LimitedSet 的改进和修复。

    摆脱内存泄漏+增加minlen集合大小:运行一段时间后集合的最小剩余大小。 minlen 项目被保留,即使它们应该已经过期。

    更旧甚至更旧的代码的问题:

    1. 在某些情况下,堆会趋于增长(例如多次添加项目)。

    2. 快速添加许多项目不会很快清理它们(如果有的话)。

    3. 与其他工作人员交谈时,已发送 revoked._data,但在另一端将其处理为可迭代的。 这意味着为这些密钥提供新的(当前)时间戳。 通过这样做,工人可以永远回收物品。 结合 1) 和 2),这意味着在大量工作人员中,您很快就会耗尽内存。

    所有这些问题现在都应该得到解决。

    这应该可以解决问题 #3095、#3086。

    David Pravec 提供。

  • 控制远程控制命令队列的新设置。

    艾伦·贾斯蒂诺 提供。

  • :signal:`worker_shutdown` 信号现在总是在关闭期间调用。

    以前,如果工作实例首先由 gc 收集,则不会调用它。

  • Worker 现在仅在使用的代理传输实际支持它们时才启动远程控制命令使用者。

  • Gossip 现在将事件队列的 x-message-ttl 设置为 heartbeat_interval 。 (问题#2005)。

  • 现在保留退出代码(问题 #2024)。

  • 现在拒绝带有无效 ETA 值的消息(而不是 ack,这意味着如果配置了它们,它们将被发送到死信交换)。

  • 修复了使用 -purge 参数时崩溃的问题。

  • 不可恢复错误的日志级别从 error 更改为 critical

  • 提高速率限制精度。

  • 说明任务过期字段中缺少时区信息。

    Albert Wang 提供的修复。

  • 工人不再有 Queues 引导步骤,就像现在一样

    多余的。

  • 现在即使对于已撤销的任务也会发出“已接收任务”行。 (问题#3155)。

  • 现在尊重 :setting:`broker_connection_retry` 设置。

    Nat Williams 贡献的修复。

  • 新的 :setting:`control_queue_ttl`:setting:`control_queue_expires` 设置现在允许您配置远程控制命令消息 TTL 和队列到期时间。

    艾伦·贾斯蒂诺 提供。

  • 新的 celery.worker.state.requests 通过 id 启用 O(1) 查找活动/保留任务。

  • 缩小时,自动缩放并不总是更新保持活动状态。

    Philip Garnero 贡献的修复。

  • 修正了错字 options_list -> option_list

    Greg Wilbur 贡献的修复。

  • 为保持一致性,一些 worker 命令行参数和 Worker() 类参数已重命名。

    所有这些都有向后兼容的别名。

    • --send-events -> --task-events

    • --schedule -> --schedule-filename

    • --maxtasksperchild -> --max-tasks-per-child

    • Beat(scheduler_cls=) -> Beat(scheduler=)

    • Worker(send_events=True) -> Worker(task_events=True)

    • Worker(task_time_limit=) -> Worker(time_limit=)

    • Worker(task_soft_time_limit=) -> Worker(soft_time_limit=)

    • Worker(state_db=) -> Worker(statedb=)

    • Worker(working_directory=) -> Worker(workdir=)



调试工具

  • celery.contrib.rdb:更改远程调试器横幅,以便您可以轻松复制和粘贴地址(地址中不再有句点)。

    乔纳森·瓦纳斯科 提供。

  • 修复了与最近的 :pypi:`psutil` 版本的兼容性(问题 #3262)。


信号

  • App:应用配置/完成的新信号:

    • app.on_configure

    • app.on_after_configure

    • app.on_after_finalize


  • Task:拒绝任务消息的新任务信号:

    • celery.signals.task_rejected

    • celery.signals.task_unknown


  • Worker:发送心跳事件时的新信号。

    • celery.signals.heartbeat_sent

      凯文·理查森 提供。



活动

  • 事件消息现在使用 RabbitMQ x-message-ttl 选项来确保旧的事件消息被丢弃。

    默认为 5 秒,但可以使用 :setting:`event_queue_ttl` 设置进行更改。

  • Task.send_event 现在根据任务发布重试设置,在连接失败时自动重试发送事件。

  • 事件监视器现在默认设置 :setting:`event_queue_expires` 设置。

    队列现在将在监视器停止使用后 60 秒后过期。

  • 修复了 None 值未正确处理的错误。

    Dongweiming 贡献的修复。

  • 新的 :setting:`event_queue_prefix` 设置现在可用于更改事件接收器队列的默认 celeryev 队列前缀。

    金本武 提供。

  • State.tasks_by_typeState.tasks_by_worker 现在可以用作快速访问此信息的映射。


部署

  • 通用初始化脚本现在支持 [X33X]CELERY_SUCELERYD_SU_ARGS 环境变量来设置 su ( ]su(1))。

  • 通用初始化脚本现在通过搜索 /usr/local/etc/ 的配置文件更好地支持 FreeBSD 和其他 BSD 系统。

    塔哈·贾汉吉尔 提供。

  • 通用初始化脚本:修复了 celerybeat 的奇怪错误,其中重启并不总是有效(问题 #3018)。

  • systemd init 脚本现在在执行服务时使用 shell。

    托马斯·马查莱克 提供。


结果后端

  • Redis:现在默认套接字超时为 120 秒。

    可以使用新的 :setting:`redis_socket_timeout` 设置更改默认值。

    Raghuram Srinivasan 提供。

  • RPC 后端结果队列现在默认自动删除(问题 #2001)。

  • RPC 后端:修复了使用 json 序列化程序未正确反序列化异常的问题(问题 #2518)。

    Allard Hoeve 贡献的修复。

  • CouchDB:用于对结果进行双重 json 编码的后端。

    Andrew Stewart 贡献的修复。

  • CouchDB:修复了导致找不到后端的错字(问题 #3287)。

    Andrew Stewart 贡献的修复。

  • MongoDB:现在支持将 :setting:`result_serialzier` 设置设置为 bson 以使用 MongoDB 库自己的序列化程序。

    Davide Quarta 提供。

  • MongoDB:已改进 URI 处理以供使用

    URI 中的数据库名称、用户和密码(如果提供)。

    塞缪尔·贾耶特 提供。

  • SQLAlchemy 结果后端:现在在使用 NullPool 时忽略所有结果引擎选项(问题 #1930)。

  • SQLAlchemy 结果后端:现在将最大字符大小设置为 155 以处理大脑损坏的 MySQL Unicode 实现(问题 #1748)。

  • General:所有 Celery 异常/警告现在都继承自常见的 CeleryError/CeleryWarning。 (问题#2643)。


文档改进

贡献者:

  • 亚当·查恩兹
  • 阿米尔·鲁斯塔姆扎德
  • 亚瑟·维亚尔
  • 巴蒂斯特·比勒
  • 伯克·佩克萨格
  • 布莱斯·格罗夫
  • 丹尼尔·迪瓦恩
  • 爱德华·贝茨
  • 杰森威奇
  • 杰夫·威德曼
  • 马切伊·奥布乔夫斯基
  • 曼努埃尔·考夫曼
  • 马克西姆·博彻明
  • 米切尔·汉弗莱斯
  • 帕夫洛·卡皮申
  • 皮埃尔·费辛
  • 里克
  • 史蒂文·斯克拉
  • 泰芬森
  • 维兰德霍夫曼


重组、弃用和删除

不兼容的更改

  • Prefork:调用 result.get() 或从任务中加入任何结果现在会引发 RuntimeError

    在以前的版本中,这会发出警告。

  • celery.worker.consumer 现在是一个包,而不是一个模块。

  • 模块 celery.worker.job 重命名为 celery.worker.request

  • 节拍:Scheduler.Publisher/.publisher 重命名为 .Producer/.producer

  • 结果:删除了 @AsyncResult 的 task_name 参数/属性。

    这是历史上用于 pickle 兼容性的字段,但不再需要。

  • 后端:名为 status 的参数重命名为 state

  • 后端:backend.get_status() 重命名为 backend.get_state()

  • 后端:backend.maybe_reraise() 重命名为 .maybe_throw()

    promise API 使用 .throw(),因此进行了此更改以使其更加一致。

    有一个别名可用,所以在 Celery 5.0 之前你仍然可以使用 may_reraise。


计划外搬迁

  • 实验性的 celery.contrib.methods 功能已被删除,因为在实现中有很多有用的错误。

  • CentOS 初始化脚本已被删除。

    这些并没有真正在通用 init 脚本上添加任何功能,因此鼓励您改用它们,或者像 :pypi:`supervisor` 之类的东西。


重组弃用

这些符号已被重命名,虽然此版本中有一个别名可用于向后兼容,但它们将在 Celery 5.0 中删除,因此请确保尽快重命名这些符号以确保它不会因该版本而中断。

您可能只会使用此列表中的第一个,但您永远不会知道:

  • celery.utils.worker_direct -> celery.utils.nodenames.worker_direct()
  • celery.utils.nodename -> celery.utils.nodenames.nodename()
  • celery.utils.anon_nodename -> celery.utils.nodenames.anon_nodename()
  • celery.utils.nodesplit -> celery.utils.nodenames.nodesplit()
  • celery.utils.default_nodename -> celery.utils.nodenames.default_nodename()
  • celery.utils.node_format -> celery.utils.nodenames.node_format()
  • celery.utils.host_format -> celery.utils.nodenames.host_format()


预定搬迁

模块

  • 模块 celery.worker.job 已重命名为 celery.worker.request

    这是一个内部模块,所以不应该有任何影响。 它现在是公共 API 的一部分,因此不得再次更改。

  • 模块 celery.task.trace 已重命名为 celery.app.trace,因为 celery.task 包正在逐步淘汰。 该模块将在 5.0 版中删除,因此请更改任何导入:

    from celery.task.trace import X

    到:

    from celery.app.trace import X
  • celery.loaders 模块中旧的兼容性别名已被删除。

    • 移除 celery.loaders.current_loader(),使用:current_app.loader

    • 移除 celery.loaders.load_settings(),使用:current_app.conf



结果

  • AsyncResult.serializable()celery.result.from_serializable

    已被删除:

    改用:

    >>> tup = result.as_tuple()
    >>> from celery.result import result_from_tuple
    >>> result = result_from_tuple(tup)
  • 移除了 BaseAsyncResult,改为使用 AsyncResult 进行实例检查。

  • 移除 TaskSetResult,改为使用 GroupResult

    • TaskSetResult.total -> len(GroupResult)

    • TaskSetResult.taskset_id -> GroupResult.id


  • 移除 ResultSet.subtasks,改为使用 ResultSet.results


任务集

TaskSet 已被删除,因为它被 Celery 3.0 中的 group 结构取代。

如果你有这样的代码:

>>> from celery.task import TaskSet

>>> TaskSet(add.subtask((i, i)) for i in xrange(10)).apply_async()

您需要将其替换为:

>>> from celery import group
>>> group(add.s(i, i) for i in xrange(10))()

活动

  • celery.events.state.Worker 类的移除:

    • Worker._defaults 属性。

      使用 {k: getattr(worker, k) for k in worker._fields}

    • Worker.update_heartbeat

      使用 Worker.event(None, timestamp, received)

    • Worker.on_online

      使用 Worker.event('online', timestamp, received, fields)

    • Worker.on_offline

      使用 Worker.event('offline', timestamp, received, fields)

    • Worker.on_heartbeat

      使用 Worker.event('heartbeat', timestamp, received, fields)


  • celery.events.state.Task 类的移除:

    • Task._defaults 属性。

      使用 {k: getattr(task, k) for k in task._fields}

    • Task.on_sent

      使用 Worker.event('sent', timestamp, received, fields)

    • Task.on_received

      使用 Task.event('received', timestamp, received, fields)

    • Task.on_started

      使用 Task.event('started', timestamp, received, fields)

    • Task.on_failed

      使用 Task.event('failed', timestamp, received, fields)

    • Task.on_retried

      使用 Task.event('retried', timestamp, received, fields)

    • Task.on_succeeded

      使用 Task.event('succeeded', timestamp, received, fields)

    • Task.on_revoked

      使用 Task.event('revoked', timestamp, received, fields)

    • Task.on_unknown_event

      使用 Task.event(short_type, timestamp, received, fields)

    • Task.update

      使用 Task.event(short_type, timestamp, received, fields)

    • Task.merge

      如果您需要这个,请联系我们。



魔术关键字参数

在此版本中最终删除了对任务接受的非常古老的魔法关键字参数的支持。

如果你仍在使用这些你必须重写任何仍然使用旧的 celery.decorators 模块的任务,并且取决于传递给任务的关键字参数,例如:

from celery.decorators import task

@task()
def add(x, y, task_id=None):
    print('My task id is %r' % (task_id,))

应该改写为:

from celery import task

@task(bind=True)
def add(self, x, y):
    print('My task id is {0.request.id}'.format(self))

删除设置

以下设置已被删除,不再受支持:

日志设置

设置名称 用。。。来代替
CELERYD_LOG_LEVEL celery worker --loglevel
CELERYD_LOG_FILE celery worker --logfile
CELERYBEAT_LOG_LEVEL celery beat --loglevel
CELERYBEAT_LOG_FILE celery beat --logfile
CELERYMON_LOG_LEVEL celerymon 已弃用,请使用花
CELERYMON_LOG_FILE celerymon 已弃用,请使用花
CELERYMON_LOG_FORMAT celerymon 已弃用,请使用花


任务设置

设置名称 用。。。来代替
CELERY_CHORD_PROPAGATES 不适用


对内部 API 的更改

  • 模块 celery.datastructures 重命名为 celery.utils.collections

  • 模块 celery.utils.timeutils 重命名为 celery.utils.time

  • celery.utils.datastructures.DependencyGraph 移至 celery.utils.graph

  • celery.utils.jsonify 现在是 celery.utils.serialization.jsonify()

  • celery.utils.strtobool 现在是 celery.utils.serialization.strtobool()

  • celery.utils.is_iterable 已被移除。

    而是使用:

    isinstance(x, collections.Iterable)
  • celery.utils.lpmerge 现在是 celery.utils.collections.lpmerge()

  • celery.utils.cry 现在是 celery.utils.debug.cry()

  • celery.utils.isatty 现在是 celery.platforms.isatty()

  • celery.utils.gen_task_name 现在是 celery.utils.imports.gen_task_name()

  • celery.utils.deprecated 现在是 celery.utils.deprecated.Callable()

  • celery.utils.deprecated_property 现在是 celery.utils.deprecated.Property()

  • celery.utils.warn_deprecated 现在是 celery.utils.deprecated.warn()


弃用时间线更改

请参阅 Celery 弃用时间表