如何使用RabbitMQ和Python的Puka向多个消费者传递消息

来自菜鸟教程
跳转至:导航、​搜索

先决条件


兔MQ


只有在安装和配置软件本身之后,才能使用 RabbitMQ 发送和接收消息。 如何安装和管理 RabbitMQ 详细解释了如何让 RabbitMQ 工作,是使用这个消息代理的一个很好的起点。

普卡 Python 库


本文中的所有示例均使用 Python 语言提供,并使用处理 AMQP 消息传递协议的 puka 库作为支持。 为了简单的介绍,Python 被选为一种干净且易于理解的语言,但由于 AMQP 是一种广泛采用的协议,因此可以自由使用任何其他编程语言来实现类似的目标。

puka 可以使用 Python 包管理器 pip 快速安装。

pip install puka

pip 并不总是与 Linux 发行版捆绑在一起。 在基于 Debian 的发行版(包括 Ubuntu)上,可以使用以下命令轻松安装:

apt-get install python-pip

基于 RHEL,例如 CentOS:

yum install python-setuptools
easy_install pip

RabbitMQ 及其术语介绍


消息传递[特别是RabbitMQ] 介绍了一些描述消息代理及其机制的基本原理的术语。

  • Producer发送 消息的一方,因此创建消息正在生产。
  • Consumer接收消息的一方,因此接收消息是消耗。
  • Queue 是一个缓冲区,其中存储已发送的消息并准备好接收。 单个队列可以容纳多少条消息没有限制。 对于有多少生产者可以向队列发送消息,也没有多少消费者可以尝试访问它,也没有限制。 当消息到达现有队列时,它会在那里等待,直到被访问该特定队列的消费者消费。 当消息到达不存在的队列时,它会被丢弃。
  • Exchange 是一个存在于生产者和队列之间的实体。 生产者从不直接向队列发送消息。 它将消息发送到交换器,然后交换器将消息放置到一个或多个队列中,具体取决于所使用的交换器。 用现实生活中的比喻来说,exchange 就像一个邮递员:它处理消息,以便将它们传递到适当的队列(邮箱),消费者可以从中收集它们。
  • Binding 是队列和交换器之间的连接。 绑定到某个交换的队列由交换服务。 具体如何取决于交易所本身。

本书通篇将使用所有五个术语。 还有一个与 puka python 库严格相关的库,因为它的清晰性而被选为首选库。 它是一个promise,可以理解为对AMQP服务器的同步请求,保证请求的执行(成功与否),客户端等待直到完成。

虽然 puka 可以异步工作,但在我们的示例中 puka 将用作同步库。 这意味着在每个请求(承诺)之后,puka 将等到它被执行后再进行下一步。

用一个简单的例子测试 RabbitMQ 和 Puka


要测试消息代理和 puka 是否完美工作,并掌握发送和接收消息在实践中的工作方式,请创建一个名为 rabbit_test.py 的示例 Python 脚本

vim rabbit_test.py

并粘贴脚本内容:

import puka

# declare send and receive clients, both connecting to the same server on local machine
producer = puka.Client("amqp://localhost/")
consumer = puka.Client("amqp://localhost/")

# connect sending party
send_promise = producer.connect()
producer.wait(send_promise)

# connect receiving party
receive_promise = consumer.connect()
consumer.wait(receive_promise)

# declare queue (queue must exist before it is being used - otherwise messages sent to that queue will be discarded)
send_promise = producer.queue_declare(queue='rabbit')
producer.wait(send_promise)

# send message to the queue named rabbit
send_promise = producer.basic_publish(exchange='', routing_key='rabbit', body='Droplet test!')
producer.wait(send_promise)

print "Message sent!"

# start waiting for messages, also those sent before (!), on the queue named rabbit
receive_promise = consumer.basic_consume(queue='rabbit', no_ack=True)

print "Starting receiving!"

while True:
    received_message = consumer.wait(receive_promise)
    print "GOT: %r" % (received_message['body'],)
    break

:wq 保存文件并退出。

