常见问题 — Python 文档

来自菜鸟教程
Celery/docs/latest/faq
跳转至:导航、​搜索

常见问题


一般

我应该用芹菜做哪些事情?

回答: 排队一切,让每个人都高兴 is a good article describing why you’d use a queue in a web context.

这些是一些常见的用例:

  • 在后台运行一些东西。 例如,为了尽快完成 Web 请求,然后逐步更新用户页面。 这给用户留下了良好的性能和“快速”的印象,即使实际工作实际上可能需要一些时间。
  • 在 Web 请求完成后运行一些东西。
  • 通过异步执行和使用重试来确保某事完成。
  • 安排定期工作。

在某种程度上:

  • 分布式计算。
  • 并行执行。


误解

Celery 真的有 50.000 行代码吗?

Answer: 不,这和类似的大量数字已在不同地点报告。

在撰写本文时,数字是:

  • 核心:7,141 行代码。
  • 测试:14,209 行。
  • 后端、contrib、compat 实用程序:9,032 行。


代码行数不是一个有用的指标,因此即使 Celery 确实包含 50k 行代码,您也无法从这样的数字中得出任何结论。


Celery 有很多依赖吗?

一个普遍的批评是 Celery 使用了太多的依赖项。 这种恐惧背后的基本原理很难想象,尤其是考虑到代码重用是现代软件开发中解决复杂性的既定方法,而且现在添加依赖项的成本非常低,因为 pip 和 PyPI 等包管理器使安装变得麻烦并保持依赖关系已成为过去。

Celery 一路上替换了几个依赖,目前的依赖列表是:

芹菜

Kombu 是 Celery 生态系统的一部分,是用于发送和接收消息的库。 它也是使我们能够支持许多不同消息代理的库。 它也被 OpenStack 项目和许多其他项目使用,以验证将其与 Celery 代码库分开的选择。

台球是 Python 多处理模块的一个分支,包含许多性能和稳定性改进。 最终目标是有朝一日将这些改进合并回 Python。

它还用于与多处理模块不附带的旧 Python 版本兼容。

pytz 模块提供时区定义和相关工具。


昆布

Kombu 依赖于以下软件包:

底层纯 Python amqp 客户端实现。 AMQP 作为默认代理,这是一个自然的依赖。

笔记

为了处理流行配置选项的依赖关系,Celery 定义了许多“捆绑”包,请参阅 Bundles


芹菜很重吗?

Celery 在内存占用和性能方面的开销都非常小。

但请注意,默认配置并未针对时间和空间进行优化,请参阅 Optimizing 指南了解更多信息。


芹菜依赖泡菜吗?

Answer: 不,Celery 可以支持任何序列化方案。

我们内置了对 JSON、YAML、Pickle 和 msgpack 的支持。 每个任务都与一种内容类型相关联,因此您甚至可以使用 pickle 发送一个任务,使用 JSON 发送另一个任务。

默认的序列化支持曾经是 pickle,但从 4.0 开始,现在默认是 JSON。 如果您需要将复杂的 Python 对象作为任务参数发送,您可以使用 pickle 作为序列化格式,但请参阅 Serializers 中的注释。

如果您需要与其他语言进行通信,您应该使用适合该任务的序列化格式,这几乎意味着任何非 pickle 的序列化程序。

您可以设置全局默认序列化程序、特定任务的默认序列化程序,甚至在发送单个任务实例时使用的序列化程序。


芹菜只适用于 Django 吗?

Answer: 不,您可以将 Celery 与任何框架、Web 或其他框架一起使用。


我必须使用 AMQP/RabbitMQ 吗?

:不可以,虽然推荐使用RabbitMQ,但也可以使用Redis、SQS或Qpid。

有关更多信息,请参阅 后端和代理

Redis 作为代理的性能不如 AMQP 代理,但通常使用 RabbitMQ 作为代理和 Redis 作为结果存储的组合。 如果您有严格的可靠性要求,我们鼓励您使用 RabbitMQ 或其他 AMQP 代理。 一些传输还使用轮询,因此它们可能会消耗更多资源。 但是,如果您由于某种原因无法使用 AMQP,请随意使用这些替代方案。 它们可能适用于大多数用例,并注意以上几点并非特定于 Celery; 如果以前使用 Redis/数据库作为队列对您来说效果很好,那么现在可能会这样。 如果需要,您可以随时升级。


Celery 是多语言的吗?

答案: 是的。

