配置和默认值 — Python 文档

来自菜鸟教程
Celery/docs/latest/userguide/configuration
跳转至:导航、​搜索

配置和默认值

本文档描述了可用的配置选项。

如果您使用默认加载器,则必须创建 celeryconfig.py 模块并确保它在 Python 路径上可用。

示例配置文件

这是帮助您入门的示例配置文件。 它应该包含运行基本 Celery 设置所需的所有内容。

## Broker settings.
broker_url = 'amqp://guest:guest@localhost:5672//'

# List of modules to import when the Celery worker starts.
imports = ('myapp.tasks',)

## Using the database to store task state and results.
result_backend = 'db+sqlite:///results.db'

task_annotations = {'tasks.add': {'rate_limit': '10/s'}}

新的小写设置

4.0 版引入了新的小写设置和设置组织。

除了小写名称之外,以前版本之间的主要区别是一些前缀的重命名,例如 celery_beat_beat_celeryd_worker_,并且大多数顶级 celery_ 设置已移动到新的 task_ 前缀中。

警告

在 Celery 6.0 之前,Celery 仍然可以读取旧的配置文件。 之后,将删除对旧配置文件的支持。 我们提供了 celery upgrade 命令来处理很多情况(包括 Django)。

请尽快迁移到新的配置方案。


设置名称 用。。。来代替
CELERY_ACCEPT_CONTENT :设置:`接受内容`
CELERY_ENABLE_UTC :设置:`启用_utc`
CELERY_IMPORTS :设置:`进口`
CELERY_INCLUDE :设置:`包括`
CELERY_TIMEZONE :设置:`时区`
CELERYBEAT_MAX_LOOP_INTERVAL :设置:`beat_max_loop_interval`
CELERYBEAT_SCHEDULE :设置:`beat_schedule`
CELERYBEAT_SCHEDULER :设置:`beat_scheduler`
CELERYBEAT_SCHEDULE_FILENAME :设置:`beat_schedule_filename`
CELERYBEAT_SYNC_EVERY :设置:`beat_sync_every`
BROKER_URL :设置:`broker_url`
BROKER_TRANSPORT :设置:`broker_transport`
BROKER_TRANSPORT_OPTIONS :设置:`broker_transport_options`
BROKER_CONNECTION_TIMEOUT :设置:`broker_connection_timeout`
BROKER_CONNECTION_RETRY :设置:`broker_connection_retry`
BROKER_CONNECTION_MAX_RETRIES :设置:`broker_connection_max_retries`
BROKER_FAILOVER_STRATEGY :设置:`broker_failover_strategy`
BROKER_HEARTBEAT :设置:`broker_heartbeat`
BROKER_LOGIN_METHOD :设置:`broker_login_method`
BROKER_POOL_LIMIT :设置:`broker_pool_limit`
BROKER_USE_SSL :设置:`broker_use_ssl`
CELERY_CACHE_BACKEND :设置:`cache_backend`
CELERY_CACHE_BACKEND_OPTIONS :设置:`cache_backend_options`
CASSANDRA_COLUMN_FAMILY :设置:`cassandra_table`
CASSANDRA_ENTRY_TTL :设置:`cassandra_entry_ttl`
CASSANDRA_KEYSPACE :设置:`cassandra_keyspace`
CASSANDRA_PORT :设置:`cassandra_port`
CASSANDRA_READ_CONSISTENCY :设置:`cassandra_read_consistency`
CASSANDRA_SERVERS :设置:`cassandra_servers`
CASSANDRA_WRITE_CONSISTENCY :设置:`cassandra_write_consistency`
CASSANDRA_OPTIONS :设置:`cassandra_options`
S3_ACCESS_KEY_ID :设置:`s3_access_key_id`
S3_SECRET_ACCESS_KEY :设置:`s3_secret_access_key`
S3_BUCKET :设置:`s3_bucket`
S3_BASE_PATH :设置:`s3_base_path`
S3_ENDPOINT_URL :设置:`s3_endpoint_url`
S3_REGION :设置:`s3_region`
CELERY_COUCHBASE_BACKEND_SETTINGS :setting:`couchbase_backend_settings`
CELERY_ARANGODB_BACKEND_SETTINGS :setting:`arangodb_backend_settings`
CELERY_MONGODB_BACKEND_SETTINGS :设置:`mongodb_backend_settings`
CELERY_EVENT_QUEUE_EXPIRES :设置:`event_queue_expires`
CELERY_EVENT_QUEUE_TTL :设置:`event_queue_ttl`
CELERY_EVENT_QUEUE_PREFIX :设置:`event_queue_prefix`
CELERY_EVENT_SERIALIZER :设置:`event_serializer`
CELERY_REDIS_DB :设置:`redis_db`
CELERY_REDIS_HOST :设置:`redis_host`
CELERY_REDIS_MAX_CONNECTIONS :设置:`redis_max_connections`
CELERY_REDIS_USERNAME :setting:`redis_username`
CELERY_REDIS_PASSWORD :设置:`redis_password`
CELERY_REDIS_PORT :设置:`redis_port`
CELERY_REDIS_BACKEND_USE_SSL :设置:`redis_backend_use_ssl`
CELERY_RESULT_BACKEND :设置:`result_backend`
CELERY_MAX_CACHED_RESULTS :设置:`result_cache_max`
CELERY_MESSAGE_COMPRESSION :设置:`result_compression`
CELERY_RESULT_EXCHANGE :设置:`result_exchange`
CELERY_RESULT_EXCHANGE_TYPE :设置:`result_exchange_type`
CELERY_RESULT_EXPIRES :设置:`result_expires`
CELERY_RESULT_PERSISTENT :设置:`result_persistent`
CELERY_RESULT_SERIALIZER :设置:`result_serializer`
CELERY_RESULT_DBURI 改用 :setting:`result_backend`
CELERY_RESULT_ENGINE_OPTIONS :设置:`database_engine_options`
[...]_DB_SHORT_LIVED_SESSIONS :设置:`database_short_lived_sessions`
CELERY_RESULT_DB_TABLE_NAMES :设置:`database_db_names`
CELERY_SECURITY_CERTIFICATE :设置:`安全证书`
CELERY_SECURITY_CERT_STORE :设置:`security_cert_store`
CELERY_SECURITY_KEY :设置:`security_key`
CELERY_ACKS_LATE :设置:`task_acks_late`
CELERY_ACKS_ON_FAILURE_OR_TIMEOUT :设置:`task_acks_on_failure_or_timeout`
CELERY_ALWAYS_EAGER :设置:`task_always_eager`
CELERY_ANNOTATIONS :设置:`task_annotations`
CELERY_COMPRESSION :设置:`task_compression`
CELERY_CREATE_MISSING_QUEUES :设置:`task_create_missing_queues`
CELERY_DEFAULT_DELIVERY_MODE :设置:`task_default_delivery_mode`
CELERY_DEFAULT_EXCHANGE :设置:`task_default_exchange`
CELERY_DEFAULT_EXCHANGE_TYPE :设置:`task_default_exchange_type`
CELERY_DEFAULT_QUEUE :设置:`task_default_queue`
CELERY_DEFAULT_RATE_LIMIT :设置:`task_default_rate_limit`
CELERY_DEFAULT_ROUTING_KEY :设置:`task_default_routing_key`
CELERY_EAGER_PROPAGATES :设置:`task_eager_propagates`
CELERY_IGNORE_RESULT :设置:`task_ignore_result`
CELERY_PUBLISH_RETRY :设置:`task_publish_retry`
CELERY_PUBLISH_RETRY_POLICY :设置:`task_publish_retry_policy`
CELERY_QUEUES :设置:`task_queues`
CELERY_ROUTES :设置:`task_routes`
CELERY_SEND_SENT_EVENT :设置:`task_send_sent_event`
CELERY_SERIALIZER :设置:`task_serializer`
CELERYD_SOFT_TIME_LIMIT :设置:`task_soft_time_limit`
CELERY_TASK_TRACK_STARTED :设置:`task_track_started`
CELERY_TASK_REJECT_ON_WORKER_LOST :设置:`task_reject_on_worker_lost`
CELERYD_TIME_LIMIT :设置:`task_time_limit`
CELERYD_AGENT :设置:`worker_agent`
CELERYD_AUTOSCALER :设置:`worker_autoscaler`
CELERYD_CONCURRENCY :设置:`worker_concurrency`
CELERYD_CONSUMER :设置:`worker_consumer`
CELERY_WORKER_DIRECT :设置:`worker_direct`
CELERY_DISABLE_RATE_LIMITS :设置:`worker_disable_rate_limits`
CELERY_ENABLE_REMOTE_CONTROL :设置:`worker_enable_remote_control`
CELERYD_HIJACK_ROOT_LOGGER :设置:`worker_hijack_root_logger`
CELERYD_LOG_COLOR :设置:`worker_log_color`
CELERYD_LOG_FORMAT :设置:`worker_log_format`
CELERYD_WORKER_LOST_WAIT :设置:`worker_lost_wait`
CELERYD_MAX_TASKS_PER_CHILD :设置:`worker_max_tasks_per_child`
CELERYD_POOL :设置:`worker_pool`
CELERYD_POOL_PUTLOCKS :设置:`worker_pool_putlocks`
CELERYD_POOL_RESTARTS :设置:`worker_pool_restarts`
CELERYD_PREFETCH_MULTIPLIER :设置:`worker_prefetch_multiplier`
CELERYD_REDIRECT_STDOUTS :设置:`worker_redirect_stdouts`
CELERYD_REDIRECT_STDOUTS_LEVEL :设置:`worker_redirect_stdouts_level`
CELERY_SEND_EVENTS :设置:`worker_send_task_events`
CELERYD_STATE_DB :设置:`worker_state_db`
CELERYD_TASK_LOG_FORMAT :设置:`worker_task_log_format`
CELERYD_TIMER :设置:`worker_timer`
CELERYD_TIMER_PRECISION :设置:`worker_timer_precision`


