Celery 的第一步 — Python 文档

来自菜鸟教程
Celery/docs/latest/getting-started/first-steps-with-celery
跳转至:导航、​搜索

芹菜的第一步

Celery 是一个包含电池的任务队列。 它易于使用,因此您无需了解它所解决问题的全部复杂性即可开始使用。 它围绕最佳实践而设计,以便您的产品可以扩展并与其他语言集成,并且附带了在生产中运行此类系统所需的工具和支持。

在本教程中,您将学习使用 Celery 的绝对基础知识。

了解;

  • 选择并安装消息传输(代理)。
  • 安装 Celery 并创建您的第一个任务。
  • 启动工作程序并调用任务。
  • 在任务过渡到不同状态时跟踪任务,并检查返回值。

Celery 起初可能看起来令人生畏 - 但别担心 - 本教程将让您立即开始。 它故意保持简单,以免让您与高级功能混淆。 完成本教程后,最好浏览一下文档的其余部分。 例如,下一步 教程将展示 Celery 的功能。

选择经纪人

Celery 需要一个解决方案来发送和接收消息; 通常这以称为 消息代理 的单独服务的形式出现。

有多种选择,包括:

兔MQ

RabbitMQ 功能齐全、稳定、耐用且易于安装。 它是生产环境的绝佳选择。 有关在 Celery 中使用 RabbitMQ 的详细信息:

如果您使用的是 Ubuntu 或 Debian,请执行以下命令安装 RabbitMQ:

$ sudo apt-get install rabbitmq-server

或者,如果您想在 Docker 上运行它,请执行以下操作:

$ docker run -d -p 5672:5672 rabbitmq

命令完成后,代理将在后台运行,准备为您移动消息:Starting rabbitmq-server: SUCCESS

如果您运行的不是 Ubuntu 或 Debian,请不要担心,您可以访问此网站以查找其他平台(包括 Microsoft Windows)的类似简单安装说明:

Redis

Redis 也功能齐全,但在突然终止或断电的情况下更容易丢失数据。 关于使用Redis的详细信息:

使用Redis

如果要在 Docker 上运行它,请执行以下操作:

$ docker run -d -p 6379:6379 redis

其他经纪人

除上述之外,还有其他实验性传输实现可供选择,包括 Amazon SQS

有关完整列表,请参阅 经纪商概览


安装芹菜

Celery 位于 Python Package Index (PyPI) 上,因此可以使用标准 Python 工具(如 pipeasy_install)安装它:

$ pip install celery

应用

你需要的第一件事是一个 Celery 实例。 我们将其称为 Celery 应用程序 或简称为 app。 因为这个实例被用作你想在 Celery 中做的所有事情的入口点,比如创建任务和管理工作人员,其他模块必须可以导入它。

在本教程中,我们将所有内容都包含在单个模块中,但对于较大的项目,您希望创建一个 专用模块

让我们创建文件 tasks.py

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

Celery 的第一个参数是当前模块的名称。 这只是为了在 __main__ 模块中定义任务时可以自动生成名称。

第二个参数是代理关键字参数,指定要使用的消息代理的 URL。 这里我们使用 RabbitMQ(也是默认选项)。

有关更多选择,请参阅上面的 选择代理 – 对于 RabbitMQ,您可以使用 amqp://localhost,或者对于 Redis,您可以使用 redis://localhost

您定义了一个名为 add 的任务,它返回两个数字的总和。


运行 Celery 工作服务器

您现在可以通过使用 worker 参数执行我们的程序来运行工作程序:

$ celery -A tasks worker --loglevel=INFO

笔记

如果工作程序没有启动,请参阅 故障排除 部分。


在生产中,您将希望在后台运行 worker 作为守护进程。 为此,您需要使用平台提供的工具,或类似 supervisord 的工具(有关更多信息,请参阅 Daemonization)。

有关可用命令行选项的完整列表,请执行以下操作:

$  celery worker --help

还有其他几个命令可用,帮助也可用:

$ celery --help

调用任务

要调用我们的任务,您可以使用 delay() 方法。

这是 apply_async() 方法的便捷快捷方式,可以更好地控制任务执行(参见 Calling Tasks):

>>> from tasks import add
>>> add.delay(4, 4)

该任务现在已由您之前启动的工作人员处理。 您可以通过查看工作人员的控制台输出来验证这一点。

调用任务会返回一个 @AsyncResult 实例。 这可用于检查任务的状态,等待任务完成,或获取其返回值(或如果任务失败,则获取异常和回溯)。

默认情况下不启用结果。 为了执行远程过程调用或跟踪数据库中的任务结果,您需要配置 Celery 以使用结果后端。 这将在下一节中描述。


保持结果

如果您想跟踪任务的状态,Celery 需要将状态存储或发送到某处。 有几个内置的结果后端可供选择:SQLAlchemy/Django ORM、MongoDBMemcached、Redis[ X147X]、RPC (RabbitMQ/AMQP),以及 - 或者您可以定义自己的。

在这个例子中,我们使用 rpc 结果后端,它将状态作为瞬态消息发回。 后端通过 @Celerybackend 参数指定,(如果您选择使用配置模块,则通过 :setting:`result_backend` 设置)。 因此,您可以修改 tasks.py 文件中的这一行以启用 rpc:// 后端:

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

