使用 Celery 进行测试 — Python 文档
用芹菜测试
任务和单元测试
要在单元测试中测试任务行为,首选方法是模拟。
热切模式
:setting:`task_always_eager` 设置启用的热切模式根据定义不适合单元测试。
使用 Eager 模式进行测试时,您只是在测试工人中发生的情况的模拟,并且模拟与现实中发生的情况之间存在许多差异。
请注意,默认情况下,急切执行的任务不会将结果写入后端。 如果要启用此功能,请查看 :setting:`task_store_eager_result`。
Celery 任务很像 Web 视图,因为它应该只定义如何在被称为任务的上下文中执行操作。
这意味着任务最好只处理序列化、消息头、重试等,实际逻辑在别处实现。
假设我们有一个这样的任务:
from .models import Product
@app.task(bind=True)
def send_order(self, product_pk, quantity, price):
price = Decimal(price) # json serializes this to string.
# models are passed by id, not serialized.
product = Product.objects.get(product_pk)
try:
product.order(quantity, price)
except OperationalError as exc:
raise self.retry(exc=exc)
Note
:任务被绑定意味着任务的第一个参数将始终是任务实例(自身)。 这意味着您确实获得了一个 self 参数作为第一个参数,并且可以使用 Task 类的方法和属性。
您可以为此任务编写单元测试,使用模拟如下示例:
from pytest import raises
from celery.exceptions import Retry
# for python 2: use mock.patch from `pip install mock`.
from unittest.mock import patch
from proj.models import Product
from proj.tasks import send_order
class test_send_order:
@patch('proj.tasks.Product.order') # < patching Product in module above
def test_success(self, product_order):
product = Product.objects.create(
name='Foo',
)
send_order(product.pk, 3, Decimal(30.3))
product_order.assert_called_with(3, Decimal(30.3))
@patch('proj.tasks.Product.order')
@patch('proj.tasks.send_order.retry')
def test_failure(self, send_order_retry, product_order):
product = Product.objects.create(
name='Foo',
)
# Set a side effect on the patched methods
# so that they raise the errors we want.
send_order_retry.side_effect = Retry()
product_order.side_effect = OperationalError()
with raises(Retry):
send_order(product.pk, 3, Decimal(30.6))
pytest
4.0 版中的新功能。
Celery 还提供了一个 :pypi:`pytest` 插件,可以添加可以在集成(或单元)测试套件中使用的装置。
启用
Celery 最初以禁用状态发布插件,要启用它,您可以:
pip install celery[pytest]
pip install pytest-celery
- 或添加环境变量
PYTEST_PLUGINS=celery.contrib.pytest
- 或将
pytest_plugins = ("celery.contrib.pytest", )
添加到您的根 conftest.py
标记
celery - 设置测试应用配置。
celery
标记使您能够覆盖用于单个测试用例的配置:
@pytest.mark.celery(result_backend='redis://')
def test_something():
...
或者对于一个类中的所有测试用例:
@pytest.mark.celery(result_backend='redis://')
class test_something:
def test_one(self):
...
def test_two(self):
...
灯具
功能范围
celery_app - 用于测试的 Celery 应用程序。
该夹具返回一个可用于测试的 Celery 应用程序。
示例:
def test_create_task(celery_app, celery_worker):
@celery_app.task
def mul(x, y):
return x * y
assert mul.delay(4, 4).get(timeout=10) == 16
celery_worker - 嵌入实时工作者。
这个夹具启动了一个 Celery 工作实例,您可以将其用于集成测试。 worker 将在 单独的线程 中启动,并在测试返回后立即关闭。
默认情况下,fixture 将等待最多 10 秒让 worker 完成未完成的任务,如果超过时间限制,则会引发异常。 可以通过设置 celery_worker_parameters()
夹具返回的字典中的 shutdown_timeout
键来自定义超时。
示例:
# Put this in your conftest.py
@pytest.fixture(scope='session')
def celery_config():
return {
'broker_url': 'amqp://',
'result_backend': 'redis://'
}
def test_add(celery_worker):
mytask.delay()
# If you wish to override some setting in one test cases
# only - you can use the ``celery`` mark:
@pytest.mark.celery(result_backend='rpc')
def test_other(celery_worker):
...
默认情况下,心跳是禁用的,这意味着测试工作人员不会为 worker-online
、worker-offline
和 worker-heartbeat
发送事件。 要启用心跳,请修改 celery_worker_parameters()
夹具:
# Put this in your conftest.py
@pytest.fixture(scope="session")
def celery_worker_parameters():
return {"without_heartbeat": False}
...
会话范围
celery_config - 覆盖设置 Celery 测试应用程序配置。
您可以重新定义这个装置来配置测试 Celery 应用程序。
然后您的灯具返回的配置将用于配置 celery_app()
和 celery_session_app()
灯具。
示例:
@pytest.fixture(scope='session')
def celery_config():
return {
'broker_url': 'amqp://',
'result_backend': 'rpc',
}
celery_parameters - 覆盖设置 Celery 测试应用程序参数。
你可以重新定义这个夹具来改变测试 Celery 应用程序的 __init__
参数。 与 celery_config()
相比,这些在实例化 Celery
时直接传递给。
然后您的灯具返回的配置将用于配置 celery_app()
和 celery_session_app()
灯具。
示例:
@pytest.fixture(scope='session')
def celery_parameters():
return {
'task_cls': my.package.MyCustomTaskClass,
'strict_typing': False,
}
celery_worker_parameters - 覆盖设置 Celery 工作器参数。
你可以重新定义这个夹具来改变测试 Celery 工人的 __init__
参数。 这些在实例化时直接传递给 WorkController
。
然后您的灯具返回的配置将用于配置 celery_worker()
和 celery_session_worker()
灯具。
示例:
@pytest.fixture(scope='session')
def celery_worker_parameters():
return {
'queues': ('high-prio', 'low-prio'),
'exclude_queues': ('celery'),
}
celery_enable_logging - 覆盖以启用嵌入式工作人员的日志记录。
这是一个您可以覆盖以启用嵌入式工作人员日志记录的装置。
示例:
@pytest.fixture(scope='session')
def celery_enable_logging():
return True
celery_includes - 为嵌入式工作者添加额外的导入。
您可以在嵌入式工作器启动时覆盖夹具以包含模块。
您可以让它返回要导入的模块名称列表,可以是任务模块、注册信号的模块等。
示例:
@pytest.fixture(scope='session')
def celery_includes():
return [
'proj.tests.tasks',
'proj.tests.celery_signal_handlers',
]
celery_worker_pool - 覆盖用于嵌入式工作器的池。
您可以覆盖 fixture 来配置用于嵌入式 worker 的执行池。
示例:
@pytest.fixture(scope='session')
def celery_worker_pool():
return 'prefork'
警告
您不能使用 gevent/eventlet 池,除非您的整个测试套件在启用monkeypatches 的情况下运行。
celery_session_worker - 贯穿整个会话的嵌入式工作者。
这个夹具启动了一个在整个测试会话中都存在的工作器(它不会为每个测试启动/停止)。
示例:
# Add this to your conftest.py
@pytest.fixture(scope='session')
def celery_config():
return {
'broker_url': 'amqp://',
'result_backend': 'rpc',
}
# Do this in your tests.
def test_add_task(celery_session_worker):
assert add.delay(2, 2) == 4
警告
混合会话和临时工作人员可能是个坏主意...…
celery_session_app - 用于测试的 Celery 应用程序(会话范围)。
当其他会话范围的装置需要引用 Celery 应用程序实例时,可以使用它。
use_celery_app_trap - 在回退到默认应用程序时引发异常。
这是一个你可以在你的 conftest.py
中覆盖的装置,以启用“应用程序陷阱”:如果有东西试图访问 default 或 current_app,则会引发异常。
示例:
@pytest.fixture(scope='session')
def use_celery_app_trap():
return True
如果测试想要访问默认应用程序,则必须使用 depends_on_current_app
固定装置对其进行标记:
@pytest.mark.usefixtures('depends_on_current_app')
def test_something():
something()