配置指令

一般设置

accept_content

默认值:{'json'}(集合、列表或元组)。

允许的内容类型/序列化程序的白名单。

如果接收到不在此列表中的消息,则该消息将被丢弃并显示错误。

默认只启用 json,但可以添加任何内容类型,包括 pickle 和 yaml; 在这种情况下,请确保不受信任的各方无法访问您的经纪人。 有关更多信息,请参阅 安全性

示例:

# using serializer name
accept_content = ['json']

# or the actual content-type (MIME)
accept_content = ['application/json']

result_accept_content

默认值:None(可以设置、列表或元组)。

4.3 版中的新功能。


允许结果后端的内容类型/序列化器的白名单。

如果接收到不在此列表中的消息,则该消息将被丢弃并显示错误。

默认情况下,它与 accept_content 是相同的序列化程序。 但是,可以为结果后端的接受内容指定不同的序列化程序。 如果使用签名的消息传递并且结果在结果后端中未签名存储,则通常需要这样做。 有关更多信息,请参阅 安全性

示例:

# using serializer name
result_accept_content = ['json']

# or the actual content-type (MIME)
result_accept_content = ['application/json']

时间和日期设置

enable_utc

2.5 版中的新功能。


默认值:自 3.0 版起默认启用。

如果启用了消息中的日期和时间,则将转换为使用 UTC 时区。

请注意,运行低于 2.5 的 Celery 版本的工作人员将为所有消息假定本地时区,因此只有在所有工作人员都已升级后才启用。


timezone

2.5 版中的新功能。


默认值:"UTC"

将 Celery 配置为使用自定义时区。 时区值可以是 :pypi:`pytz` 库支持的任何时区。

如果未设置,则使用 UTC 时区。 为了向后兼容,还有一个 :setting:`enable_utc` 设置,当它设置为 false 时,将使用系统本地时区。


任务设置

task_annotations

2.5 版中的新功能。


默认值:None

此设置可用于重写配置中的任何任务属性。 该设置可以是一个字典,也可以是一个注释对象列表,用于过滤任务并返回要更改的属性映射。

这将更改 tasks.add 任务的 rate_limit 属性:

task_annotations = {'tasks.add': {'rate_limit': '10/s'}}

或更改所有任务的相同内容:

task_annotations = {'*': {'rate_limit': '10/s'}}

您也可以更改方法,例如 on_failure 处理程序:

def my_on_failure(self, exc, task_id, args, kwargs, einfo):
    print('Oh no! Task failed: {0!r}'.format(exc))

task_annotations = {'*': {'on_failure': my_on_failure}}

如果您需要更大的灵活性,那么您可以使用对象而不是 dict 来选择要注释的任务:

class MyAnnotate:

    def annotate(self, task):
        if task.name.startswith('tasks.'):
            return {'rate_limit': '10/s'}

task_annotations = (MyAnnotate(), {other,})

task_compression

默认值:None

用于任务消息的默认压缩。 可以是 gzipbzip2(如果可用)或在 Kombu 压缩注册表中注册的任何自定义压缩方案。

默认是发送未压缩的消息。


task_protocol

默认值:2(自 4.0 起)。

设置用于发送任务的默认任务消息协议版本。 支持协议:1 和 2。

3.1.24 和 4.x+ 支持协议 2。


task_serializer

默认值:"json"(自 4.0 起,更早版本:pickle)。

标识要使用的默认序列化方法的字符串。 可以是 json(默认)、pickle、yaml、msgpack,或者任何已经注册到 kombu.serialization.registry

也可以看看

序列化程序


task_publish_retry

2.2 版中的新功能。


默认值:启用。

决定是否在连接丢失或其他连接错误的情况下重试发布任务消息。 另见 :setting:`task_publish_retry_policy`


task_publish_retry_policy

2.2 版中的新功能。


默认值:参见消息发送重试

定义在连接丢失或其他连接错误的情况下重试发布任务消息时的默认策略。


任务执行设置

task_always_eager

默认值:禁用。

如果这是True,所有任务都会通过阻塞本地执行,直到任务返回。 apply_async()Task.delay() 将返回一个 EagerResult 实例,该实例模拟 AsyncResult 的 API 和行为,但结果已被评估。

也就是说,任务将在本地执行,而不是发送到队列中。


task_eager_propagates

默认值:禁用。

如果这是 True,急切执行的任务(由 task.apply() 应用,或当 :setting:`task_always_eager` 设置启用时),将传播例外。

这与始终运行 apply()throw=True 相同。


task_store_eager_result

5.1 版中的新功能。


默认值:禁用。

如果这是 True:setting:`task_always_eager`True:setting:`task_ignore_result`False,急切执行任务的结果将保存到后端。

默认情况下,即使 :setting:`task_always_eager` 设置为 True:setting:`task_ignore_result` 设置为 False,结果也会不被拯救。


task_remote_tracebacks

默认值:禁用。

如果启用,任务结果将在重新引发任务错误时包括工作人员堆栈。

这需要 :pypi:`tblib` 库,可以使用 pip 安装:

$ pip install celery[tblib]

有关组合多个扩展要求的信息,请参阅 捆绑包


task_ignore_result

默认值:禁用。

是否存储任务返回值(墓碑)。 如果你还想存储错误,只是不成功返回值,你可以设置:setting:`task_store_errors_even_if_ignored`


task_store_errors_even_if_ignored

默认值:禁用。

如果设置,即使 Task.ignore_result 开启,worker 也会将所有任务错误存储在结果存储中。


task_track_started

默认值:禁用。

如果 True 任务将在工作人员执行任务时报告其状态为“已启动”。 默认值为 False,因为正常行为是不报告该粒度级别。 任务要么是挂起的,要么是已完成的,要么是等待重试。 当有长时间运行的任务并且需要报告当前正在运行的任务时,具有“开始”状态可能很有用。


task_time_limit

默认值:无时间限制。

以秒为单位的任务硬时间限制。 当超过这个值时,处理任务的工作人员将被杀死并替换为新的工作人员。


task_soft_time_limit

默认值:无软时间限制。

以秒为单位的任务软时间限制。

超过此值时将引发 @SoftTimeLimitExceeded 异常。 例如,任务可以在硬时间限制到来之前捕获它以进行清理:

from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        return do_work()
    except SoftTimeLimitExceeded:
        cleanup_in_a_hurry()

task_acks_late

默认值:禁用。

延迟确认意味着任务消息将在 任务执行后 被确认,而不是在 之前的 (默认行为)。

