Celery 的第一步 — Python 文档
芹菜的第一步
Celery 是一个包含电池的任务队列。 它易于使用,因此您无需了解它所解决问题的全部复杂性即可开始使用。 它围绕最佳实践而设计,以便您的产品可以扩展并与其他语言集成,并且附带了在生产中运行此类系统所需的工具和支持。
在本教程中,您将学习使用 Celery 的绝对基础知识。
了解;
- 选择并安装消息传输(代理)。
- 安装 Celery 并创建您的第一个任务。
- 启动工作程序并调用任务。
- 在任务过渡到不同状态时跟踪任务,并检查返回值。
Celery 起初可能看起来令人生畏 - 但别担心 - 本教程将让您立即开始。 它故意保持简单,以免让您与高级功能混淆。 完成本教程后,最好浏览一下文档的其余部分。 例如,下一步 教程将展示 Celery 的功能。
选择经纪人
Celery 需要一个解决方案来发送和接收消息; 通常这以称为 消息代理 的单独服务的形式出现。
有多种选择,包括:
兔MQ
RabbitMQ 功能齐全、稳定、耐用且易于安装。 它是生产环境的绝佳选择。 有关在 Celery 中使用 RabbitMQ 的详细信息:
如果您使用的是 Ubuntu 或 Debian,请执行以下命令安装 RabbitMQ:
$ sudo apt-get install rabbitmq-server
或者,如果您想在 Docker 上运行它,请执行以下操作:
$ docker run -d -p 5672:5672 rabbitmq
命令完成后,代理将在后台运行,准备为您移动消息:Starting rabbitmq-server: SUCCESS
。
如果您运行的不是 Ubuntu 或 Debian,请不要担心,您可以访问此网站以查找其他平台(包括 Microsoft Windows)的类似简单安装说明:
Redis
Redis 也功能齐全,但在突然终止或断电的情况下更容易丢失数据。 关于使用Redis的详细信息:
如果要在 Docker 上运行它,请执行以下操作:
$ docker run -d -p 6379:6379 redis
安装芹菜
Celery 位于 Python Package Index (PyPI) 上,因此可以使用标准 Python 工具(如 pip
或 easy_install
)安装它:
$ pip install celery
应用
你需要的第一件事是一个 Celery 实例。 我们将其称为 Celery 应用程序 或简称为 app。 因为这个实例被用作你想在 Celery 中做的所有事情的入口点,比如创建任务和管理工作人员,其他模块必须可以导入它。
在本教程中,我们将所有内容都包含在单个模块中,但对于较大的项目,您希望创建一个 专用模块 。
让我们创建文件 tasks.py
:
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
Celery
的第一个参数是当前模块的名称。 这只是为了在 __main__ 模块中定义任务时可以自动生成名称。
第二个参数是代理关键字参数,指定要使用的消息代理的 URL。 这里我们使用 RabbitMQ(也是默认选项)。
有关更多选择,请参阅上面的 选择代理 – 对于 RabbitMQ,您可以使用 amqp://localhost
,或者对于 Redis,您可以使用 redis://localhost
。
您定义了一个名为 add
的任务,它返回两个数字的总和。
运行 Celery 工作服务器
您现在可以通过使用 worker
参数执行我们的程序来运行工作程序:
$ celery -A tasks worker --loglevel=INFO
在生产中,您将希望在后台运行 worker 作为守护进程。 为此,您需要使用平台提供的工具,或类似 supervisord 的工具(有关更多信息,请参阅 Daemonization)。
有关可用命令行选项的完整列表,请执行以下操作:
$ celery worker --help
还有其他几个命令可用,帮助也可用:
$ celery --help
调用任务
要调用我们的任务,您可以使用 delay()
方法。
这是 apply_async()
方法的便捷快捷方式,可以更好地控制任务执行(参见 Calling Tasks):
>>> from tasks import add
>>> add.delay(4, 4)
该任务现在已由您之前启动的工作人员处理。 您可以通过查看工作人员的控制台输出来验证这一点。
调用任务会返回一个 @AsyncResult
实例。 这可用于检查任务的状态,等待任务完成,或获取其返回值(或如果任务失败,则获取异常和回溯)。
默认情况下不启用结果。 为了执行远程过程调用或跟踪数据库中的任务结果,您需要配置 Celery 以使用结果后端。 这将在下一节中描述。
保持结果
如果您想跟踪任务的状态,Celery 需要将状态存储或发送到某处。 有几个内置的结果后端可供选择:SQLAlchemy/Django ORM、MongoDB、Memcached、Redis[ X147X]、RPC (RabbitMQ/AMQP),以及 - 或者您可以定义自己的。
在这个例子中,我们使用 rpc 结果后端,它将状态作为瞬态消息发回。 后端通过 @Celery
的 backend
参数指定,(如果您选择使用配置模块,则通过 :setting:`result_backend` 设置)。 因此,您可以修改 tasks.py 文件中的这一行以启用 rpc:// 后端:
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
或者,如果您想使用 Redis 作为结果后端,但仍使用 RabbitMQ 作为消息代理(一种流行的组合):
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')
要阅读有关结果后端的更多信息,请参阅 Result Backends。
现在配置了结果后端,关闭当前的 python 会话并再次导入 tasks
模块以使更改生效。 这次您将保留调用任务时返回的 @AsyncResult
实例:
>>> from tasks import add # close and reopen to get updated 'app'
>>> result = add.delay(4, 4)
ready()
方法返回任务是否完成处理:
>>> result.ready()
False
您可以等待结果完成,但这很少使用,因为它将异步调用转换为同步调用:
>>> result.get(timeout=1)
8
如果任务引发异常,get()
将重新引发异常,但您可以通过指定 propagate
参数来覆盖它:
>>> result.get(propagate=False)
如果任务引发异常,您还可以访问原始回溯:
>>> result.traceback
警告
后端使用资源来存储和传输结果。 为了确保资源被释放,您最终必须在调用任务后返回的每个 @AsyncResult
实例上调用 get()
或 forget()
。
有关完整的结果对象参考,请参阅 celery.result
。
配置
Celery 就像消费电器一样,不需要太多配置即可运行。 它有一个输入和一个输出。 输入必须连接到代理,输出可以选择连接到结果后端。 但是,如果您仔细观察背面,会发现有一个盖子,里面装满了滑块、刻度盘和按钮:这就是配置。
对于大多数用例,默认配置应该足够好,但是可以配置许多选项以使 Celery 完全按照需要工作。 阅读有关可用选项的信息是熟悉可以配置的内容的好主意。 您可以阅读 配置和默认值 参考中的选项。
可以直接在应用程序上进行配置,也可以使用专用的配置模块进行配置。 例如,您可以通过更改 :setting:`task_serializer` 设置来配置用于序列化任务有效负载的默认序列化器:
app.conf.task_serializer = 'json'
如果您一次配置多个设置,您可以使用 update
:
app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)
对于较大的项目,建议使用专用的配置模块。 不鼓励硬编码周期性任务间隔和任务路由选项。 最好将它们保存在一个集中的位置。 对于图书馆来说尤其如此,因为它使用户能够控制他们的任务的行为方式。 集中配置还允许您的系统管理员在系统出现故障时进行简单的更改。
您可以通过调用 @config_from_object()
方法告诉您的 Celery 实例使用配置模块:
app.config_from_object('celeryconfig')
该模块通常称为“celeryconfig
”,但您可以使用任何模块名称。
在上述情况下,名为 celeryconfig.py
的模块必须可用于从当前目录或 Python 路径加载。 它可能看起来像这样:
celeryconfig.py
:
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
要验证您的配置文件是否正常工作并且不包含任何语法错误,您可以尝试导入它:
$ python -m celeryconfig
有关配置选项的完整参考,请参阅 配置和默认值 。
为了演示配置文件的强大功能,您可以通过以下方式将行为异常的任务路由到专用队列:
celeryconfig.py
:
task_routes = {
'tasks.add': 'low-priority',
}
或者,您可以对任务进行速率限制,而不是路由它,以便在一分钟内只能处理 10 个此类任务 (10/m):
celeryconfig.py
:
task_annotations = {
'tasks.add': {'rate_limit': '10/m'}
}
如果您使用 RabbitMQ 或 Redis 作为代理,那么您还可以指示工作人员在运行时为任务设置新的速率限制:
$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
new rate limit set successfully
请参阅 Routing Tasks 以了解有关任务路由的更多信息,以及 :setting:`task_annotations` 设置以了解有关注释的更多信息,或 Monitoring and Management Guide 了解更多关于远程控制命令以及如何监控您的员工正在做什么。
故障排除
常见问题 中还有一个故障排除部分。
Worker 未启动:权限错误
如果您使用的是 Debian、Ubuntu 或其他基于 Debian 的发行版:
Debian 最近将
/dev/shm
特殊文件重命名为/run/shm
。一个简单的解决方法是创建一个符号链接:
# ln -s /run/shm /dev/shm
其他:
如果您提供
--pidfile
、--logfile
或--statedb
参数中的任何一个,那么您必须确保它们指向一个文件或目录,该文件或目录可由用户启动工人。
结果后端不工作或任务始终处于 PENDING 状态
默认情况下,所有任务都是 :state:`PENDING`,所以状态最好命名为“unknown”。 Celery 在发送任务时不会更新状态,任何没有历史记录的任务都被假定为挂起(毕竟你知道任务 ID)。
确保任务没有启用
ignore_result
。启用此选项将强制工作人员跳过更新状态。
确保 :setting:`task_ignore_result` 设置未启用。
确保您没有任何旧工人仍在运行。
很容易意外启动多个工作程序,因此在启动新工作程序之前,请确保前一个工作程序已正确关闭。
未配置预期结果后端的旧工作器可能正在运行并正在劫持任务。
--pidfile
参数可以设置为绝对路径以确保不会发生这种情况。确保客户端配置了正确的后端。
如果出于某种原因,客户端配置为使用与工作线程不同的后端,您将无法收到结果。 确保后端配置正确:
>>> result = task.delay() >>> print(result.backend)