使用 Celery 进行测试 — Python 文档

来自菜鸟教程
Celery/docs/latest/userguide/testing
跳转至:导航、​搜索

用芹菜测试

任务和单元测试

要在单元测试中测试任务行为,首选方法是模拟。

热切模式

:setting:`task_always_eager` 设置启用的热切模式根据定义不适合单元测试。

使用 Eager 模式进行测试时,您只是在测试工人中发生的情况的模拟,并且模拟与现实中发生的情况之间存在许多差异。

请注意,默认情况下,急切执行的任务不会将结果写入后端。 如果要启用此功能,请查看 :setting:`task_store_eager_result`


Celery 任务很像 Web 视图,因为它应该只定义如何在被称为任务的上下文中执行操作。

这意味着任务最好只处理序列化、消息头、重试等,实际逻辑在别处实现。

假设我们有一个这样的任务:

from .models import Product


@app.task(bind=True)
def send_order(self, product_pk, quantity, price):
    price = Decimal(price)  # json serializes this to string.

    # models are passed by id, not serialized.
    product = Product.objects.get(product_pk)

    try:
        product.order(quantity, price)
    except OperationalError as exc:
        raise self.retry(exc=exc)

Note:任务被绑定意味着任务的第一个参数将始终是任务实例(自身)。 这意味着您确实获得了一个 self 参数作为第一个参数,并且可以使用 Task 类的方法和属性。

您可以为此任务编写单元测试,使用模拟如下示例:

from pytest import raises

from celery.exceptions import Retry

# for python 2: use mock.patch from `pip install mock`.
from unittest.mock import patch

from proj.models import Product
from proj.tasks import send_order

class test_send_order:

    @patch('proj.tasks.Product.order')  # < patching Product in module above
    def test_success(self, product_order):
        product = Product.objects.create(
            name='Foo',
        )
        send_order(product.pk, 3, Decimal(30.3))
        product_order.assert_called_with(3, Decimal(30.3))

    @patch('proj.tasks.Product.order')
    @patch('proj.tasks.send_order.retry')
    def test_failure(self, send_order_retry, product_order):
        product = Product.objects.create(
            name='Foo',
        )

        # Set a side effect on the patched methods
        # so that they raise the errors we want.
        send_order_retry.side_effect = Retry()
        product_order.side_effect = OperationalError()

        with raises(Retry):
            send_order(product.pk, 3, Decimal(30.6))

pytest

4.0 版中的新功能。


Celery 还提供了一个 :pypi:`pytest` 插件,可以添加可以在集成(或单元)测试套件中使用的装置。

启用

Celery 最初以禁用状态发布插件,要启用它,您可以:

  • pip install celery[pytest]
  • pip install pytest-celery
  • 或添加环境变量 PYTEST_PLUGINS=celery.contrib.pytest
  • 或将 pytest_plugins = ("celery.contrib.pytest", ) 添加到您的根 conftest.py


标记

celery - 设置测试应用配置。

celery 标记使您能够覆盖用于单个测试用例的配置:

@pytest.mark.celery(result_backend='redis://')
def test_something():
    ...

或者对于一个类中的所有测试用例:

@pytest.mark.celery(result_backend='redis://')
class test_something:

    def test_one(self):
        ...

    def test_two(self):
        ...

灯具

功能范围

celery_app - 用于测试的 Celery 应用程序。

该夹具返回一个可用于测试的 Celery 应用程序。

示例:

def test_create_task(celery_app, celery_worker):
    @celery_app.task
    def mul(x, y):
        return x * y

    assert mul.delay(4, 4).get(timeout=10) == 16
celery_worker - 嵌入实时工作者。

这个夹具启动了一个 Celery 工作实例,您可以将其用于集成测试。 worker 将在 单独的线程 中启动,并在测试返回后立即关闭。

默认情况下,fixture 将等待最多 10 秒让 worker 完成未完成的任务,如果超过时间限制,则会引发异常。 可以通过设置 celery_worker_parameters() 夹具返回的字典中的 shutdown_timeout 键来自定义超时。

示例:

# Put this in your conftest.py
@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'amqp://',
        'result_backend': 'redis://'
    }

def test_add(celery_worker):
    mytask.delay()


# If you wish to override some setting in one test cases
# only - you can use the ``celery`` mark:
@pytest.mark.celery(result_backend='rpc')
def test_other(celery_worker):
    ...

默认情况下,心跳是禁用的,这意味着测试工作人员不会为 worker-onlineworker-offlineworker-heartbeat 发送事件。 要启用心跳,请修改 celery_worker_parameters() 夹具:

# Put this in your conftest.py
@pytest.fixture(scope="session")
def celery_worker_parameters():
    return {"without_heartbeat": False}
    ...

会话范围

celery_config - 覆盖设置 Celery 测试应用程序配置。

您可以重新定义这个装置来配置测试 Celery 应用程序。

然后您的灯具返回的配置将用于配置 celery_app()celery_session_app() 灯具。

示例:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'amqp://',
        'result_backend': 'rpc',
    }
celery_parameters - 覆盖设置 Celery 测试应用程序参数。

你可以重新定义这个夹具来改变测试 Celery 应用程序的 __init__ 参数。 与 celery_config() 相比,这些在实例化 Celery 时直接传递给。

然后您的灯具返回的配置将用于配置 celery_app()celery_session_app() 灯具。

示例:

@pytest.fixture(scope='session')
def celery_parameters():
    return {
        'task_cls':  my.package.MyCustomTaskClass,
        'strict_typing': False,
    }
celery_worker_parameters - 覆盖设置 Celery 工作器参数。

你可以重新定义这个夹具来改变测试 Celery 工人的 __init__ 参数。 这些在实例化时直接传递给 WorkController

然后您的灯具返回的配置将用于配置 celery_worker()celery_session_worker() 灯具。

示例:

@pytest.fixture(scope='session')
def celery_worker_parameters():
    return {
        'queues':  ('high-prio', 'low-prio'),
        'exclude_queues': ('celery'),
    }
celery_enable_logging - 覆盖以启用嵌入式工作人员的日志记录。

这是一个您可以覆盖以启用嵌入式工作人员日志记录的装置。

示例:

@pytest.fixture(scope='session')
def celery_enable_logging():
    return True
celery_includes - 为嵌入式工作者添加额外的导入。

您可以在嵌入式工作器启动时覆盖夹具以包含模块。

您可以让它返回要导入的模块名称列表,可以是任务模块、注册信号的模块等。

示例:

@pytest.fixture(scope='session')
def celery_includes():
    return [
        'proj.tests.tasks',
        'proj.tests.celery_signal_handlers',
    ]
celery_worker_pool - 覆盖用于嵌入式工作器的池。

您可以覆盖 fixture 来配置用于嵌入式 worker 的执行池。

示例:

@pytest.fixture(scope='session')
def celery_worker_pool():
    return 'prefork'

警告

您不能使用 gevent/eventlet 池,除非您的整个测试套件在启用monkeypatches 的情况下运行。


celery_session_worker - 贯穿整个会话的嵌入式工作者。

这个夹具启动了一个在整个测试会话中都存在的工作器(它不会为每个测试启动/停止)。

示例:

# Add this to your conftest.py
@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'amqp://',
        'result_backend': 'rpc',
    }

# Do this in your tests.
def test_add_task(celery_session_worker):
    assert add.delay(2, 2) == 4

警告

混合会话和临时工作人员可能是个坏主意...…


celery_session_app - 用于测试的 Celery 应用程序(会话范围)。

当其他会话范围的装置需要引用 Celery 应用程序实例时,可以使用它。


use_celery_app_trap - 在回退到默认应用程序时引发异常。

这是一个你可以在你的 conftest.py 中覆盖的装置,以启用“应用程序陷阱”:如果有东西试图访问 default 或 current_app,则会引发异常。

示例:

@pytest.fixture(scope='session')
def use_celery_app_trap():
    return True

如果测试想要访问默认应用程序,则必须使用 depends_on_current_app 固定装置对其进行标记:

@pytest.mark.usefixtures('depends_on_current_app')
def test_something():
    something()