worker 是 Python 中 Celery 的一个实现。 如果该语言具有 AMQP 客户端,则用您的语言创建工作程序应该没有太多工作。 Celery worker 只是一个连接到代理来处理消息的程序。

此外,还有另一种独立于语言的方法,那就是使用 REST 任务,而不是您的任务是函数,它们是 URL。 有了这些信息,您甚至可以创建支持代码预加载的简单 Web 服务器。 只需公开一个执行操作的端点,然后创建一个只执行对该端点的 HTTP 请求的任务。

您还可以使用 Flower 的 REST API 来调用任务。


故障排除

MySQL 抛出死锁错误,我该怎么办?

Answer: MySQL 默认隔离级别设置为 REPEATABLE-READ,如果你真的不需要,设置为 READ-COMMITTED。 您可以通过将以下内容添加到 my.cnf 来实现:

[mysqld]
transaction-isolation = READ-COMMITTED

有关 InnoDB 事务模型的更多信息,请参阅 MySQL 用户手册中的 MySQL - InnoDB 事务模型和锁定

(感谢 Honza Kral 和 Anton Tsigularov 提供此解决方案)


任务结果无法可靠返回

Answer: 如果您使用数据库后端获取结果,尤其是使用 MySQL,请参阅 MySQL 抛出死锁错误,我该怎么办?


为什么 Task.delay/apply*/the worker 只是挂了?

Answer: 某些 AMQP 客户端存在一个错误,如果它无法对当前用户进行身份验证、密码不匹配或用户无权访问指定的虚拟主机,则会使其挂起. 请务必检查您的代理日志(对于大多数系统上的 /var/log/rabbitmq/rabbit.log RabbitMQ),它通常包含描述原因的消息。


它适用于 FreeBSD 吗?

答案: 视情况而定;

当使用 RabbitMQ (AMQP) 和 Redis 传输时,它应该是开箱即用的。

对于其他传输,使用兼容性 prefork 池并需要一个有效的 POSIX 信号量实现,自 FreeBSD 8.x 以来,默认情况下在 FreeBSD 中启用。 对于旧版本的 FreeBSD,您必须在内核中启用 POSIX 信号量并手动重新编译台球。

幸运的是,Viktor Petersson 写了一个教程来帮助您在 FreeBSD 上开始使用 Celery:http://www.playingwithwire.com/2009/10/how-to-get-celeryd-to-work-on-freebsd /


我有 IntegrityError: Duplicate Key 错误。 为什么?

答:MySQL抛出死锁错误,我该怎么办?。 感谢 :github_user:`@howsthedotcom`


为什么我的任务没有被处理?

Answer: 使用RabbitMQ,您可以通过运行以下命令查看当前有多少消费者正在接收任务:

$ rabbitmqctl list_queues -p <myvhost> name messages consumers
Listing queues ...
celery     2891    2

这表明任务队列中有 2891 条消息等待处理,并且有两个消费者正在处理它们。

队列从未清空的一个原因可能是您有一个陈旧的工作进程将消息作为人质。 如果工作人员没有正确关闭,则可能会发生这种情况。

当工作人员接收到消息时,代理会等待消息被确认,然后再将消息标记为已处理。 在消费者正确关闭之前,代理不会将该消息重新发送给另一个消费者。

如果遇到此问题,则必须手动杀死所有工作人员并重新启动它们:

$ pkill 'celery worker'

$ # - If you don't have pkill use:
$ # ps auxww | awk '/celery worker/ {print $2}' | xargs kill

您可能需要等待一段时间,直到所有工作人员都完成执行任务。 如果它在很长一段时间后仍然挂起,您可以使用以下方法强行杀死它们:

$ pkill -9 'celery worker'

$ # - If you don't have pkill use:
$ # ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9

为什么我的任务不会运行?

Answer: 可能存在阻止导入任务模块的语法错误。

您可以通过手动执行任务来了解 Celery 是否能够运行该任务:

>>> from myapp.tasks import MyPeriodicTask
>>> MyPeriodicTask.delay()

查看工作日志文件以查看它是否能够找到任务,或者是否发生了其他错误。


为什么我的周期性任务不会运行?

答案:参见为什么我的任务不能运行?


如何清除所有等待的任务?

Answer: 您可以使用 celery purge 命令清除所有配置的任务队列:

$ celery -A proj purge

或以编程方式:

>>> from proj.celery import app
>>> app.control.purge()
1753