task_acks_on_failure_or_timeout

默认值:启用

启用所有任务的消息后,即使它们失败或超时,也会得到确认。

配置此设置仅适用于在 之后确认的任务 已执行,并且仅当启用 :setting:`task_acks_late` 时。


task_reject_on_worker_lost

默认值:禁用。

即使启用了 :setting:`task_acks_late`,当执行任务的工作进程突然退出或收到信号时,工作进程也会确认任务(例如,:sig:`KILL`/[ X186X]:sig:`INT` 等)。

将此设置为 true 允许消息重新排队,以便任务将由同一工作人员或其他工作人员再次执行。

警告

启用此功能可能会导致消息循环; 确保你知道你在做什么。


task_default_rate_limit

默认值:无速率限制。

任务的全局默认速率限制。

此值用于没有自定义速率限制的任务

也可以看看

设置:worker_disable_rate_limits 设置可以禁用所有速率限制。


任务结果后端设置

result_backend

默认:默认不启用结果后端。

用于存储任务结果(墓碑)的后端。 可以是以下之一:


result_backend_always_retry

默认值:False

如果启用,后端将尝试重试可恢复异常的事件,而不是传播异常。 它将在 2 次重试之间使用指数退避睡眠时间。


result_backend_max_sleep_between_retries_ms

默认值:10000

这指定了两次后端操作重试之间的最大休眠时间。


result_backend_base_sleep_between_retries_ms

默认值:10

这指定了两次后端操作重试之间的基本休眠时间。


result_backend_max_retries

默认值:Inf

这是可恢复异常情况下的最大重试次数。


result_backend_transport_options

默认值:{}(空映射)。

传递给底层传输的附加选项的字典。

有关支持的选项(如果有),请参阅您的传输用户手册。

设置可见性超时的示例(Redis 和 SQS 传输支持):

result_backend_transport_options = {'visibility_timeout': 18000}  # 5 hours

result_serializer

默认值:json 自 4.0(更早版本:pickle)。

结果序列化格式。

有关支持的序列化格式的信息,请参阅 Serializers


result_compression

默认值:无压缩。

用于任务结果的可选压缩方法。 支持与 :setting:`task_compression` 设置相同的选项。


result_extended

默认值:False

启用要写入后端的扩展任务结果属性(名称、参数、kwargs、worker、重试、队列、delivery_info)。


result_expires

默认:1 天后过期。

存储的任务墓碑将被删除的时间(以秒为单位,或 timedelta 对象)。

假设启用了celery beat,内置周期任务将删除此时间之后的结果(celery.backend_cleanup)。 该任务每天凌晨 4 点运行。

None 或 0 的值表示结果永远不会过期(取决于后端规范)。

笔记

目前,这只适用于 AMQP、数据库、缓存、Couchbase 和 Redis 后端。

使用数据库后端时,必须运行 celery beat 才能使结果过期。


result_cache_max

默认:默认禁用。

启用客户端缓存结果。

这对于旧的已弃用的“amqp”后端很有用,因为一旦一个结果实例使用它,结果就不可用。

这是在驱逐旧结果之前要缓存的结果总数。 值为 0 或 None 表示没有限制,值为 -1 将禁用缓存。

默认禁用。


result_chord_join_timeout

默认值:3.0。

在和弦中加入组的结果时的超时时间(整数/浮点数)。


result_chord_retry_interval

默认值:1.0。

重试和弦任务的默认间隔。


override_backends

默认:默认禁用。

实现后端的类的路径。

允许覆盖后端实现。 如果您需要存储有关已执行任务的其他元数据、覆盖重试策略等,这会很有用。

示例:

override_backends = {"db": "custom_module.backend.class"}

数据库后台设置

数据库 URL 示例

要使用数据库后端,您必须使用连接 URL 和 db+ 前缀配置 :setting:`result_backend` 设置:

result_backend = 'db+scheme://user:password@host:port/dbname'

例子:

# sqlite (filename)
result_backend = 'db+sqlite:///results.sqlite'

# mysql
result_backend = 'db+mysql://scott:tiger@localhost/foo'

# postgresql
result_backend = 'db+postgresql://scott:tiger@localhost/mydatabase'

# oracle
result_backend = 'db+oracle://scott:tiger@127.0.0.1:1521/sidname'

请参阅 Supported Databases 以获取支持的数据库表,以及 Connection String 以获取有关连接字符串的更多信息(这是 db+ 之后的 URI 部分)字首)。


database_engine_options

默认值:{}(空映射)。

要指定其他 SQLAlchemy 数据库引擎选项,您可以使用 :setting:`database_engine_options` 设置:

# echo enables verbose logging from SQLAlchemy.
app.conf.database_engine_options = {'echo': True}

database_short_lived_sessions

默认:默认禁用。

默认情况下禁用短期会话。 如果启用,它们会大大降低性能,尤其是在处理大量任务的系统上。 此选项对于由于缓存数据库连接因不活动而失效而遇到错误的低流量工作人员很有用。 例如,像 (OperationalError) (2006, 'MySQL server has go away') 这样的间歇性错误可以通过启用短期会话来修复。 此选项仅影响数据库后端。


database_table_schemas

默认值:{}(空映射)。

当 SQLAlchemy 被配置为结果后端时,Celery 会自动创建两个表来存储任务的结果元数据。 此设置允许您自定义表的架构:

# use custom schema for the database result backend.
database_table_schemas = {
    'task': 'celery',
    'group': 'celery',
}

database_table_names

默认值:{}(空映射)。

当 SQLAlchemy 被配置为结果后端时,Celery 会自动创建两个表来存储任务的结果元数据。 此设置允许您自定义表名:

# use custom table names for the database result backend.
database_table_names = {
    'task': 'myapp_taskmeta',
    'group': 'myapp_groupmeta',
}

RPC 后端设置

result_persistent

默认:默认禁用(瞬态消息)。

如果设置为 True,结果消息将是持久的。 这意味着消息在代理重启后不会丢失。


示例配置

result_backend = 'rpc://'
result_persistent = False

请注意:如果任务墓碑太,使用这个后端可能会触发celery.backends.rpc.BacklogLimitExceeded的提升。

例如

for i in range(10000):
    r = debug_task.delay()

print(r.state)  # this would raise celery.backends.rpc.BacklogLimitExceeded

缓存后端设置

笔记

缓存后端支持 [X31X]:pypi:`pylibmc` 和 :pypi:`python-memcached` 库。 后者仅在未安装 :pypi:`pylibmc` 时使用。


使用单个 Memcached 服务器:

result_backend = 'cache+memcached://127.0.0.1:11211/'

使用多个 Memcached 服务器:

result_backend = """
    cache+memcached://172.19.26.240:11211;172.19.26.242:11211/
""".strip()

“内存”后端仅将缓存存储在内存中:

result_backend = 'cache'
cache_backend = 'memory'

cache_backend_options

默认值:{}(空映射)。

您可以使用 :setting:`cache_backend_options` 设置来设置 :pypi:`pylibmc` 选项:

cache_backend_options = {
    'binary': True,
    'behaviors': {'tcp_nodelay': True},
}

cache_backend

此设置不再用于 celery 的内置后端,因为现在可以直接在 :setting:`result_backend` 设置中指定缓存后端。

笔记

django-celery-results - 使用 Django ORM/Cache 作为结果后端 库使用 cache_backend 来选择 django 缓存。


MongoDB 后端设置

笔记

MongoDB 后端需要 pymongo 库:http://github.com/mongodb/mongo-python-driver/tree/master


mongodb_backend_settings

这是一个支持以下键的字典:

  • 数据库

    要连接的数据库名称。 默认为 celery

  • taskmeta_collection

    用于存储任务元数据的集合名称。 默认为 celery_taskmeta

  • 最大池大小

    作为 max_pool_size 传递给 PyMongo 的 Connection 或 MongoClient 构造函数。 它是在给定时间对 MongoDB 保持打开的最大 TCP 连接数。 如果打开的连接数超过 max_pool_size,则在释放套接字时将关闭套接字。 默认为 10。

  • 选项

    传递给 mongodb 连接构造函数的其他关键字参数。 请参阅 pymongo 文档以查看支持的参数列表。


示例配置

