“大实例”重构 — Python 文档
“大实例”重构
app 分支是一项正在进行的工作,以删除 Celery 中全局配置的使用。
Celery 现在可以被实例化,并且多个 Celery 实例可能存在于同一个进程空间中。 此外,无需借助猴子修补即可定制大型零件。
例子
创建一个 Celery 实例:
>>> from celery import Celery
>>> app = Celery()
>>> app.config_from_object('celeryconfig')
>>> #app.config_from_envvar('CELERY_CONFIG_MODULE')
创建任务:
@app.task
def add(x, y):
return x + y
创建自定义任务子类:
Task = celery.create_task_cls()
class DebugTask(Task):
def on_failure(self, *args, **kwargs):
import pdb
pdb.set_trace()
@app.task(base=DebugTask)
def add(x, y):
return x + y
启动一个工人:
worker = celery.Worker(loglevel='INFO')
访问配置:
celery.conf.task_always_eager = True
celery.conf['task_always_eager'] = True
控制工人:
>>> celery.control.inspect().active()
>>> celery.control.rate_limit(add.name, '100/m')
>>> celery.control.broadcast('shutdown')
>>> celery.control.discard_all()
其他有趣的属性:
# Establish broker connection.
>>> celery.broker_connection()
# AMQP Specific features.
>>> celery.amqp
>>> celery.amqp.Router
>>> celery.amqp.get_queues()
>>> celery.amqp.get_task_consumer()
# Loader
>>> celery.loader
# Default backend
>>> celery.backend
您可能会看到,这确实开辟了定制能力的另一个维度。
已弃用
celery.task.ping
celery.task.PingTask
低于ping远程控制命令。 将在 Celery 2.3 中移除。
别名(待弃用)
celery.execute
.send_task
-> {app.send_task
}.delay_task
-> 无可替代
celery.log
.get_default_logger
-> {app.log.get_default_logger
}.setup_logger
-> {app.log.setup_logger
}.get_task_logger
-> {app.log.get_task_logger
}.setup_task_logger
-> {app.log.setup_task_logger
}.setup_logging_subsystem
-> {app.log.setup_logging_subsystem
}.redirect_stdouts_to_logger
-> {app.log.redirect_stdouts_to_logger
}
celery.messaging
.establish_connection
-> {app.broker_connection
}.with_connection
-> {app.with_connection
}.get_consumer_set
-> {app.amqp.get_task_consumer
}.TaskPublisher
-> {app.amqp.TaskPublisher
}.TaskConsumer
-> {app.amqp.TaskConsumer
}.ConsumerSet
-> {app.amqp.ConsumerSet
}
celery.conf.*
-> {app.conf
}注意:现在所有配置键的名称与配置中的名称相同。 所以键
task_always_eager
被访问为:>>> app.conf.task_always_eager
代替:
>>> from celery import conf >>> conf.always_eager
.get_queues
-> {app.amqp.get_queues
}
celery.utils.info
.humanize_seconds
->celery.utils.time.humanize_seconds
.textindent
->celery.utils.textindent
.get_broker_info
-> {app.amqp.get_broker_info
}.format_broker_info
-> {app.amqp.format_broker_info
}.format_queues
-> {app.amqp.format_queues
}
默认应用使用情况
为了向后兼容,必须可以在不传递显式应用程序实例的情况下使用所有类/函数。
这是通过让所有依赖于应用程序的对象在应用程序实例丢失时使用 default_app
来实现的。
from celery.app import app_or_default
class SomeClass:
def __init__(self, app=None):
self.app = app_or_default(app)
这种方法的问题在于应用程序实例有可能在此过程中丢失,并且一切似乎都在正常工作。 测试应用程序实例泄漏很困难。 可以使用环境变量 CELERY_TRACE_APP
,当它启用时,celery.app.app_or_default()
将在必须返回默认应用程序实例时引发异常。
应用依赖树
- *; {
app
}- *;*
celery.loaders.base.BaseLoader
celery.backends.base.BaseBackend
- *;*; {
app.TaskSet
}
- *;*;*
celery.task.sets.TaskSet
(app.TaskSet
)
- *;*;*
- *;*
- *;*; [
app.TaskSetResult
]
- *;*; [
- *;*;*
celery.result.TaskSetResult
(app.TaskSetResult
)
- *;*;*
app.AsyncResult
}
- *;*
celery.result.BaseAsyncResult
/celery.result.AsyncResult
celery.bin.worker.WorkerCommand
- *;* *;*;
celery.apps.worker.Worker
- *;*;* *;*;*;
celery.worker.WorkerController
- *;*;* *;*;*;
- *;*;*;* *;*;*;*;
celery.worker.consumer.Consumer
- *;*;*;* *;*;*;*;
- *;*;*;*;*
celery.worker.request.Request
- *;*;*;*;*
celery.events.EventDispatcher
- *;*;*;*;*;
celery.worker.control.ControlDispatch
- *;*;*;*;*;*
celery.worker.control.registry.Panel
- *;*;*;*;*;*
celery.pidbox.BroadcastPublisher
celery.pidbox.BroadcastConsumer
celery.beat.EmbeddedService
celery.bin.events.EvCommand
- *;* *;*;
celery.events.snapshot.evcam
- *;*;*
celery.events.snapshot.Polaroid
- *;*;*
celery.events.EventReceiver
- *;*;
celery.events.cursesmon.evtop
- *;*;
- *;*;*
celery.events.EventReceiver
- *;*;*
celery.events.cursesmon.CursesMonitor
- *;*;
celery.events.dumper
- *;*;
- *;*;*
celery.events.EventReceiver
- *;*;*
celery.bin.amqp.AMQPAdmin
celery.bin.beat.BeatCommand
- *;* *;*;
celery.apps.beat.Beat
- *;*;* *;*;*;
celery.beat.Service
- *;*;* *;*;*;
- *;*;*;*
celery.beat.Scheduler
- *;*;*;*