Celery 后台任务 — Flask 文档

来自菜鸟教程
Flask/docs/1.1.x/patterns/celery
跳转至:导航、​搜索

Celery 后台任务

如果您的应用程序有一个长时间运行的任务,例如处理一些上传的数据或发送电子邮件,您不希望在请求期间等待它完成。 相反,使用任务队列将必要的数据发送到另一个进程,该进程将在请求立即返回时在后台运行该任务。

Celery 是一个强大的任务队列,可用于简单的后台任务以及复杂的多阶段程序和计划。 本指南将向您展示如何使用 Flask 配置 Celery,但假设您已经阅读了 Celery 文档中的 使用 Celery 的第一步 指南。

安装

Celery 是一个单独的 Python 包。 使用 pip 从 PyPI 安装它:

$ pip install celery

配置

您需要的第一件事是 Celery 实例,这称为 celery 应用程序。 它的作用与 Flask 中的 Flask 对象相同,仅适用于 Celery。 由于这个实例被用作你想在 Celery 中做的所有事情的入口点,比如创建任务和管理工作人员,其他模块必须可以导入它。

例如,您可以将其放置在 tasks 模块中。 虽然您可以使用 Celery 而无需对 Flask 进行任何重新配置,但通过子类化任务并添加对 Flask 应用程序上下文的支持并将其与 Flask 配置挂钩,它会变得更好一些。

这是将 Celery 与 Flask 正确集成所必需的全部内容:

from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

该函数创建一个新的 Celery 对象,使用应用程序配置中的代理对其进行配置,从 Flask 配置中更新 Celery 配置的其余部分,然后创建任务的子类,将任务执行包装在应用程序上下文中。


示例任务

让我们编写一个任务,将两个数字相加并返回结果。 我们将 Celery 的 broker 和后端配置为使用 Redis,使用上面的因子创建一个 celery 应用程序,然后使用它来定义任务。

from flask import Flask

flask_app = Flask(__name__)
flask_app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(flask_app)

@celery.task()
def add_together(a, b):
    return a + b

现在可以在后台调用此任务:

result = add_together.delay(23, 42)
result.wait()  # 65

运行一个工人

如果你跳进去并已经执行了上面的代码,你会失望地发现 .wait() 永远不会真正返回。 那是因为您还需要运行 Celery worker 来接收和执行任务。

$ celery -A your_application.celery worker

your_application 字符串必须指向创建 celery 对象的应用程序包或模块。

现在工作程序正在运行,一旦任务完成,wait 将返回结果。