result_backend = 'mongodb://localhost:27017/'
mongodb_backend_settings = {
    'database': 'mydb',
    'taskmeta_collection': 'my_taskmeta_collection',
}

Redis 后端设置

配置后端 URL

笔记

Redis 后端需要 :pypi:`redis` 库。

要安装此软件包,请使用 pip

$ pip install celery[redis]

有关组合多个扩展要求的信息,请参阅 捆绑包


此后端需要将 :setting:`result_backend` 设置设置为 Redis 或 Redis over TLS URL:

result_backend = 'redis://username:password@host:port/db'

例如:

result_backend = 'redis://localhost/0'

是相同的:

result_backend = 'redis://'

使用 rediss:// 协议通过 TLS 连接到 redis:

result_backend = 'rediss://username:password@host:port/db?ssl_cert_reqs=required'

请注意,ssl_cert_reqs 字符串应该是 requiredoptionalnone 之一(不过,为了向后兼容,字符串也可以是 [ X151X]、CERT_OPTIONALCERT_NONE)。

如果应使用 Unix 套接字连接,则 URL 需要采用以下格式:

result_backend = 'socket:///path/to/redis.sock'

URL 的字段定义如下:

  1. username

    5.1.0 版中的新功能。

    用于连接到数据库的用户名。

    请注意,这仅在 Redis>=6.0 和 py-redis>=3.4.0 安装时支持。

    如果您使用较旧的数据库版本或较旧的客户端版本,则可以省略用户名:

    result_backend = 'redis://:password@host:port/db'
  2. password

    用于连接数据库的密码。

  3. host

    Redis 服务器的主机名或 IP 地址(例如,localhost)。

  4. port

    Redis 服务器的端口。 默认值为 6379。

  5. db

    要使用的数据库编号。 默认值为 0。 db 可以包含一个可选的前导斜杠。

当使用 TLS 连接时(协议为 rediss://),您可以将 :setting:`broker_use_ssl` 中的所有值作为查询参数传入。 证书路径必须是 URL 编码的,并且需要 ssl_cert_reqs。 示例:

result_backend = 'rediss://:password@host:port/db?\
    ssl_cert_reqs=required\
    &ssl_ca_certs=%2Fvar%2Fssl%2Fmyca.pem\                  # /var/ssl/myca.pem
    &ssl_certfile=%2Fvar%2Fssl%2Fredis-server-cert.pem\     # /var/ssl/redis-server-cert.pem
    &ssl_keyfile=%2Fvar%2Fssl%2Fprivate%2Fworker-key.pem'   # /var/ssl/private/worker-key.pem

请注意,ssl_cert_reqs 字符串应该是 requiredoptionalnone 之一(不过,为了向后兼容,字符串也可以是 [ X151X]、CERT_OPTIONALCERT_NONE)。

5.1.0 版中的新功能。


redis_backend_health_check_interval

默认值:未配置

Redis 后端支持健康检查。 此值必须设置为整数,其值为健康检查之间的秒数。 如果在健康检查过程中遇到 ConnectionError 或 TimeoutError,将重新建立连接并且命令只重试一次。


redis_backend_use_ssl

默认值:禁用。

Redis 后端支持 SSL。 这个值必须以字典的形式设置。 有效的键值对与 :setting:`broker_use_ssl` 下的 redis 小节中提到的相同。


redis_max_connections

默认值:无限制。

Redis 连接池中用于发送和检索结果的最大可用连接数。

警告

如果并发连接数超过最大值,Redis 将引发 ConnectionError。


redis_socket_connect_timeout

版本 4.0.1 中的新功能。


默认值:None

从结果后端连接到 Redis 的套接字超时(以秒为单位)(整数/浮点数)


redis_socket_timeout

默认值:120.0 秒。

以秒为单位对 Redis 服务器进行读/写操作的套接字超时(int/float),由 redis 结果后端使用。


redis_retry_on_timeout

4.4.1 版中的新功能。


默认值:False

重试对 Redis 服务器的 TimeoutError 读/写操作,由 redis 结果后端使用。 如果通过 unix socket 使用 Redis 连接,则不应设置此变量。


redis_socket_keepalive

4.4.1 版中的新功能。


默认值:False

套接字 TCP keepalive 以保持与 Redis 服务器的连接健康,由 redis 结果后端使用。


Cassandra 后端设置

笔记

这个 Cassandra 后端驱动程序需要 :pypi:`cassandra-driver`

要安装,请使用 pip

$ pip install celery[cassandra]

有关组合多个扩展要求的信息,请参阅 捆绑包


此后端需要设置以下配置指令。

cassandra_servers

默认值:[](空列表)。

host Cassandra 服务器列表。 例如:

cassandra_servers = ['localhost']

cassandra_port

默认值:9042。

用于联系 Cassandra 服务器的端口。


cassandra_keyspace

默认值:无。

存储结果的键空间。 例如:

cassandra_keyspace = 'tasks_keyspace'

cassandra_table

默认值:无。

存储结果的表(列族)。 例如:

cassandra_table = 'tasks'

cassandra_read_consistency

默认值:无。

使用的读取一致性。 值可以是 ONETWOTHREEQUORUMALLLOCAL_QUORUMEACH_QUORUM ]、LOCAL_ONE


cassandra_write_consistency

默认值:无。

使用的写一致性。 值可以是 ONETWOTHREEQUORUMALLLOCAL_QUORUMEACH_QUORUM ]、LOCAL_ONE


cassandra_entry_ttl

默认值:无。

状态条目的生存时间。 它们将在添加后的几秒钟后过期并被删除。 None(默认)值意味着它们永远不会过期。


cassandra_auth_provider

默认值:None

要使用的 cassandra.auth 模块中的 AuthProvider 类。 值可以是 PlainTextAuthProviderSaslAuthProvider


cassandra_auth_kwargs

默认值:{}(空映射)。

传递给身份验证提供程序的命名参数。 例如:

cassandra_auth_kwargs = {
    username: 'cassandra',
    password: 'cassandra'
}

cassandra_options

默认值:{}(空映射)。

传递给 cassandra.cluster 类的命名参数。

cassandra_options = {
    'cql_version': '3.2.1'
    'protocol_version': 3
}

示例配置

cassandra_servers = ['localhost']
cassandra_keyspace = 'celery'
cassandra_table = 'tasks'
cassandra_read_consistency = 'ONE'
cassandra_write_consistency = 'ONE'
cassandra_entry_ttl = 86400

S3 后端设置

笔记

这个 s3 后端驱动程序需要 :pypi:`s3`

要安装,请使用 s3

$ pip install celery[s3]

有关组合多个扩展要求的信息,请参阅 捆绑包


此后端需要设置以下配置指令。

s3_access_key_id

默认值:无。

s3 访问密钥 ID。 例如:

s3_access_key_id = 'acces_key_id'

s3_secret_access_key

默认值:无。

s3 秘密访问密钥。 例如:

s3_secret_access_key = 'acces_secret_access_key'

s3_bucket

默认值:无。

s3 存储桶名称。 例如:

s3_bucket = 'bucket_name'

s3_base_path

默认值:无。

s3 存储桶中用于存储结果键的基本路径。 例如:

s3_base_path = '/prefix'

s3_endpoint_url

默认值:无。

自定义 s3 端点 URL。 使用它连接到自定义的自托管 s3 兼容后端(Ceph、Scality...)。 例如:

s3_endpoint_url = 'https://.s3.custom.url'

s3_region

默认值:无。

s3 aws 区域。 例如:

s3_region = 'us-east-1'

示例配置

s3_access_key_id = 's3-access-key-id'
s3_secret_access_key = 's3-secret-access-key'
s3_bucket = 'mybucket'
s3_base_path = '/celery_result_backend'
s3_endpoint_url = 'https://endpoint_url'

Azure Block Blob 后端设置

要将 AzureBlockBlob 用作结果后端,您只需使用正确的 URL 配置 :setting:`result_backend` 设置。

所需的 URL 格式为 azureblockblob:// 后跟存储连接字符串。 可以在 Azure 门户的存储帐户资源的 Access Keys 窗格中找到存储连接字符串。

示例配置

result_backend = 'azureblockblob://DefaultEndpointsProtocol=https;AccountName=somename;AccountKey=Lou...bzg==;EndpointSuffix=core.windows.net'

