应用程序 — Python 文档
应用
Celery 库在使用前必须实例化,这个实例称为应用程序(或简称app)。
该应用程序是线程安全的,因此具有不同配置、组件和任务的多个 Celery 应用程序可以在同一个进程空间中共存。
现在让我们创建一个:
>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>
最后一行显示了应用程序的文本表示:包括应用程序类的名称(Celery
)、当前主模块的名称(__main__
)和对象的内存地址(0x100469fd0
)。
主要名称
其中只有一个是重要的,那就是主模块名称。 让我们看看为什么会这样。
当您在 Celery 中发送任务消息时,该消息将不包含任何源代码,而仅包含您要执行的任务的名称。 这与主机名在互联网上的工作方式类似:每个工作人员都维护一个任务名称与其实际功能的映射,称为 任务注册表 。
每当您定义一个任务时,该任务也将被添加到本地注册表中:
>>> @app.task
... def add(x, y):
... return x + y
>>> add
<@task: __main__.add>
>>> add.name
__main__.add
>>> app.tasks['__main__.add']
<@task: __main__.add>
然后你又看到了 __main__
; 每当 Celery 无法检测到该函数属于哪个模块时,它就会使用主模块名称来生成任务名称的开头。
这只是有限用例集的问题:
- 如果定义任务的模块作为程序运行。
- 如果应用程序是在 Python shell (REPL) 中创建的。
例如在这里,tasks 模块也用于通过 @worker_main()
启动一个 worker:
tasks.py
:
from celery import Celery
app = Celery()
@app.task
def add(x, y): return x + y
if __name__ == '__main__':
app.worker_main()
当这个模块被执行时,任务将以“__main__
”开头,但是当模块被另一个进程导入时,比如调用一个任务,任务将以“tasks
”开头]”(模块的真实名称):
>>> from tasks import add
>>> add.name
tasks.add
您可以为主模块指定另一个名称:
>>> app = Celery('tasks')
>>> app.main
'tasks'
>>> @app.task
... def add(x, y):
... return x + y
>>> add.name
tasks.add
配置
您可以设置几个选项来改变 Celery 的工作方式。 这些选项可以直接在应用实例上设置,也可以使用专用的配置模块。
配置为 @conf
:
>>> app.conf.timezone
'Europe/London'
您还可以直接设置配置值:
>>> app.conf.enable_utc = True
或使用 update
方法一次更新多个密钥:
>>> app.conf.update(
... enable_utc=True,
... timezone='Europe/London',
...)
配置对象由多个按顺序查询的字典组成:
- 在运行时所做的更改。
- 配置模块(如果有)
- 默认配置(
celery.app.defaults
)。
您甚至可以使用 @add_defaults()
方法添加新的默认源。
config_from_object
@config_from_object()
方法从配置对象加载配置。
这可以是配置模块,也可以是具有配置属性的任何对象。
请注意,当调用 @config_from_object()
时,之前设置的任何配置都将被重置。 如果你想设置额外的配置,你应该在之后这样做。
示例 1:使用模块的名称
@config_from_object()
方法可以采用 Python 模块的完全限定名称,甚至是 Python 属性的名称,例如:"celeryconfig"
、"myproj.config.celery"
或 [ X157X]:
from celery import Celery
app = Celery()
app.config_from_object('celeryconfig')
celeryconfig
模块可能如下所示:
celeryconfig.py
:
enable_utc = True
timezone = 'Europe/London'
只要 import celeryconfig
是可能的,应用程序就可以使用它。
示例 2:传递实际模块对象
你也可以传递一个已经导入的模块对象,但并不总是推荐这样做。
提示
建议使用模块的名称,因为这意味着在使用 prefork 池时不需要序列化模块。 如果您遇到配置问题或pickle 错误,请尝试改用模块的名称。
import celeryconfig
from celery import Celery
app = Celery()
app.config_from_object(celeryconfig)
示例 3:使用配置类/对象
from celery import Celery
app = Celery()
class Config:
enable_utc = True
timezone = 'Europe/London'
app.config_from_object(Config)
# or using the fully qualified name of the object:
# app.config_from_object('module:Config')
config_from_envvar
@config_from_envvar()
从环境变量中获取配置模块名称
例如 – 从名为 CELERY_CONFIG_MODULE
的环境变量中指定的模块加载配置:
import os
from celery import Celery
#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')
app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')
然后,您可以通过环境指定要使用的配置模块:
$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l INFO
审查配置
如果您想打印出配置、调试信息或类似信息,您可能还想过滤掉密码和 API 密钥等敏感信息。
Celery 附带了几个用于显示配置的实用程序,一个是 humanize()
:
>>> app.conf.humanize(with_defaults=False, censored=True)
此方法将配置作为列表字符串返回。 默认情况下,这将仅包含对配置的更改,但您可以通过启用 with_defaults
参数来包含内置的默认键和值。
如果您想将配置作为字典使用,则可以使用 table()
方法:
>>> app.conf.table(with_defaults=False, censored=True)
请注意,Celery 无法删除所有敏感信息,因为它仅使用正则表达式来搜索常用名称的键。 如果您添加包含敏感信息的自定义设置,您应该使用 Celery 识别为机密的名称来命名密钥。
如果名称包含以下任何子字符串,则配置设置将被审查:
API
、TOKEN
、KEY
、SECRET
、PASS
、SIGNATURE
、DATABASE
懒惰
应用程序实例是惰性的,这意味着在实际需要它之前不会对其进行评估。
创建 @Celery
实例只会执行以下操作:
- 创建一个逻辑时钟实例,用于事件。
- 创建任务注册表。
- 将自身设置为当前应用程序(但如果
set_as_current
参数被禁用,则不会)- 调用
@on_init()
回调(默认情况下什么都不做)。
@task()
装饰器不会在定义任务时创建任务,而是将任务的创建推迟到使用任务时或应用程序完成后 [X206X ]最终确定,
此示例显示了在您使用任务或访问属性(在本例中为 repr()
)之前如何创建任务:
>>> @app.task
>>> def add(x, y):
... return x + y
>>> type(add)
<class 'celery.local.PromiseProxy'>
>>> add.__evaluated__()
False
>>> add # <-- causes repr(add) to happen
<@task: __main__.add>
>>> add.__evaluated__()
True
应用程序的 Finalization 通过调用 @finalize()
显式发生 - 或通过访问 @tasks
属性隐式发生。
完成对象将:
复制必须在应用程序之间共享的任务
默认情况下,任务是共享的,但如果禁用了任务装饰器的
shared
参数,则该任务将是它绑定到的应用程序的私有任务。评估所有挂起的任务装饰器。
确保所有任务都绑定到当前应用程序。
任务绑定到应用程序,以便它们可以从配置中读取默认值。
“默认应用”
Celery 并不总是有应用程序,过去只有一个基于模块的 API。 在 Celery 5.0 发布之前,旧位置提供了一个兼容性 API,但已被删除。
Celery 总是创建一个特殊的应用程序——“默认应用程序”,如果没有自定义应用程序被实例化,就会使用它。
celery.task
模块不再可用。 使用应用程序实例上的方法,而不是基于模块的 API:
from celery.task import Task # << OLD Task base class.
from celery import Task # << NEW base class.
打破链条
虽然可能依赖于当前设置的应用程序,但最佳实践是始终将应用程序实例传递给任何需要它的对象。
我称之为“应用链”,因为它根据传递的应用程序创建了一个实例链。
以下示例被认为是不好的做法:
from celery import current_app
class Scheduler:
def run(self):
app = current_app
相反,它应该将 app
作为参数:
class Scheduler:
def __init__(self, app):
self.app = app
Celery 在内部使用 celery.app.app_or_default()
函数,因此一切都可以在基于模块的兼容性 API 中运行
from celery.app import app_or_default
class Scheduler:
def __init__(self, app=None):
self.app = app_or_default(app)
在开发中,您可以设置 CELERY_TRACE_APP
环境变量以在应用程序链中断时引发异常:
$ CELERY_TRACE_APP=1 celery worker -l INFO
演进 API
Celery 自 2009 年最初创建以来发生了很大变化。
例如,一开始可以将任何可调用对象用作任务:
def hello(to):
return 'hello {0}'.format(to)
>>> from celery.execute import apply_async
>>> apply_async(hello, ('world!',))
或者您也可以创建一个 Task
类来设置某些选项,或覆盖其他行为
from celery import Task
from celery.registry import tasks
class Hello(Task):
queue = 'hipri'
def run(self, to):
return 'hello {0}'.format(to)
tasks.register(Hello)
>>> Hello.delay('world!')
后来,决定传递任意可调用的是一种反模式,因为它很难使用除 pickle 之外的序列化程序,并且该功能在 2.0 中被删除,取而代之的是任务装饰器:
from celery import app
@app.task(queue='hipri')
def hello(to):
return 'hello {0}'.format(to)
抽象任务
使用 @task()
装饰器创建的所有任务都将从应用程序的基类 @Task
继承。
您可以使用 base
参数指定不同的基类:
@app.task(base=OtherTask):
def add(x, y):
return x + y
要创建自定义任务类,您应该从中性基类继承:celery.Task
。
from celery import Task
class DebugTask(Task):
def __call__(self, *args, **kwargs):
print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
return self.run(*args, **kwargs)
提示
如果你覆盖了任务的 __call__
方法,那么你还调用 self.run
来执行任务的主体是非常重要的。 不要叫super().__call__
。 中性基类celery.Task
的__call__
方法仅供参考。 为了优化,如果没有定义 __call__
方法,这已经展开到 celery.app.trace.build_tracer.trace_task
中,它直接在自定义任务类上调用 run
。
中性基类很特别,因为它还没有绑定到任何特定的应用程序。 一旦任务绑定到应用程序,它将读取配置以设置默认值,依此类推。
要实现基类,您需要使用 @task()
装饰器创建一个任务:
@app.task(base=DebugTask)
def add(x, y):
return x + y
甚至可以通过更改其 @Task()
属性来更改应用程序的默认基类:
>>> from celery import Celery, Task
>>> app = Celery()
>>> class MyBaseTask(Task):
... queue = 'hipri'
>>> app.Task = MyBaseTask
>>> app.Task
<unbound MyBaseTask>
>>> @app.task
... def add(x, y):
... return x + y
>>> add
<@task: __main__.add>
>>> add.__class__.mro()
[<class add of <Celery __main__:0x1012b4410>>,
<unbound MyBaseTask>,
<unbound Task>,
<type 'object'>]