“大实例”重构 — Python 文档

来自菜鸟教程
Celery/docs/latest/internals/app-overview
跳转至:导航、​搜索

“大实例”重构

app 分支是一项正在进行的工作,以删除 Celery 中全局配置的使用。

Celery 现在可以被实例化,并且多个 Celery 实例可能存在于同一个进程空间中。 此外,无需借助猴子修补即可定制大型零件。

例子

创建一个 Celery 实例:

>>> from celery import Celery
>>> app = Celery()
>>> app.config_from_object('celeryconfig')
>>> #app.config_from_envvar('CELERY_CONFIG_MODULE')

创建任务:

@app.task
def add(x, y):
    return x + y

创建自定义任务子类:

Task = celery.create_task_cls()

class DebugTask(Task):

    def on_failure(self, *args, **kwargs):
        import pdb
        pdb.set_trace()

@app.task(base=DebugTask)
def add(x, y):
    return x + y

启动一个工人:

worker = celery.Worker(loglevel='INFO')

访问配置:

celery.conf.task_always_eager = True
celery.conf['task_always_eager'] = True

控制工人:

>>> celery.control.inspect().active()
>>> celery.control.rate_limit(add.name, '100/m')
>>> celery.control.broadcast('shutdown')
>>> celery.control.discard_all()

其他有趣的属性:

# Establish broker connection.
>>> celery.broker_connection()

# AMQP Specific features.
>>> celery.amqp
>>> celery.amqp.Router
>>> celery.amqp.get_queues()
>>> celery.amqp.get_task_consumer()

# Loader
>>> celery.loader

# Default backend
>>> celery.backend

您可能会看到,这确实开辟了定制能力的另一个维度。


已弃用

  • celery.task.ping celery.task.PingTask

    低于ping远程控制命令。 将在 Celery 2.3 中移除。


别名(待弃用)

  • celery.execute
    • .send_task -> {app.send_task}

    • .delay_task -> 无可替代

  • celery.log
    • .get_default_logger -> {app.log.get_default_logger}

    • .setup_logger -> {app.log.setup_logger}

    • .get_task_logger -> {app.log.get_task_logger}

    • .setup_task_logger -> {app.log.setup_task_logger}

    • .setup_logging_subsystem -> {app.log.setup_logging_subsystem}

    • .redirect_stdouts_to_logger -> {app.log.redirect_stdouts_to_logger}

  • celery.messaging
    • .establish_connection -> {app.broker_connection}

    • .with_connection -> {app.with_connection}

    • .get_consumer_set -> {app.amqp.get_task_consumer}

    • .TaskPublisher -> {app.amqp.TaskPublisher}

    • .TaskConsumer -> {app.amqp.TaskConsumer}

    • .ConsumerSet -> {app.amqp.ConsumerSet}

  • celery.conf.* -> {app.conf}

    注意:现在所有配置键的名称与配置中的名称相同。 所以键 task_always_eager 被访问为:

    >>> app.conf.task_always_eager

    代替:

    >>> from celery import conf
    >>> conf.always_eager
    • .get_queues -> {app.amqp.get_queues}


  • celery.utils.info
    • .humanize_seconds -> celery.utils.time.humanize_seconds

    • .textindent -> celery.utils.textindent

    • .get_broker_info -> {app.amqp.get_broker_info}

    • .format_broker_info -> {app.amqp.format_broker_info}

    • .format_queues -> {app.amqp.format_queues}


默认应用使用情况

为了向后兼容,必须可以在不传递显式应用程序实例的情况下使用所有类/函数。

这是通过让所有依赖于应用程序的对象在应用程序实例丢失时使用 default_app 来实现的。

from celery.app import app_or_default

class SomeClass:

    def __init__(self, app=None):
        self.app = app_or_default(app)

这种方法的问题在于应用程序实例有可能在此过程中丢失,并且一切似乎都在正常工作。 测试应用程序实例泄漏很困难。 可以使用环境变量 CELERY_TRACE_APP,当它启用时,celery.app.app_or_default() 将在必须返回默认应用程序实例时引发异常。

应用依赖树

  • *; {app}
    *;* celery.loaders.base.BaseLoader
    • celery.backends.base.BaseBackend
    • *;*; {app.TaskSet}
    • *;*;* celery.task.sets.TaskSet (app.TaskSet)
  • *;*; [app.TaskSetResult]
  • *;*;* celery.result.TaskSetResult (app.TaskSetResult)
  • *; {app.AsyncResult}
    *;* celery.result.BaseAsyncResult / celery.result.AsyncResult
  • *; celery.bin.worker.WorkerCommand
    *;* *;*; celery.apps.worker.Worker
    • *;*;* *;*;*; celery.worker.WorkerController
      • *;*;*;* *;*;*;*; celery.worker.consumer.Consumer
        • *;*;*;*;* celery.worker.request.Request
          • celery.events.EventDispatcher
          • *;*;*;*;*; celery.worker.control.ControlDispatch
          • *;*;*;*;*;* celery.worker.control.registry.Panel
            • celery.pidbox.BroadcastPublisher
          • celery.pidbox.BroadcastConsumer
        • celery.beat.EmbeddedService
  • *; celery.bin.events.EvCommand
    *;* *;*; celery.events.snapshot.evcam
    • *;*;* celery.events.snapshot.Polaroid
      • celery.events.EventReceiver
    • *;*; celery.events.cursesmon.evtop
    • *;*;* celery.events.EventReceiver
      • celery.events.cursesmon.CursesMonitor
    • *;*; celery.events.dumper
    • *;*;* celery.events.EventReceiver
  • celery.bin.amqp.AMQPAdmin
  • *; celery.bin.beat.BeatCommand
    *;* *;*; celery.apps.beat.Beat
    • *;*;* *;*;*; celery.beat.Service
      • *;*;*;* celery.beat.Scheduler