azureblockblob_container_name

默认:芹菜。

存储结果的存储容器的名称。


azureblockblob_base_path

5.1 版中的新功能。


默认值:无。

存储容器中用于存储结果键的基本路径。 例如:

azureblockblob_base_path = 'prefix/'

azureblockblob_retry_initial_backoff_sec

默认值:2。

第一次重试的初始退避间隔(以秒为单位)。 使用指数策略尝试后续重试。


azureblockblob_retry_increment_base

默认值:2。


azureblockblob_retry_max_attempts

默认值:3。

最大重试次数。


azureblockblob_connection_timeout

默认值:20。

建立 azure 块 blob 连接的超时时间(以秒为单位)。


azureblockblob_read_timeout

默认值:120。

读取天蓝色块 blob 的超时时间(以秒为单位)。


Elasticsearch 后端设置

要使用 Elasticsearch 作为结果后端,您只需使用正确的 URL 配置 :setting:`result_backend` 设置。

示例配置

result_backend = 'elasticsearch://example.com:9200/index_name/doc_type'

elasticsearch_retry_on_timeout

默认值:False

超时是否应该在不同的节点上触发重试?


elasticsearch_max_retries

默认值:3。

传播异常之前的最大重试次数。


elasticsearch_timeout

默认值:10.0 秒。

全局超时,由 elasticsearch 结果后端使用。


elasticsearch_save_meta_as_text

默认值:True

应该将元保存为文本还是本机 json。 结果始终序列化为文本。


AWS DynamoDB 后端设置

笔记

Dynamodb 后端需要 :pypi:`boto3` 库。

要安装此软件包,请使用 pip

$ pip install celery[dynamodb]

有关组合多个扩展要求的信息,请参阅 捆绑包


警告

Dynamodb 后端与定义了排序键的表不兼容。

如果要基于分区键以外的其他内容查询结果表,请改为定义全局二级索引 (GSI)。


此后端需要将 :setting:`result_backend` 设置设置为 DynamoDB URL:

result_backend = 'dynamodb://aws_access_key_id:aws_secret_access_key@region:port/table?read=n&write=m'

例如,指定 AWS 区域和表名:

result_backend = 'dynamodb://@us-east-1/celery_results'

或从环境中检索 AWS 配置参数,使用默认表名 (celery) 并指定读写预配置吞吐量:

result_backend = 'dynamodb://@/?read=5&write=5'

或使用 DynamoDB 可下载版本 本地 :

result_backend = 'dynamodb://@localhost:8000'

或使用可下载版本或其他服务,并在任何主机上部署符合规范的 API:

result_backend = 'dynamodb://@us-east-1'
dynamodb_endpoint_url = 'http://192.168.0.40:8000'

result_backend中DynamoDB URL的字段定义如下:

  1. aws_access_key_id & aws_secret_access_key

    用于访问 AWS API 资源的凭证。 这些也可以通过来自各种来源的 :pypi:`boto3` 库解决,如 此处 所述。

  2. region

    AWS 区域,例如 us-east-1localhost 用于 可下载版本 。 有关定义选项,请参阅 :pypi:`boto3`文档

  3. port

    本地 DynamoDB 实例的侦听端口(如果您使用的是可下载版本)。 如果没有将region参数指定为localhost,则设置该参数对无效

  4. table

    要使用的表名。 默认值为 celery。 有关允许的字符和长度的信息,请参阅 DynamoDB 命名规则

  5. read & write

    创建的 DynamoDB 表的读取和写入容量单位。 读取和写入的默认值为 1。 更多细节可以在 预配置吞吐量文档 中找到。

  6. ttl_seconds

    结果过期前的生存时间(以秒为单位)。 默认值是不使结果过期,同时保持 DynamoDB 表的生存时间设置不变。 如果 ttl_seconds 设置为正值,则结果将在指定的秒数后过期。 将 ttl_seconds 设置为负值意味着不会使结果过期,并且还会主动禁用 DynamoDB 表的生存时间设置。 请注意,尝试快速连续多次更改表的生存时间设置将导致限制错误。 更多细节可以在DynamoDB TTL文档中找到


IronCache 后端设置

笔记

IronCache 后端需要 :pypi:`iron_celery` 库:

要安装此软件包,请使用 pip

$ pip install iron_celery

IronCache 通过 :setting:`result_backend` 中提供的 URL 配置,例如:

result_backend = 'ironcache://project_id:token@'

或者更改缓存名称:

ironcache:://project_id:token@/awesomecache

更多信息参见:https://github.com/iron-io/iron_celery


Couchbase 后端设置

笔记

Couchbase 后端需要 :pypi:`couchbase` 库。

要安装此软件包,请使用 pip

$ pip install celery[couchbase]

有关如何组合多个扩展要求的说明,请参阅 Bundles


这个后端可以通过设置为 Couchbase URL 的 :setting:`result_backend` 进行配置:

result_backend = 'couchbase://username:password@host:port/bucket'

couchbase_backend_settings

默认值:{}(空映射)。

这是一个支持以下键的字典:

  • host

    Couchbase 服务器的主机名。 默认为 localhost

  • port

    Couchbase 服务器正在侦听的端口。 默认为 8091

  • bucket

    Couchbase 服务器正在写入的默认存储桶。 默认为 default

  • username

    向 Couchbase 服务器验证身份的用户名(可选)。

  • password

    用于对 Couchbase 服务器进行身份验证的密码(可选)。


ArangoDB 后端设置

笔记

ArangoDB 后端需要 :pypi:`pyArango` 库。

要安装此软件包,请使用 pip

$ pip install celery[arangodb]

有关如何组合多个扩展要求的说明,请参阅 Bundles


这个后端可以通过设置为 ArangoDB URL 的 :setting:`result_backend` 进行配置:

result_backend = 'arangodb://username:password@host:port/database/collection'

arangodb_backend_settings

默认值:{}(空映射)。

这是一个支持以下键的字典:

  • host

    ArangoDB 服务器的主机名。 默认为 localhost

  • port

    ArangoDB 服务器正在侦听的端口。 默认为 8529

  • database

    ArangoDB 服务器中的默认数据库正在写入。 默认为 celery

  • collection

    ArangoDB 服务器数据库中的默认集合正在写入。 默认为 celery

  • username

    向 ArangoDB 服务器进行身份验证的用户名(可选)。

  • password

    向 ArangoDB 服务器进行身份验证的密码(可选)。

  • http_protocol

    ArangoDB 服务器连接中的 HTTP 协议。 默认为 http

  • verify

    创建 ArangoDB 连接时的 HTTPS 验证检查。 默认为 False


CosmosDB 后端设置(实验性)

要将 CosmosDB 用作结果后端,您只需使用正确的 URL 配置 :setting:`result_backend` 设置。

示例配置

result_backend = 'cosmosdbsql://:{InsertAccountPrimaryKeyHere}@{InsertAccountNameHere}.documents.azure.com'

cosmosdbsql_database_name

默认值:celerydb。

用于存储结果的数据库的名称。


cosmosdbsql_collection_name

默认值:芹菜。

存储结果的集合的名称。


cosmosdbsql_consistency_level

默认值:会话。

表示 Azure Cosmos DB 客户端操作支持的一致性级别。

按强度顺序排列的一致性级别是:Strong、BoundedStaleness、Session、ConsistentPrefix 和 Eventual。


cosmosdbsql_max_retry_attempts

默认值:9。

为请求执行的最大重试次数。


cosmosdbsql_max_retry_wait_time

默认值:30。

重试发生时等待请求的最大等待时间(以秒为单位)。


CouchDB 后端设置

笔记

CouchDB 后端需要 :pypi:`pycouchdb` 库:

要安装此 Couchbase 包,请使用 pip

$ pip install celery[couchdb]

有关组合多个扩展要求的信息,请参阅 捆绑包


这个后端可以通过设置为 CouchDB URL 的 :setting:`result_backend` 进行配置:

result_backend = 'couchdb://username:password@host:port/container'