运行脚本应该打印脚本发送到 RabbitMQ 队列的消息,因为测试程序随后会立即收到消息。 输出应如下所示:

root@rabbitmq:~# python rabbit_test.py
Message sent!
Starting receiving!
GOT: 'Droplet test!'
root@rabbitmq:~#

为了解释这段代码中发生了什么,让我们一步一步来:

  1. 消费者和生产者都被创建并连接到同一个 RabbitMQ 服务器,驻留在 localhost
  2. Producer 声明一个队列,以确保在生成消息时它存在。 如果没有这一步,队列可能不存在,因此消息可能会立即被丢弃。
  3. 生产者将消息发送到 nameless_exchange(稍后会详细介绍交换),并带有指定预先创建的队列的路由键。 之后,消息到达交换器,交换器又将其放入“兔子”队列。 然后该消息就在那里,直到有人消费它。
  4. 消费者访问“兔子”队列并开始接收存储在那里的消息。 因为有一条消息在等待,它会立即送达。 它已被消耗,这意味着它将不再留在队列中。
  5. 消费的消息被打印在屏幕上。

扇出交换


在前面的示例中,使用了一个无名交换器将消息传递到名为“rabbit”的特定队列。 无名交换需要一个队列名称才能工作,这意味着它只能将消息传递到单个队列。

RabbitMQ 中还有其他类型的交换,其中之一是我们在本文中主要关注的 fanout。 扇出交换是一个简单的、盲目的工具,可以将消息传递到它知道的 ALL 队列。 有了扇出交换,就不需要(事实上——不可能)提供特定的队列名称。 到达这种交换的消息在生成消息之前被传递到绑定到交换的所有队列。 可以连接到交换的队列数量没有限制。

发布/订阅模式


通过扇出交换,我们可以轻松创建 发布/订阅 模式,就像对所有新闻通讯一样开放。 Producer 是一个时事通讯广播公司,它会定期向它甚至可能不知道的观众发送消息(产生消息并将其发送到时事通讯扇出交换)。 新订阅者申请时事通讯(将自己的队列绑定到同一个时事通讯扇出)。 从那一刻起,时事通讯扇出交换将把消息传递给所有注册的订阅者(队列)。

虽然一对一的消息传递非常简单,并且开发人员经常使用其他通信方式,但一对多(其中“many”未指定,可以是 few 和 lots[X202X ]) 是一个非常流行的场景,其中消息代理可以提供巨大帮助。

编写生产者应用程序


生产者应用程序的唯一作用是创建一个命名的扇出交换并向该交换生成定期消息(每隔几秒)。 在现实生活中,消息的产生是有原因的。 为了简化示例,将自动生成消息。 此应用程序将充当时事通讯发布者。

创建一个名为 newsletter_produce.py 的 python 脚本

vim newsletter_produce.py

并粘贴脚本内容:

import puka
import datetime
import time

# declare and connect a producer
producer = puka.Client("amqp://localhost/")
connect_promise = producer.connect()
producer.wait(connect_promise)

# create a fanout exchange
exchange_promise = producer.exchange_declare(exchange='newsletter', type='fanout')
producer.wait(exchange_promise)

# send current time in a loop
while True:
    message = "%s" % datetime.datetime.now()

    message_promise = producer.basic_publish(exchange='newsletter', routing_key='', body=message)
    producer.wait(message_promise)

    print "SENT: %s" % message
    
    time.sleep(1)
    
producer.close()

让我们通过示例一步一步来解释代码中发生了什么。

  1. 创建生产者客户端并连接到本地 RabbitMQ 实例。 从现在开始,它可以与 RabbitMQ 自由通信了。
  2. 创建了一个名为 newsletter 的扇出交换。 在该步骤之后,交换存在于 RabbitMQ 服务器上,可用于将队列绑定到它并通过它发送消息。
  3. 在一个无限循环中,具有当前时间的消息被生成到 newsletter 交换。 请注意,routing_key 为空,这意味着没有指定特定的队列。 交换会将消息传递到适当的队列。