或者,如果您想使用 Redis 作为结果后端,但仍使用 RabbitMQ 作为消息代理(一种流行的组合):

app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

要阅读有关结果后端的更多信息,请参阅 Result Backends

现在配置了结果后端,关闭当前的 python 会话并再次导入 tasks 模块以使更改生效。 这次您将保留调用任务时返回的 @AsyncResult 实例:

>>> from tasks import add    # close and reopen to get updated 'app'
>>> result = add.delay(4, 4)

ready() 方法返回任务是否完成处理:

>>> result.ready()
False

您可以等待结果完成,但这很少使用,因为它将异步调用转换为同步调用:

>>> result.get(timeout=1)
8

如果任务引发异常,get() 将重新引发异常,但您可以通过指定 propagate 参数来覆盖它:

>>> result.get(propagate=False)

如果任务引发异常,您还可以访问原始回溯:

>>> result.traceback

警告

后端使用资源来存储和传输结果。 为了确保资源被释放,您最终必须在调用任务后返回的每个 @AsyncResult 实例上调用 get()forget()


有关完整的结果对象参考,请参阅 celery.result


配置

Celery 就像消费电器一样,不需要太多配置即可运行。 它有一个输入和一个输出。 输入必须连接到代理,输出可以选择连接到结果后端。 但是,如果您仔细观察背面,会发现有一个盖子,里面装满了滑块、刻度盘和按钮:这就是配置。

对于大多数用例,默认配置应该足够好,但是可以配置许多选项以使 Celery 完全按照需要工作。 阅读有关可用选项的信息是熟悉可以配置的内容的好主意。 您可以阅读 配置和默认值 参考中的选项。

可以直接在应用程序上进行配置,也可以使用专用的配置模块进行配置。 例如,您可以通过更改 :setting:`task_serializer` 设置来配置用于序列化任务有效负载的默认序列化器:

app.conf.task_serializer = 'json'

如果您一次配置多个设置,您可以使用 update

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

对于较大的项目,建议使用专用的配置模块。 不鼓励硬编码周期性任务间隔和任务路由选项。 最好将它们保存在一个集中的位置。 对于图书馆来说尤其如此,因为它使用户能够控制他们的任务的行为方式。 集中配置还允许您的系统管理员在系统出现故障时进行简单的更改。

您可以通过调用 @config_from_object() 方法告诉您的 Celery 实例使用配置模块:

app.config_from_object('celeryconfig')

该模块通常称为“celeryconfig”,但您可以使用任何模块名称。

在上述情况下,名为 celeryconfig.py 的模块必须可用于从当前目录或 Python 路径加载。 它可能看起来像这样:

celeryconfig.py

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

要验证您的配置文件是否正常工作并且不包含任何语法错误,您可以尝试导入它:

$ python -m celeryconfig

有关配置选项的完整参考,请参阅 配置和默认值

为了演示配置文件的强大功能,您可以通过以下方式将行为异常的任务路由到专用队列:

celeryconfig.py

task_routes = {
    'tasks.add': 'low-priority',
}

或者,您可以对任务进行速率限制,而不是路由它,以便在一分钟内只能处理 10 个此类任务 (10/m):

celeryconfig.py

task_annotations = {
    'tasks.add': {'rate_limit': '10/m'}
}

如果您使用 RabbitMQ 或 Redis 作为代理,那么您还可以指示工作人员在运行时为任务设置新的速率限制:

$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
    new rate limit set successfully

请参阅 Routing Tasks 以了解有关任务路由的更多信息,以及 :setting:`task_annotations` 设置以了解有关注释的更多信息,或 Monitoring and Management Guide 了解更多关于远程控制命令以及如何监控您的员工正在做什么。


从这里去哪里

如果您想了解更多信息,请继续阅读 后续步骤 教程,之后您可以阅读 用户指南


故障排除

常见问题 中还有一个故障排除部分。

Worker 未启动:权限错误

  • 如果您使用的是 Debian、Ubuntu 或其他基于 Debian 的发行版:

    Debian 最近将 /dev/shm 特殊文件重命名为 /run/shm

    一个简单的解决方法是创建一个符号链接:

    # ln -s /run/shm /dev/shm
  • 其他:

    如果您提供 --pidfile--logfile--statedb 参数中的任何一个,那么您必须确保它们指向一个文件或目录,该文件或目录可由用户启动工人。


结果后端不工作或任务始终处于 PENDING 状态

默认情况下,所有任务都是 :state:`PENDING`,所以状态最好命名为“unknown”。 Celery 在发送任务时不会更新状态,任何没有历史记录的任务都被假定为挂起(毕竟你知道任务 ID)。

  1. 确保任务没有启用 ignore_result

    启用此选项将强制工作人员跳过更新状态。

  2. 确保 :setting:`task_ignore_result` 设置未启用。

  3. 确保您没有任何旧工人仍在运行。

    很容易意外启动多个工作程序,因此在启动新工作程序之前,请确保前一个工作程序已正确关闭。

    未配置预期结果后端的旧工作器可能正在运行并正在劫持任务。

    --pidfile 参数可以设置为绝对路径以确保不会发生这种情况。

  4. 确保客户端配置了正确的后端。

    如果出于某种原因,客户端配置为使用与工作线程不同的后端,您将无法收到结果。 确保后端配置正确:

    >>> result = task.delay()
    >>> print(result.backend)