URL 由以下部分组成:

  • username

    向 CouchDB 服务器进行身份验证的用户名(可选)。

  • password

    用于向 CouchDB 服务器进行身份验证的密码(可选)。

  • host

    CouchDB 服务器的主机名。 默认为 localhost

  • port

    CouchDB 服务器正在侦听的端口。 默认为 8091

  • container

    CouchDB 服务器写入的默认容器。 默认为 default


文件系统后端设置

可以使用文件 URL 配置此后端,例如:

CELERY_RESULT_BACKEND = 'file:///var/celery/results'

配置的目录需要被所有使用后端的服务器共享和写入。

如果您在单个系统上尝试 Celery,您可以简单地使用后端,无需任何进一步配置。 对于较大的集群,您可以使用 NFS、GlusterFS、CIFS、HDFS(使用 FUSE)或任何其他文件系统。


Consul K/V 存储后端设置

笔记

Consul 后端需要 :pypi:`python-consul2` 库:

要安装此软件包,请使用 pip

$ pip install python-consul2

Consul 后端可以使用 URL 进行配置,例如:

CELERY_RESULT_BACKEND = 'consul://localhost:8500/'

或者:

result_backend = 'consul://localhost:8500/'

后端会将结果作为单独的密钥存储在 Consul 的 K/V 存储中。 后端支持使用 Consul 中的 TTL 自动过期结果。 URL 的完整语法是:

consul://host:port[?one_client=1]

URL 由以下部分组成:

  • host

    Consul 服务器的主机名。

  • port

    Consul 服务器正在侦听的端口。

  • one_client

    默认情况下,为了正确性,后端为每个操作使用单独的客户端连接。 在极端负载的情况下,新连接的创建速度可能会导致在负载下来自 Consul 服务器的 HTTP 429“连接过多”错误响应。 处理此问题的推荐方法是使用 https://github.com/poppyred/python-consul2/pull/31 上的补丁在 python-consul2 中启用重试。

    或者,如果设置了 one_client,则所有操作都将使用单个客户端连接。 这应该会消除 HTTP 429 错误,但后端中的结果存储可能变得不可靠。


消息路由

task_queues

默认值:None(取自默认队列设置的队列)。

大多数用户不希望指定此设置,而应使用 自动路由工具

如果你真的想配置高级路由,这个设置应该是一个 kombu.Queue 对象列表,worker 将从中消费。

请注意,可以通过 -Q 选项覆盖此设置,或者可以使用 -X 选项排除此列表中的单个队列(按名称)。

另请参阅 基础知识 了解更多信息。

默认为celery的队列/交换/绑定键,交换类型为direct

另见 :setting:`task_routes`


task_routes

默认值:None

路由器列表或用于将任务路由到队列的单个路由器。 在决定任务的最终目的地时,会按顺序咨询路由器。

路由器可以指定为:

  • 带有签名的函数 (name, args, kwargs, options, task=None, **kwargs)
  • 提供路由器功能路径的字符串。
  • *; 包含路由器规范的字典:
    将转换为 celery.routes.MapRoute 实例。
  • *; (pattern, route) 元组列表:
    将转换为 celery.routes.MapRoute 实例。

例子:

task_routes = {
    'celery.ping': 'default',
    'mytasks.add': 'cpu-bound',
    'feed.tasks.*': 'feeds',                           # <-- glob pattern
    re.compile(r'(image|video)\.tasks\..*'): 'media',  # <-- regex
    'video.encode': {
        'queue': 'video',
        'exchange': 'media',
        'routing_key': 'media.video.encode',
    },
}