如果您只想清除特定队列中的消息,您必须使用 AMQP API 或 celery amqp 实用程序:

$ celery -A proj amqp queue.purge <queue name>

数字 1753 是删除的消息数。

您还可以在启用 --purge 选项的情况下启动工作器,以在工作器启动时清除消息。


我已清除消息,但队列中仍有消息?

Answer: 任务一实际执行就被确认(从队列中移除)。 Worker 接收到一个任务后,需要一些时间才能真正执行,特别是当有很多任务已经在等待执行时。 未确认的消息由工作人员保留,直到它关闭与代理(AMQP 服务器)的连接。 当该连接关闭时(例如,因为 worker 被停止),任务将由代理重新发送到下一个可用的 worker(或重新启动时的同一个 worker),以便正确清除等待任务的队列您必须停止所有工人,然后使用 celery.control.purge() 清除任务。


结果

如果我有指向那里的 ID,我如何获得任务的结果?

答案:使用task.AsyncResult:

>>> result = my_task.AsyncResult(task_id)
>>> result.get()

这将为您提供一个 AsyncResult 使用任务当前结果后端的实例。

如果您需要指定自定义结果后端,或者您想使用当前应用程序的默认后端,您可以使用 @AsyncResult

>>> result = app.AsyncResult(task_id)
>>> result.get()

安全

使用 pickle 不是一个安全问题吗?

Answer:确实,从 Celery 4.0 开始,默认的序列化器现在是 JSON,以确保人们有意识地选择序列化器并意识到这一点。

您必须防止未经授权访问您的代理、数据库和其他传输腌制数据的服务。

请注意,这不仅仅是您在使用 Celery 时应该注意的事情,例如 Django 也将 pickle 用于其缓存客户端。

对于任务消息,您可以将 :setting:`task_serializer` 设置设置为“json”或“yaml”而不是 pickle。

同样,对于任务结果,您可以设置 :setting:`result_serializer`

有关使用的格式和检查任务使用的格式时的查找顺序的更多详细信息,请参阅 Serializers


消息可以加密吗?

Answer:一些AMQP代理支持使用SSL(包括RabbitMQ)。 您可以使用 :setting:`broker_use_ssl` 设置启用此功能。

还可以为消息添加额外的加密和安全性,如果您有此需要,那么您应该联系 邮件列表


以 root 身份运行 celery worker 是否安全?

回答:不!

我们目前还没有意识到任何安全问题,但是假设它们不存在是非常天真的,所以运行 Celery 服务(celery workercelery beatceleryev 等)建议作为非特权用户。


经纪人

为什么 RabbitMQ 会崩溃?

答: RabbitMQ 内存不足会崩溃。 这将在 RabbitMQ 的未来版本中修复。 请参考RabbitMQ FAQ:https://www.rabbitmq.com/faq.html#node-runs-out-of-memory

笔记

情况不再如此,RabbitMQ 2.0 及更高版本包含一个新的持久器,它可以容忍内存不足错误。 Celery 推荐使用 RabbitMQ 2.1 或更高版本。

如果您仍在运行旧版本的RabbitMQ并遇到崩溃,那么请升级!


Celery 的错误配置最终会导致旧版本的 RabbitMQ 崩溃。 即使它没有崩溃,这仍然会消耗大量资源,因此了解常见的陷阱很重要。

  • 事件。

使用 -E 选项运行 worker 将为工作器内部发生的事件发送消息。

仅当您有一个使用它们的活动监视器,或者您定期清除事件队列时,才应启用事件。

  • AMQP 后端结果。

使用 AMQP 结果后端运行时,每个任务结果都将作为消息发送。 如果你不收集这些结果,它们就会堆积起来,RabbitMQ 最终会耗尽内存。

此结果后端现已弃用,因此您不应使用它。 如果您需要多消费者访问结果,请使用 RPC 后端进行 rpc 样式调用,或使用持久后端。

默认情况下,结果会在 1 天后过期。 通过配置 :setting:`result_expires` 设置来降低这个值可能是个好主意。

如果您不将结果用于任务,请确保设置 ignore_result 选项:

@app.task(ignore_result=True)
def mytask():
    pass

class MyTask(Task):
    ignore_result = True

我可以将 Celery 与 ActiveMQ/STOMP 一起使用吗?

答案:没有。 它曾经被 :pypi:`Carrot`(我们的旧消息库)支持,但目前在 :pypi:`Kombu`(我们的新消息库)中不受支持。


