如何使用Celery和RabbitMQ在UbuntuVPS上排队任务
介绍
异步或非阻塞处理是一种将某些任务的执行与程序的主要流程分开的方法。 这为您提供了几个优势,包括允许您面向用户的代码不间断地运行。
消息传递是程序组件可以用来通信和交换信息的一种方法。 它可以同步或异步实现,并且可以允许离散进程毫无问题地进行通信。 对于此类用途,消息传递通常作为传统数据库的替代方案来实现,因为消息队列通常实现附加功能,提供更高的性能,并且可以完全驻留在内存中。
Celery 是一个建立在异步消息传递系统上的任务队列。 它可以用作可以转储编程任务的存储桶。 传递任务的程序可以继续执行和响应式地运行,然后它可以轮询 celery 以查看计算是否完成并检索数据。
虽然 celery 是用 Python 编写的,但它的协议可以用任何语言实现。 它甚至可以通过 webhook 与其他语言一起使用。
通过在程序环境中实现作业队列,您可以轻松卸载任务并继续处理来自用户的交互。 这是一种提高应用程序响应能力的简单方法,并且在执行长时间运行的计算时不会被锁定。
在本指南中,我们将在 Ubuntu 12.04 VPS 上使用 RabbitMQ 作为消息系统安装和实现 celery 作业队列。
安装组件
安装芹菜
Celery 是用 Python 编写的,因此很容易安装,就像我们处理常规 Python 包一样。
我们将按照推荐的程序通过创建一个虚拟环境来安装我们的消息传递系统来处理 Python 包。 这有助于我们保持环境稳定而不影响更大的系统。
从 Ubuntu 的默认存储库安装 Python 虚拟环境包:
sudo apt-get update sudo apt-get install python-virtualenv
我们将创建一个消息传递目录,我们将在其中实现我们的系统:
mkdir ~/messaging cd ~/messaging
我们现在可以创建一个虚拟环境,我们可以使用以下命令在其中安装 celery:
virtualenv --no-site-packages venv
配置好虚拟环境后,我们可以通过键入以下命令来激活它:
source venv/bin/activate
您的提示将更改以反映您现在正在我们上面制作的虚拟环境中操作。 这将确保我们的 Python 包安装在本地而不是全局安装。
如果在任何时候我们需要停用环境(不是现在),您可以键入:
deactivate
现在我们已经激活了环境,我们可以使用 pip 安装 celery:
pip install celery
安装 RabbitMQ
Celery 需要一个消息代理来处理来自外部源的请求。 该代理被称为“经纪人”。
有很多可供代理选择的选项,包括关系数据库、NoSQL 数据库、键值存储和实际的消息传递系统。
我们将配置 celery 以使用 RabbitMQ 消息系统,因为它提供强大、稳定的性能并与 celery 很好地交互。 这是一个很好的解决方案,因为它包含与我们的预期用途非常吻合的功能。
我们可以通过 Ubuntu 的存储库安装 RabbitMQ:
sudo apt-get install rabbitmq-server
RabbitMQ 服务在安装后在我们的服务器上自动启动。
创建一个 Celery 实例
为了使用 celery 的任务队列能力,我们安装后的第一步必须是创建一个 celery 实例。 这是一个简单的过程,导入包,创建一个“应用程序”,然后设置 celery 能够在后台执行的任务。
让我们在名为 tasks.py
的消息传递目录中创建一个 Python 脚本,我们可以在其中定义工作人员可以执行的任务。
sudo nano ~/messaging/tasks.py
我们应该做的第一件事是从 celery 包中导入 Celery 函数:
from celery import Celery
之后,我们可以创建一个连接到默认 RabbitMQ 服务的 celery 应用程序实例:
from celery import Celery app = Celery('tasks', backend='amqp', broker='amqp://')
Celery
函数的第一个参数是将添加到任务前面以识别它们的名称。
backend
参数是一个可选参数,如果您希望查询后台任务的状态或检索其结果,则该参数是必需的。
如果您的任务只是完成一些工作然后退出的函数,而没有返回在您的程序中使用的有用值,您可以忽略此参数。 如果您的某些任务需要此功能,请在此处启用它,我们可以根据具体情况进一步禁用它。
broker
参数指定连接到我们的代理所需的 URL。 在我们的例子中,这是在我们的服务器上运行的 RabbitMQ 服务。 RabbitMQ 使用称为“amqp”的协议运行。 如果 RabbitMQ 在其默认配置下运行,则 celery 可以使用除了 amqp://
方案之外的其他信息进行连接。
构建 Celery 任务
仍然在这个文件中,我们现在需要添加我们的任务。
每个 celery 任务都必须使用装饰器 @app.task
引入。 这允许 celery 识别可以添加其排队功能的功能。 在每个装饰器之后,我们只需创建一个工人可以运行的函数。
我们的第一个任务将是一个简单的函数,它将一个字符串打印到控制台。
from celery import Celery app = Celery('tasks', backend='amqp', broker='amqp://') @app.task def print_hello(): print 'hello there'
因为这个函数不返回任何有用的信息(而是将它打印到控制台),我们可以告诉 celery 不要使用后端来存储关于这个任务的状态信息。 这在引擎盖下不太复杂,并且需要更少的资源。
从芹菜进口芹菜 app = Celery('tasks', backend='amqp', broker='amqp://') @应用程序 .task (ignore_result=True) def print_hello(): print 'hello there'
接下来,我们将添加另一个生成素数的函数(取自 RosettaCode)。 这可能是一个长时间运行的进程,因此它是我们在等待结果时如何处理异步工作进程的一个很好的例子。
from celery import Celery app = Celery('tasks', backend='amqp', broker='amqp://') @app.task(ignore_result=True) def print_hello(): print 'hello there' @app.task def gen_prime(x): multiples = [] results = [] for i in xrange(2, x+1): if i not in multiples: results.append(i) for j in xrange(i*i, x+1, i): multiples.append(j) return results
因为我们关心这个函数的返回值是什么,并且因为我们想知道它何时完成(以便我们可以使用结果等),所以我们没有将 ignore_result
参数添加到这一秒任务。
保存并关闭文件。
启动 Celery 工作进程
我们现在可以启动一个能够接受来自应用程序的连接的工作进程。 它将使用我们刚刚创建的文件来了解它可以执行的任务。
启动一个工作实例就像使用 celery 命令调用应用程序名称一样简单。 我们将在字符串末尾包含一个“&”字符,以将我们的工作进程置于后台:
celery worker -A tasks &
这将启动一个应用程序,然后将其与终端分离,允许您继续将其用于其他任务。
如果你想启动多个worker,你可以通过使用 -n
参数命名每个worker来实现:
celery worker -A tasks -n one.%h & celery worker -A tasks -n two.%h &
当工作人员被命名时,%h
将被主机名替换。
要停止工作人员,您可以使用 kill 命令。 我们可以查询进程id,然后根据这些信息淘汰worker。
ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill
这将允许工作人员在退出之前完成其当前任务。
如果您希望在不等待他们完成任务的情况下关闭所有工作人员,您可以执行:
ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
使用队列处理工作
我们可以使用我们生成的工作进程在后台为我们的程序完成工作。
我们将探索 Python 解释器中的不同选项,而不是创建整个程序来演示其工作原理:
python
在提示符下,我们可以将函数导入环境:
from tasks import print_hello from tasks import gen_prime
如果您测试这些功能,它们似乎没有任何特殊功能。 第一个函数按预期打印一行:
print_hello()
hello there
第二个函数返回素数列表:
primes = gen_prime(1000) print primes
如果我们给第二个函数一个更大范围的数字来检查,执行会在计算时挂起:
primes = gen_prime(50000)
通过键入“CTRL-C”停止执行。 这个过程显然不是在后台计算。
要访问后台工作人员,我们需要使用 .delay
方法。 Celery 用附加功能包装了我们的函数。 此方法用于将函数传递给工作人员执行。 它应该立即返回:
primes = gen_prime.delay(50000)
这个任务现在由我们之前开始的工作人员执行。 因为我们为我们的应用程序配置了一个 backend
参数,所以我们可以检查计算的状态并访问结果。
要检查任务是否完成,我们可以使用.ready
方法:
primes.ready()
False
“False”值表示任务仍在运行,但结果尚不可用。 当我们得到“真”的值时,我们可以对答案做一些事情。
primes.ready()
True
我们可以使用 .get
方法获取值。
如果我们已经验证了该值是使用 .ready
方法计算的,那么我们可以像这样使用该方法:
print primes.get()
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, . . .
但是,如果您在调用 .get
之前没有使用 .ready
方法,那么您很可能希望添加一个“超时”选项,这样您的程序就不会被迫等待结果,这将违背我们实现的目的:
print primes.get(timeout=2)
如果超时,这将引发异常,您可以在程序中处理该异常。
结论
尽管这些信息足以让您开始在程序中使用 celery,但它只是对这个库的全部功能的初步了解。 Celery 允许您以有趣的方式将后台任务串在一起、分组任务和组合功能。
虽然 celery 是用 Python 编写的,但它可以通过 webhook 与其他语言一起使用。 无论您选择哪种语言,这使得将任务移动到后台变得非常灵活。