该应用程序在运行时会将当前时间通知给所有时事通讯订阅者。

编写消费者应用程序


消费者应用程序将创建一个临时队列并将其绑定到一个命名的扇出交换。 之后,它将开始等待消息。 将队列绑定到交换器后,之前创建的生产者发送的每条消息都将被此消费者接收。 该应用程序将充当时事通讯订阅者——可以一次运行该应用程序多次,但所有实例仍将接收广播消息。

创建一个名为 newsletter_consume.py 的 python 脚本

vim newsletter_consume.py

并粘贴脚本内容:

import puka

# declare and connect a consumer
consumer = puka.Client("amqp://localhost/")
connect_promise = consumer.connect()
consumer.wait(connect_promise)

# create temporary queue
queue_promise = consumer.queue_declare(exclusive=True)
queue = consumer.wait(queue_promise)['queue']

# bind the queue to newsletter exchange
bind_promise = consumer.queue_bind(exchange='newsletter', queue=queue)
consumer.wait(bind_promise)

# start waiting for messages on the queue created beforehand and print them out
message_promise = consumer.basic_consume(queue=queue, no_ack=True)

while True:
    message = consumer.wait(message_promise)
    print "GOT: %r" % message['body']
    
consumer.close()

消费者的代码比生产者的要复杂一些。 让我们一步一步来看看:

  1. 消费者客户端被创建并连接到本地 RabbitMQ 实例。
  2. 创建一个临时队列。 临时意味着没有提供名称 - 队列名称将由 RabbitMQ 自动生成 。 此外,在客户端断开连接后,此类队列将被销毁。 这是一种常见的创建队列的方法,队列的存在仅绑定到其中一个交易所,没有其他特殊用途。 由于需要创建一个队列来接收任何东西,这是一种避免考虑队列名称的便捷方法。
  3. 创建的队列绑定到 newsletter 交换。 从那一刻起,扇出交换会将每条消息传递到该队列。
  4. 在一个无限循环中,消费者在队列中等待,接收到达队列的每条消息并将其打印在屏幕上。

应用程序在运行时接收来自新闻通讯发布者的时间通知。 它可以一次执行多次,并且该应用程序的每个实例都将获取当前时间。

测试两个应用程序


要测试时事通讯发布者及其消费者,请打开到虚拟服务器的多个 SSH 会话(或打开多个终端窗口,如果在本地计算机上工作)。 在其中一个窗口中运行生产者应用程序。

root@rabbitmq:~# python newsletter_produce.py

它将开始每秒显示当前时间:

SENT: 2014-02-11 17:24:47.309000
SENT: 2014-02-11 17:24:48.310000
SENT: 2014-02-11 17:24:49.312000
SENT: 2014-02-11 17:24:50.316000
...

在每个其他窗口中运行消费者应用程序:

root@rabbitmq:~# python newsletter_consume.py

此应用程序的每个实例都将收到生产者广播的时间通知:

GOT: 2014-02-11 17:24:47.309000
GOT: 2014-02-11 17:24:48.310000
GOT: 2014-02-11 17:24:49.312000
GOT: 2014-02-11 17:24:50.316000
...

这意味着 RabbitMQ 正确注册了 fanout 交换,将订阅者队列绑定到此交换,并将发送的消息传递到正确的队列。 换句话说,RabbitMQ 按预期工作。

延伸阅读


发布/订阅是一种简单的(无论是在概念上还是在实现上)消息传递模式,通常可以派上用场; 不过,它远不及 RabbitMQ 的限制。 有无数种方法可以使用 RabbitMQ 解决消息传递问题,包括高级消息路由、消息确认、安全性或持久性。

本文的主要目的是通过简单的示例介绍基本的消息传递概念。 官方 RabbitMQ 文档 详细介绍了许多其他用途,这是 RabbitMQ 用户和管理员的绝佳资源。

文章提交:http: [[“%3Ca|//maticomp.net]] [[“%3C/a|”>马特乌斯·帕皮尔尼克]]