task_routes = ('myapp.tasks.route_task', {'celery.ping': 'default})

其中 myapp.tasks.route_task 可能是:

def route_task(self, name, args, kwargs, options, task=None, **kw):
        if task == 'celery.ping':
            return {'queue': 'default'}

route_task 可能返回一个字符串或一个字典。 一个字符串表示它是 :setting:`task_queues` 中的队列名称,一个 dict 表示它是一个自定义路由。

发送任务时,按顺序查询路由器。 第一个不返回 None 的路由器是要使用的路由。 然后将消息选项与找到的路由设置合并,其中任务的设置具有优先级。

如果 apply_async() 具有以下参数的示例:

Task.apply_async(immediate=False, exchange='video',
                 routing_key='video.compress')

一个路由器返回:

{'immediate': True, 'exchange': 'urgent'}

最终的消息选项将是:

immediate=False, exchange='video', routing_key='video.compress'

(以及 Task 类中定义的任何默认消息选项)

:setting:`task_routes` 中定义的值在合并两者时优先于 :setting:`task_queues` 中定义的值。

使用以下设置:

task_queues = {
    'cpubound': {
        'exchange': 'cpubound',
        'routing_key': 'cpubound',
    },
}

task_routes = {
    'tasks.add': {
        'queue': 'cpubound',
        'routing_key': 'tasks.add',
        'serializer': 'json',
    },
}

tasks.add 的最终路由选项将变为:

{'exchange': 'cpubound',
 'routing_key': 'tasks.add',
 'serializer': 'json'}

有关更多示例,请参阅 路由器


task_queue_max_priority

经纪人
兔MQ

默认值:None

参见 RabbitMQ 消息优先级


task_default_priority

经纪人
RabbitMQ、Redis

默认值:None

参见 RabbitMQ 消息优先级


task_inherit_parent_priority

经纪人
兔MQ

默认值:False

如果启用,子任务将继承父任务的优先级。

# The last task in chain will also have priority set to 5.
chain = celery.chain(add.s(2) | add.s(2).set(priority=5) | add.s(3))

当使用 delay 或 apply_async 从父任务调用子任务时,优先级继承也有效。

参见 RabbitMQ 消息优先级


worker_direct

默认值:禁用。

此选项启用以便每个工作人员都有一个专用队列,以便可以将任务路由到特定工作人员。

每个工作程序的队列名称是根据工作程序主机名和 .dq 后缀自动生成的,使用 C.dq 交换。

例如,节点名称为 w1@example.com 的工作线程的队列名称变为:

w1@example.com.dq

然后你可以通过指定主机名作为路由键和 C.dq 交换来将任务路由到任务:

task_routes = {
    'tasks.add': {'exchange': 'C.dq', 'routing_key': 'w1@example.com'}
}

task_create_missing_queues

默认值:启用。

如果启用(默认),任何未在 :setting:`task_queues` 中定义的队列都将被自动创建。 参见自动路由


task_default_queue

默认值:"celery"

如果消息没有路由或未指定自定义队列,则 .apply_async 使用的默认队列的名称。

该队列必须列在 :setting:`task_queues` 中。 如果 :setting:`task_queues` 没有被指定,那么它会被自动创建,包含一个队列条目,这个名字被用作那个队列的名字。

也可以看看

更改默认队列的名称


task_default_exchange

默认值:使用为 :setting:`task_default_queue` 设置的值。

:setting:`task_queues` 设置中没有为密钥指定自定义交换时使用的默认交换的名称。


task_default_exchange_type

默认值:"direct"

:setting:`task_queues` 设置中没有为密钥指定自定义交换类型时使用的默认交换类型。


task_default_routing_key

默认值:使用为 :setting:`task_default_queue` 设置的值。

当没有为 :setting:`task_queues` 设置中的键指定自定义路由键时使用的默认路由键。


task_default_delivery_mode

默认值:"persistent"

可以是 transient(消息未写入磁盘)或 persistent(写入磁盘)。


经纪人设置

broker_url

默认值:"amqp://"

默认代理 URL。 这必须是以下形式的 URL:

transport://userid:password@hostname:port/virtual_host

只有scheme部分(transport://)是必须的,其余的都是可选的,默认为具体的transports默认值。

传输部分是要使用的代理实现,默认为 amqp,(如果安装或回退到 pyamqp,则使用 librabbitmq)。 还有其他选择,包括; redis://sqs://qpid://

该方案也可以是您自己的传输实现的完全合格路径:

broker_url = 'proj.transports.MyTransport://localhost'

也可以指定多个相同传输的代理 URL。 代理 URL 可以作为以分号分隔的单个字符串传入:

broker_url = 'transport://userid:password@hostname:port//;transport://userid:password@hostname:port//'

或者作为列表:

broker_url = [
    'transport://userid:password@localhost:port//',
    'transport://userid:password@hostname:port//'
]

然后将在 :setting:`broker_failover_strategy` 中使用代理。

有关更多信息,请参阅 Kombu 文档中的 kombu:connection-urls


broker_read_url / broker_write_url

默认值:取自 :setting:`broker_url`

可以配置这些设置,而不是 :setting:`broker_url` 为用于消费和生产的代理连接指定不同的连接参数。

示例:

broker_read_url = 'amqp://user:pass@broker.example.com:56721'
broker_write_url = 'amqp://user:pass@broker.example.com:56722'

这两个选项也可以指定为故障转移替代列表,请参阅 :setting:`broker_url` 了解更多信息。


broker_failover_strategy

默认值:"round-robin"

代理连接对象的默认故障转移策略。 如果提供,可以映射到 'kombu.connection.failover_strategies' 中的一个键,或者是对从提供的列表中生成单个项目的任何方法的引用。

示例:

# Random failover strategy
def random_failover_strategy(servers):
    it = list(servers)  # don't modify callers list
    shuffle = random.shuffle
    for _ in repeat(None):
        shuffle(it)
        yield it[0]

broker_failover_strategy = random_failover_strategy

broker_heartbeat

支持的传输
pyamqp

默认值:120.0(由服务器协商)。

注意:此值仅供工作人员使用,客户端目前不使用心跳。

仅使用 TCP/IP 并不总是能够及时检测连接丢失,因此 AMQP 定义了一种称为心跳的东西,客户端和代理都使用它来检测连接是否已关闭。

如果心跳值为 10 秒,则心跳将按照 :setting:`broker_heartbeat_checkrate` 设置指定的时间间隔进行监控(默认情况下设置为心跳值的两倍,因此对于10 秒内,每 5 秒检查一次心跳)。


broker_heartbeat_checkrate

支持的传输
pyamqp

默认值:2.0。

每隔一段时间,工作人员将监控代理没有错过太多心跳。 这个检查的速率是用:setting:`broker_heartbeat`值除以这个值来计算的,所以如果心跳是10.0,速率是默认的2.0,每5次检查一次秒(心跳发送速率的两倍)。


broker_use_ssl

支持的传输
pyamqpredis

默认值:禁用。

在代理连接和 SSL 设置上切换 SSL 使用。

此选项的有效值因传输而异。

pyamqp

如果 True 连接将使用 SSL 和默认 SSL 设置。 如果设置为字典,将根据指定的策略配置 SSL 连接。 使用的格式是 Python 的 ssl.wrap_socket() 选项。

请注意,SSL 套接字通常由代理在单独的端口上提供服务。

提供客户端证书并根据自定义证书颁发机构验证服务器证书的示例:

import ssl

broker_use_ssl = {
  'keyfile': '/var/ssl/private/worker-key.pem',
  'certfile': '/var/ssl/amqp-server-cert.pem',
  'ca_certs': '/var/ssl/myca.pem',
  'cert_reqs': ssl.CERT_REQUIRED
}

警告

小心使用 broker_use_ssl=True。 您的默认配置可能根本不会验证服务器证书。 请阅读 Python ssl 模块安全注意事项


redis

该设置必须是具有以下键的字典:

  • *; ssl_cert_reqs(必需):SSLContext.verify_mode 值之一:
    *;* ssl.CERT_NONE
    • ssl.CERT_OPTIONAL
    • ssl.CERT_REQUIRED
  • ssl_ca_certs(可选):CA 证书的路径
  • ssl_certfile(可选):客户端证书的路径
  • ssl_keyfile(可选):客户端密钥的路径


broker_pool_limit

2.3 版中的新功能。


默认值:10。

连接池中可以打开的最大连接数。

自 2.5 版起默认启用该池,默认限制为 10 个连接。 这个数字可以根据使用连接的线程/绿色线程(eventlet/gevent)的数量进行调整。 例如,运行具有 1000 个使用代理连接的 greenlet 的 eventlet,可能会出现争用,您应该考虑增加限制。

如果设置为 None 或 0,连接池将被禁用,连接将在每次使用时建立和关闭。


broker_connection_timeout

默认值:4.0。

我们放弃与 AMQP 服务器建立连接之前的默认超时时间(以秒为单位)。 使用 gevent 时禁用此设置。

笔记

代理连接超时仅适用于尝试连接到代理的工作人员。 它不适用于生产者发送任务,请参阅 :setting:`broker_transport_options` 了解如何为这种情况提供超时。


broker_connection_retry

默认值:启用。

如果丢失,自动尝试重新建立与 AMQP 代理的连接。

每次重试都会增加重试之间的时间,并且在超过:setting:`broker_connection_max_retries` 之前不会耗尽。


broker_connection_max_retries

默认值:100。

在我们放弃重新建立与 AMQP 代理的连接之前的最大重试次数。

如果设置为 0None,我们将永远重试。


broker_login_method

默认值:"AMQPLAIN"

设置自定义 amqp 登录方法。


broker_transport_options

2.2 版中的新功能。


默认值:{}(空映射)。

传递给底层传输的附加选项的字典。

有关支持的选项(如果有),请参阅您的传输用户手册。

设置可见性超时的示例(Redis 和 SQS 传输支持):

broker_transport_options = {'visibility_timeout': 18000}  # 5 hours

设置生产者连接最大重试次数的示例(因此,如果第一次任务执行时代理不可用,生产者将不会永远重试):

broker_transport_options = {'max_retries': 5}

工人

imports

默认值:[](空列表)。

当工作程序启动时要导入的一系列模块。

这用于指定要导入的任务模块,还用于导入信号处理程序和其他远程控制命令等。

模块将按原始顺序导入。


include

默认值:[](空列表)。

语义与 :setting:`imports` 完全相同,但可以用作具有不同导入类别的手段。

此设置中的模块是在 :setting:`imports` 中的模块之后导入的。


worker_deduplicate_successful_tasks

5.1 版中的新功能。


默认值:假

在每个任务执行之前,指示工作人员检查此任务是否是重复消息。

重复数据删除仅发生在具有相同标识符、启用延迟确认、由消息代理重新交付且它们在结果后端中的状态为 SUCCESS 的任务时。

为了避免查询溢出结果后端,在查询结果后端之前检查成功执行任务的本地缓存,以防任务已由接收任务的同一个工作人员成功执行。

这个缓存可以通过设置 :setting:`worker_state_db` 设置来持久化。

如果结果后端不是持久的(例如 RPC 后端),则忽略此设置。


worker_concurrency

默认值:CPU 内核数。

并发工作进程/线程/绿色线程执行任务的数量。

如果您主要执行 I/O,您可以拥有更多进程,但如果主要受 CPU 限制,请尽量使其接近您机器上的 CPU 数量。 如果未设置,将使用主机上的 CPU/内核数。


worker_prefetch_multiplier

默认值:4。

一次预取的消息数乘以并发进程数。 默认值为 4(每个进程 4 条消息)。 但是,默认设置通常是一个不错的选择——如果您有很长的运行任务在队列中等待并且您必须启动工作程序,请注意第一个启动的工作程序将收到四倍于最初的消息数。 因此,任务可能不会公平地分配给工人。

要禁用预取,请将 :setting:`worker_prefetch_multiplier` 设置为 1。 将该设置更改为 0 将允许工作人员继续消费尽可能多的消息。

有关预取的更多信息,请阅读 Prefetch Limits

笔记

带有 ETA/倒计时的任务不受预取限制的影响。


worker_lost_wait

默认值:10.0 秒。

在某些情况下,worker 可能会在没有适当清理的情况下被杀死,并且该 worker 可能在终止之前已经发布了结果。 此值指定在引发 @WorkerLostError 异常之前我们等待任何丢失结果的时间。


worker_max_tasks_per_child

池工作进程在被新任务替换之前可以执行的最大任务数。 默认是没有限制。


worker_max_memory_per_child

默认值:无限制。 类型:int(千字节)

一个工作程序在被新工作程序替换之前可能消耗的最大驻留内存量(以千字节为单位)。 如果单个任务导致工人超过此限制,则该任务将被完成,然后该工人将被替换。

示例:

worker_max_memory_per_child = 12000  # 12MB

worker_disable_rate_limits

默认值:禁用(启用速率限制)。

禁用所有速率限制,即使任务设置了明确的速率限制。


worker_state_db

默认值:None

用于存储持久工作状态(如已撤销的任务)的文件的名称。 可以是相对或绝对路径,但请注意后缀 .db 可能会附加到文件名(取决于 Python 版本)。

也可以通过 celery worker --statedb 参数进行设置。


worker_timer_precision

默认值:1.0 秒。

设置 ETA 调度程序在重新检查调度之间可以休眠的最长时间(以秒为单位)。

将此值设置为 1 秒意味着调度程序精度将为 1 秒。 如果您需要接近毫秒的精度,您可以将其设置为 0.1。


worker_enable_remote_control

默认:默认启用。

指定是否启用工作人员的远程控制。


worker_proc_alive_timeout

默认值:4.0。

等待新工作进程启动时的超时(以秒为单位)(整数/浮点数)。


worker_cancel_long_running_tasks_on_connection_loss

5.1 版中的新功能。


默认:默认禁用。

终止所有长时间运行的任务,并在连接丢失时启用延迟确认。

在连接丢失之前没有被确认的任务不能再这样做,因为它们的通道消失了并且任务被重新传递回队列。 这就是为什么启用了延迟确认的任务必须是幂等的,因为它们可能会被执行多次。 在这种情况下,每次连接丢失都会执行两次任务(有时在其他工作线程中并行执行)。

打开此选项时,那些尚未完成的任务将被取消并终止其执行。 只要 :setting:`task_ignore_result` 未启用,在连接丢失之前以任何方式完成的任务都会记录在结果后端中。

警告

此功能是作为未来的重大更改引入的。 如果关闭,Celery 会发出警告信息。

在 Celery 6.0 中,:setting:`worker_cancel_long_running_tasks_on_connection_loss` 将默认设置为 True,因为当前行为导致的问题多于解决的问题。


活动

worker_send_task_events

默认:默认禁用。

发送与任务相关的事件,以便可以使用 flower 等工具监控任务。 设置工人 -E 参数的默认值。


task_send_sent_event

2.2 版中的新功能。


默认:默认禁用。

如果启用,将为每个任务发送一个 :event:`task-sent` 事件,以便可以在任务被工作人员使用之前对其进行跟踪。


event_queue_ttl

支持的传输
amqp

默认值:5.0 秒。

发送到监视器客户端事件队列的消息被删除时的消息到期时间(整数/浮点数)(x-message-ttl

例如,如果此值设置为 10,则发送到此队列的消息将在 10 秒后被删除。


event_queue_expires

支持的传输
amqp

默认值:60.0 秒。

以秒为单位的到期时间(整数/浮点数),用于删除监视器客户端事件队列后的时间(x-expires)。


event_queue_prefix

默认值:"celeryev"

用于事件接收器队列名称的前缀。


event_exchange

默认值:"celeryev"

事件交换的名称。

警告

此选项处于实验阶段,请谨慎使用。


event_serializer

默认值:"json"

发送事件消息时使用的消息序列化格式。

也可以看看

序列化程序


远程控制命令

笔记

要禁用远程控制命令,请参阅 :setting:`worker_enable_remote_control` 设置。


control_queue_ttl

默认值:300.0

远程控制命令队列中的消息将到期之前的时间(以秒为单位)。

如果使用默认值 300 秒,这意味着如果发送了远程控制命令并且在 300 秒内没有工作人员接听,则该命令将被丢弃。

此设置也适用于远程控制回复队列。


control_queue_expires

默认值:10.0

从代理中删除未使用的远程控制命令队列之前的时间(以秒为单位)。

此设置也适用于远程控制回复队列。


control_exchange

默认值:"celery"

控制命令交换的名称。

警告

此选项处于实验阶段,请谨慎使用。


日志记录

worker_hijack_root_logger

2.2 版中的新功能。


默认:默认启用(劫持根记录器)。

默认情况下,根记录器上任何先前配置的处理程序都将被删除。 如果您想自定义自己的日志处理程序,则可以通过设置 worker_hijack_root_logger = False 来禁用此行为。

笔记

也可以通过连接到 :signal:`celery.signals.setup_logging` 信号来自定义日志记录。


worker_log_color

默认值:如果应用程序正在登录到终端,则启用。

在 Celery 应用程序的日志输出中启用/禁用颜色。


worker_log_format

默认:

"[%(asctime)s: %(levelname)s/%(processName)s] %(message)s"

用于日志消息的格式。

有关日志格式的更多信息,请参阅 Python logging 模块。


worker_task_log_format

默认:

"[%(asctime)s: %(levelname)s/%(processName)s]
    %(task_name)s[%(task_id)s]: %(message)s"

用于在任务中记录的日志消息的格式。

有关日志格式的更多信息,请参阅 Python logging 模块。


worker_redirect_stdouts

默认:默认启用。

如果启用 stdout 和 stderr 将被重定向到当前记录器。

celery workercelery beat 使用。


worker_redirect_stdouts_level

默认值:WARNING

stdout 和 stderr 的日志级别输出被记录为。 可以是 DEBUGINFOWARNINGERRORCRITICAL 之一。


安全

security_key

默认值:None

2.5 版中的新功能。


当使用 消息签名 时,包含用于签名消息的私钥的文件的相对或绝对路径。


security_certificate

默认值:None

2.5 版中的新功能。


使用 消息签名 时用于签名消息的 X.509 证书文件的相对或绝对路径。


security_cert_store

默认值:None

2.5 版中的新功能。


包含用于 消息签名 的 X.509 证书的目录。 可以是带有通配符的 glob,(例如 /etc/certs/*.pem)。


security_digest

默认值:sha256

4.3 版中的新功能。


使用 消息签名 时用于对消息进行签名的加密摘要。 https://cryptography.io/en/latest/hazmat/primitives/cryptographic-hashes/#module-cryptography.hazmat.primitives.hashes


自定义组件类(高级)

worker_pool

默认值:"prefork" (celery.concurrency.prefork:TaskPool)。

工作程序使用的池类的名称。

Eventlet/Gevent

切勿使用此选项来选择 eventlet 或 gevent 池。 您必须将 -P 选项用于 celery worker,以确保不会太晚应用猴子补丁,导致事情以奇怪的方式中断。


worker_pool_restarts

默认:默认禁用。

如果启用,可以使用 :control:`pool_restart` 远程控制命令重新启动工作池。


worker_autoscaler

2.2 版中的新功能。


默认值:"celery.worker.autoscale:Autoscaler"

要使用的自动缩放程序类的名称。


worker_consumer

默认值:"celery.worker.consumer:Consumer"

工人使用的消费者类的名称。


worker_timer

默认值:"kombu.asynchronous.hub.timer:Timer"

工作程序使用的 ETA 调度程序类的名称。 默认为或由池实现设置。


节拍设置(芹菜节拍)

beat_schedule

默认值:{}(空映射)。

beat 使用的周期性任务计划。 请参阅 条目


beat_scheduler

默认值:"celery.beat:PersistentScheduler"

默认调度程序类。 例如,可以设置为 "django_celery_beat.schedulers:DatabaseScheduler",如果与 :pypi:`django-celery-beat` 扩展一起使用。

也可以通过 celery beat -S 参数进行设置。


beat_schedule_filename

默认值:"celerybeat-schedule"

PersistentScheduler 用来存储周期任务上次运行时间的文件名。 可以是相对或绝对路径,但请注意后缀 .db 可能会附加到文件名(取决于 Python 版本)。

也可以通过 celery beat --schedule 参数进行设置。


beat_sync_every

默认值:0。

在发出另一个数据库同步之前可以调用的周期性任务数。 值 0(默认值)表示基于时间同步 - 由 scheduler.sync_every 确定的默认值为 3 分钟。 如果设置为 1,beat 将在每个任务消息发送后调用同步。


beat_max_loop_interval

默认值:0。

beat 可以在检查时间表之间休眠的最大秒数。

此值的默认值是特定于调度程序的。 对于默认的 Celery beat 调度器,该值为 300(5 分钟),但对于 :pypi:`django-celery-beat` 数据库调度器,它是 5 秒,因为调度可能会从外部更改,所以它必须考虑到时间表的变化。

此外,当在 Jython 上作为线程运行嵌入的 Celery beat (-B) 时,最大间隔会被覆盖并设置为 1,以便可以及时关闭。