不使用 AMQP 代理时不支持哪些功能?

这是使用虚拟传输时不可用的功能的不完整列表:

  • 远程控制命令(仅 Redis 支持)。
  • 事件监控可能无法在所有虚拟传输中工作。
  • *; header 和 fanout 交换类型
    (Redis 支持fanout)。


任务

调用任务时如何重用相同的连接?

答案:参见:setting:`broker_pool_limit` 设置。 从 2.5 版开始,默认情况下启用连接池。


subprocess 中的 sudo 返回 None

有一个 sudo 配置选项,这使得没有 tty 的进程运行 sudo 是非法的:

Defaults requiretty

如果您的 /etc/sudoers 文件中有此配置,那么当工作程序作为守护程序运行时,任务将无法调用 sudo。 如果要启用它,则需要从 /etc/sudoers 中删除该行。

参见:http://timelordz.com/wiki/Apache_Sudo_Commands


如果工作人员无法处理它们,为什么要从队列中删除它们?

答案

工作人员拒绝未知任务、带有编码错误的消息和不包含正确字段的消息(根据任务消息协议)。

如果它没有拒绝它们,它们可能会一次又一次地重新交付,从而导致循环。

RabbitMQ 的最新版本能够为交换配置死信队列,以便将被拒绝的消息移动到那里。


我可以按名称调用任务吗?

答案:是的,使用@send_task()

您还可以使用 AMQP 客户端按名称调用任何语言的任务:

>>> app.send_task('tasks.add', args=[2, 2], kwargs={})
<AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>

要将 chainchordgroup 与按名称调用的任务一起使用,请使用 @Celery.signature() 方法:

>>> chain(
...     app.signature('tasks.add', args=[2, 2], kwargs={}),
...     app.signature('tasks.add', args=[1, 1], kwargs={})
... ).apply_async()
<AsyncResult: e9d52312-c161-46f0-9013-2713e6df812d>

我可以获取当前任务的任务 id 吗?

Answer:是的,当前id和更多在任务请求中可用:

@app.task(bind=True)
def mytask(self):
    cache.set(self.request.id, "Running")

有关更多信息,请参阅 任务请求

如果您没有对任务实例的引用,您可以使用 app.current_task

>>> app.current_task.request.id

但请注意,这将是任何任务,无论是由工作人员执行的任务,还是由该任务直接调用的任务,或者是热切调用的任务。

要获得正在处理的当前任务,请使用 app.current_worker_task

>>> app.current_worker_task.request.id

笔记

@current_task@current_worker_task 都可以是 None


我可以指定自定义 task_id 吗?

Answer:是的,使用 task_id 参数到 Task.apply_async()

>>> task.apply_async(args, kwargs, task_id='…')

我可以在任务中使用装饰器吗?

答案:是的,但请参阅侧栏中的注释基础知识


我可以使用自然任务 ID 吗?

Answer:是的,但要确保它是唯一的,因为两个具有相同 id 的任务的行为是未定义的。

世界可能不会爆炸,但他们绝对可以覆盖彼此的结果。


一旦另一个任务完成,我可以运行一个任务吗?

Answer:是的,你可以安全地在一个任务中启动一个任务。

一种常见的模式是向任务添加回调:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def add(x, y):
    return x + y

@app.task(ignore_result=True)
def log_result(result):
    logger.info("log_result got: %r", result)

调用:

>>> (add.s(2, 2) | log_result.s()).delay()

有关更多信息,请参阅 Canvas:设计工作流


我可以取消任务的执行吗?

答案:是的,使用result.revoke()

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

或者如果您只有任务 ID:

>>> from proj.celery import app
>>> app.control.revoke(task_id)

后者还支持将任务 ID 列表作为参数传递。


为什么不是所有工人都收到我的远程控制命令?

Answer:为了接收广播远程控制命令,每个worker节点根据worker的nodename创建一个唯一的队列名称。

如果您有多个具有相同主机名的工人,则控制命令将在它们之间以循环方式接收。

要解决此问题,您可以使用 -n 参数为 worker 显式设置每个工人的节点名称:

$ celery -A proj worker -n worker1@%h
$ celery -A proj worker -n worker2@%h

其中 %h 扩展为当前主机名。


我可以只向某些服务器发送一些任务吗?

Answer: 是的,您可以将任务路由到一个或多个worker,使用不同的消息路由拓扑,并且一个worker 实例可以绑定到多个队列。

