应用程序 — Python 文档

来自菜鸟教程
Celery/docs/latest/userguide/application
跳转至:导航、​搜索

应用

Celery 库在使用前必须实例化,这个实例称为应用程序(或简称app)。

该应用程序是线程安全的,因此具有不同配置、组件和任务的多个 Celery 应用程序可以在同一个进程空间中共存。

现在让我们创建一个:

>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>

最后一行显示了应用程序的文本表示:包括应用程序类的名称(Celery)、当前主模块的名称(__main__)和对象的内存地址(0x100469fd0)。

主要名称

其中只有一个是重要的,那就是主模块名称。 让我们看看为什么会这样。

当您在 Celery 中发送任务消息时,该消息将不包含任何源代码,而仅包含您要执行的任务的名称。 这与主机名在互联网上的工作方式类似:每个工作人员都维护一个任务名称与其实际功能的映射,称为 任务注册表

每当您定义一个任务时,该任务也将被添加到本地注册表中:

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.name
__main__.add

>>> app.tasks['__main__.add']
<@task: __main__.add>

然后你又看到了 __main__; 每当 Celery 无法检测到该函数属于哪个模块时,它就会使用主模块名称来生成任务名称的开头。

这只是有限用例集的问题:

  1. 如果定义任务的模块作为程序运行。
  2. 如果应用程序是在 Python shell (REPL) 中创建的。


例如在这里,tasks 模块也用于通过 @worker_main() 启动一个 worker:

tasks.py

from celery import Celery
app = Celery()

@app.task
def add(x, y): return x + y

if __name__ == '__main__':
    app.worker_main()

当这个模块被执行时,任务将以“__main__”开头,但是当模块被另一个进程导入时,比如调用一个任务,任务将以“tasks”开头]”(模块的真实名称):

>>> from tasks import add
>>> add.name
tasks.add

您可以为主模块指定另一个名称:

>>> app = Celery('tasks')
>>> app.main
'tasks'

>>> @app.task
... def add(x, y):
...     return x + y

>>> add.name
tasks.add

也可以看看

姓名


配置

您可以设置几个选项来改变 Celery 的工作方式。 这些选项可以直接在应用实例上设置,也可以使用专用的配置模块。

配置为 @conf

>>> app.conf.timezone
'Europe/London'

您还可以直接设置配置值:

>>> app.conf.enable_utc = True

或使用 update 方法一次更新多个密钥:

>>> app.conf.update(
...     enable_utc=True,
...     timezone='Europe/London',
...)

配置对象由多个按顺序查询的字典组成:

  1. 在运行时所做的更改。
  2. 配置模块(如果有)
  3. 默认配置(celery.app.defaults)。


您甚至可以使用 @add_defaults() 方法添加新的默认源。

也可以看看

转到 配置参考 以获取所有可用设置及其默认值的完整列表。


config_from_object

@config_from_object() 方法从配置对象加载配置。

这可以是配置模块,也可以是具有配置属性的任何对象。

请注意,当调用 @config_from_object() 时,之前设置的任何配置都将被重置。 如果你想设置额外的配置,你应该在之后这样做。

示例 1:使用模块的名称

@config_from_object() 方法可以采用 Python 模块的完全限定名称,甚至是 Python 属性的名称,例如:"celeryconfig""myproj.config.celery" 或 [ X157X]:

from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

celeryconfig 模块可能如下所示:

celeryconfig.py

enable_utc = True
timezone = 'Europe/London'

只要 import celeryconfig 是可能的,应用程序就可以使用它。


示例 2:传递实际模块对象

你也可以传递一个已经导入的模块对象,但并不总是推荐这样做。

提示

建议使用模块的名称,因为这意味着在使用 prefork 池时不需要序列化模块。 如果您遇到配置问题或pickle 错误,请尝试改用模块的名称。


import celeryconfig

from celery import Celery

app = Celery()
app.config_from_object(celeryconfig)

示例 3:使用配置类/对象

from celery import Celery

app = Celery()

class Config:
    enable_utc = True
    timezone = 'Europe/London'

app.config_from_object(Config)
# or using the fully qualified name of the object:
#   app.config_from_object('module:Config')

config_from_envvar

@config_from_envvar() 从环境变量中获取配置模块名称

例如 – 从名为 CELERY_CONFIG_MODULE 的环境变量中指定的模块加载配置:

import os
from celery import Celery

#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')

然后,您可以通过环境指定要使用的配置模块:

$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l INFO

审查配置

如果您想打印出配置、调试信息或类似信息,您可能还想过滤掉密码和 API 密钥等敏感信息。

Celery 附带了几个用于显示配置的实用程序,一个是 humanize()

>>> app.conf.humanize(with_defaults=False, censored=True)

此方法将配置作为列表字符串返回。 默认情况下,这将仅包含对配置的更改,但您可以通过启用 with_defaults 参数来包含内置的默认键和值。

如果您想将配置作为字典使用,则可以使用 table() 方法:

>>> app.conf.table(with_defaults=False, censored=True)

请注意,Celery 无法删除所有敏感信息,因为它仅使用正则表达式来搜索常用名称的键。 如果您添加包含敏感信息的自定义设置,您应该使用 Celery 识别为机密的名称来命名密钥。

如果名称包含以下任何子字符串,则配置设置将被审查:

APITOKENKEYSECRETPASSSIGNATUREDATABASE


懒惰

应用程序实例是惰性的,这意味着在实际需要它之前不会对其进行评估。

创建 @Celery 实例只会执行以下操作:

  1. 创建一个逻辑时钟实例,用于事件。
  2. 创建任务注册表。
  3. 将自身设置为当前应用程序(但如果 set_as_current 参数被禁用,则不会)
  4. 调用 @on_init() 回调(默认情况下什么都不做)。


@task() 装饰器不会在定义任务时创建任务,而是将任务的创建推迟到使用任务时或应用程序完成后 [X206X ]最终确定,

此示例显示了在您使用任务或访问属性(在本例中为 repr())之前如何创建任务:

>>> @app.task
>>> def add(x, y):
...    return x + y

>>> type(add)
<class 'celery.local.PromiseProxy'>

>>> add.__evaluated__()
False

>>> add        # <-- causes repr(add) to happen
<@task: __main__.add>

>>> add.__evaluated__()
True

应用程序的 Finalization 通过调用 @finalize() 显式发生 - 或通过访问 @tasks 属性隐式发生。

完成对象将:

  1. 复制必须在应用程序之间共享的任务

    默认情况下,任务是共享的,但如果禁用了任务装饰器的 shared 参数,则该任务将是它绑定到的应用程序的私有任务。

  2. 评估所有挂起的任务装饰器。

  3. 确保所有任务都绑定到当前应用程序。

    任务绑定到应用程序,以便它们可以从配置中读取默认值。


“默认应用”

Celery 并不总是有应用程序,过去只有一个基于模块的 API。 在 Celery 5.0 发布之前,旧位置提供了一个兼容性 API,但已被删除。

Celery 总是创建一个特殊的应用程序——“默认应用程序”,如果没有自定义应用程序被实例化,就会使用它。

celery.task 模块不再可用。 使用应用程序实例上的方法,而不是基于模块的 API:

from celery.task import Task   # << OLD Task base class.

from celery import Task        # << NEW base class.

打破链条

虽然可能依赖于当前设置的应用程序,但最佳实践是始终将应用程序实例传递给任何需要它的对象。

我称之为“应用链”,因为它根据传递的应用程序创建了一个实例链。

以下示例被认为是不好的做法:

from celery import current_app

class Scheduler:

    def run(self):
        app = current_app

相反,它应该将 app 作为参数:

class Scheduler:

    def __init__(self, app):
        self.app = app

Celery 在内部使用 celery.app.app_or_default() 函数,因此一切都可以在基于模块的兼容性 API 中运行

from celery.app import app_or_default

class Scheduler:
    def __init__(self, app=None):
        self.app = app_or_default(app)

在开发中,您可以设置 CELERY_TRACE_APP 环境变量以在应用程序链中断时引发异常:

$ CELERY_TRACE_APP=1 celery worker -l INFO

演进 API

Celery 自 2009 年最初创建以来发生了很大变化。

例如,一开始可以将任何可调用对象用作任务:

def hello(to):
    return 'hello {0}'.format(to)

>>> from celery.execute import apply_async

>>> apply_async(hello, ('world!',))

或者您也可以创建一个 Task 类来设置某些选项,或覆盖其他行为

from celery import Task
from celery.registry import tasks

class Hello(Task):
    queue = 'hipri'

    def run(self, to):
        return 'hello {0}'.format(to)
tasks.register(Hello)

>>> Hello.delay('world!')

后来,决定传递任意可调用的是一种反模式,因为它很难使用除 pickle 之外的序列化程序,并且该功能在 2.0 中被删除,取而代之的是任务装饰器:

from celery import app

@app.task(queue='hipri')
def hello(to):
    return 'hello {0}'.format(to)

抽象任务

使用 @task() 装饰器创建的所有任务都将从应用程序的基类 @Task 继承。

您可以使用 base 参数指定不同的基类:

@app.task(base=OtherTask):
def add(x, y):
    return x + y

要创建自定义任务类,您应该从中性基类继承:celery.Task

from celery import Task

class DebugTask(Task):

    def __call__(self, *args, **kwargs):
        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
        return self.run(*args, **kwargs)

提示

如果你覆盖了任务的 __call__ 方法,那么你还调用 self.run 来执行任务的主体是非常重要的。 不要叫super().__call__。 中性基类celery.Task__call__方法仅供参考。 为了优化,如果没有定义 __call__ 方法,这已经展开到 celery.app.trace.build_tracer.trace_task 中,它直接在自定义任务类上调用 run


中性基类很特别,因为它还没有绑定到任何特定的应用程序。 一旦任务绑定到应用程序,它将读取配置以设置默认值,依此类推。

要实现基类,您需要使用 @task() 装饰器创建一个任务:

@app.task(base=DebugTask)
def add(x, y):
    return x + y

甚至可以通过更改其 @Task() 属性来更改应用程序的默认基类:

>>> from celery import Celery, Task

>>> app = Celery()

>>> class MyBaseTask(Task):
...    queue = 'hipri'

>>> app.Task = MyBaseTask
>>> app.Task
<unbound MyBaseTask>

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.__class__.mro()
[<class add of <Celery __main__:0x1012b4410>>,
 <unbound MyBaseTask>,
 <unbound Task>,
 <type 'object'>]