有关更多信息,请参阅 路由任务


我可以禁用任务的预取吗?

回答:也许吧! AMQP 术语“预取”令人困惑,因为它仅用于描述任务预取 limit。 不涉及实际的预取。

禁用预取限制是可能的,但这意味着工作人员将尽可能快地消耗尽可能多的任务。

关于预取限制和一次只保留一个任务的工作器的配置设置的讨论可以在这里找到:Prefetch Limits


我可以在运行时更改周期性任务的间隔吗?

:可以,可以使用Django数据库调度器,也可以新建一个调度子类并覆盖is_due()

from celery.schedules import schedule

class my_schedule(schedule):

    def is_due(self, last_run_at):
        return run_now, next_time_to_check

Celery 是否支持任务优先级?

:是的,RabbitMQ从3.5.0版本开始支持优先级,Redis传输模拟优先级支持。

您还可以通过将高优先级任务路由到不同的工作人员来确定工作的优先级。 在现实世界中,这通常比按消息优先级更有效。 您可以将此与速率限制和每条消息优先级结合使用,以实现响应式系统。


我应该使用重试还是 acks_late?

答案:视情况而定。 它不一定是一种或另一种,您可能想同时使用两者。

Task.retry 用于重试任务,特别是用于 try 块可捕获的预期错误。 AMQP 事务不用于这些错误:如果任务引发异常,它仍然被确认!

acks_late 设置将在您需要再次执行任务时使用,如果工作程序(由于某种原因)在执行过程中崩溃。 重要的是要注意,worker 不会崩溃,如果它崩溃了,通常是一个需要人工干预的不可恢复的错误(worker 中的错误,或任务代码)。

在理想情况下,您可以安全地重试任何失败的任务,但这种情况很少见。 想象一下以下任务:

@app.task
def process_upload(filename, tmpfile):
    # Increment a file count stored in a database
    increment_file_counter()
    add_file_metadata_to_db(filename, tmpfile)
    copy_file_to_destination(filename, tmpfile)

如果它在将文件复制到目的地的过程中崩溃,世界将包含不完整的状态。 这当然不是一个关键的场景,但你可能会想象出更险恶的事情。 所以为了便于编程,我们的可靠性较低; 这是一个很好的默认设置,需要它并知道他们在做什么的用户仍然可以启用 acks_late(并且在未来希望使用手动确认)。

此外 Task.retry 具有 AMQP 事务中不可用的功能:重试之间的延迟、最大重试等。

因此,对 Python 错误使用重试,如果您的任务是幂等的,如果需要该级别的可靠性,请将其与 acks_late 结合使用。


我可以安排任务在特定时间执行吗?

回答:是的。 您可以使用 Task.apply_async() 的 eta[X23X] 参数。

另见 定期任务


我可以安全地关闭工人吗?

答案:是的,使用:sig:`TERM` 信号。

这将告诉工作人员完成所有当前正在执行的作业并尽快关闭。 只要关闭完成,即使使用实验性传输也不应该丢失任何任务。

你永远不应该用 :sig:`KILL` 信号 (kill -9) 停止 worker,除非你已经尝试过 :sig:`TERM` ] 几次,然后等了几分钟,让它有机会关闭。

还要确保你只杀死主工作进程,而不是它的任何子进程。 如果您知道进程当前正在执行工作程序关闭所依赖的任务,则可以将终止信号定向到特定的子进程,但这也意味着将为任务设置 WorkerLostError 状态,以便任务不会再跑了。

如果您安装了 :pypi:`setproctitle` 模块,则识别进程类型会更容易:

$ pip install setproctitle

安装此库后,您将能够在 ps 列表中看到进程类型,但必须重新启动工作程序才能使其生效。

也可以看看

停止工人


我可以在 [platform] 的后台运行 worker 吗?

答案:是的,请参见守护进程


姜戈

django-celery-beat创建的数据库表有什么用途?

当使用数据库支持的调度时,周期性任务调度取自 PeriodicTask 模型,还有其他几个辅助表(IntervalScheduleCrontabSchedule、[ X174X])。


django-celery-results创建的数据库表有什么用途?

Django 数据库结果后端扩展需要两个额外的模型:TaskResultGroupResult


窗户

Celery 支持 Windows 吗?

答案:没有。

从 Celery 4.x 开始,由于资源不足,不再支持 Windows。

但它可能仍然有效,我们很乐意接受补丁。