如何使用RabbitMQ和PukaPython库基于路由密钥传递消息
入门
先决条件
本文是 如何使用 RabbitMQ 和 Python 的 Puka 将消息传递给多个消费者 的延续,并且需要相同的软件捆绑并正常运行。 此外,整篇文章都使用相同的定义,我们假设读者熟悉前文中的主题。
交流
我们已经描述了 fanout
交换,它将消息传递到绑定到该交换的每个队列,而无需额外的规则。 这是一个非常有用的机制,但缺乏灵活性。 通常不希望收到生产者向交易所发出的所有内容。 RabbitMQ 提供了两种不同的交换类型,可用于实现更复杂的场景。 其中之一是 direct
交换。
直接交换
介绍
直接交换在 RabbitMQ 中提供了一种简单的、基于密钥的路由机制。 它有点类似于第一个示例中使用的无名交换,其中消息被传递到名称等于消息的路由键的队列。 然而,虽然使用无名交换不需要定义显式的队列绑定,但在直接交换中,绑定是至关重要的和强制性的。
使用直接交换时,生成到该交换的每条消息都必须指定一个路由键,这是一个任意名称字符串,例如 德克萨斯。 然后,该消息将被传递到已使用相同路由键绑定到此交换的所有队列(所有明确声明为对具有 Texas 路由键的消息感兴趣的队列)。
基本无名交换和 direct
交换之间的最大区别在于后者需要绑定,并且在此之前没有队列侦听该交换上的消息。 这反过来又带来了三个巨大的优势。
- 一个队列可以绑定在同一个交换机上监听many个不同的路由key
- 一个队列可以绑定一次监听许多不同的交换
- 许多队列可以绑定在一个交换器上监听same路由键
让我们想象一个大城市枢纽:一个火车站和汽车站,两种交通工具都可以到达许多目的地。 假设车站想要使用 RabbitMQ 发送出发通知。 任务是通知所有感兴趣的人,即将开往 Seattle、Tooele 或 Boston 的公共汽车或火车。
这样的程序将定义一个直接的 departures
交换,所有感兴趣的客户都可以订阅他们的队列。 然后,包含出发时间的消息将被生成到该交换,并使用包含目的地的路由键。 例如:
- 与路由键
Tooele
和正文2014-01-03 15:23
交换消息到departures
- 与路由键
Boston
和正文2014-01-03 15:41
交换消息到departures
- 与路由键
Seattle
和正文2014-01-03 15:55
交换消息到departures
由于一个队列可能一次绑定到多个路由键,并且许多队列可以绑定到同一个键,我们可以很容易地拥有:
- 一位仅对 Tooele 感兴趣的客户
- 一位仅对 Boston 感兴趣的客户
- 另一位同时对 Tooele 和 Boston 感兴趣的客户
所有人同时等待信息。 他们将使用我们的 direct 交换接收适当的消息。
制片人
为了稍微简化示例的任务,让我们编写一个基本的通知调度程序,它将接受一个命令行参数。 它将指定目的地,应用程序会将当前时间发送给所有感兴趣的消费者。
创建一个名为 direct_notify.py
的示例 python 脚本
vim direct_notify.py
并粘贴脚本内容:
import puka import datetime import time import sys # declare and connect a producer producer = puka.Client("amqp://localhost/") connect_promise = producer.connect() producer.wait(connect_promise) # create a direct exchange named departures exchange_promise = producer.exchange_declare(exchange='departures', type='direct') producer.wait(exchange_promise) # send current time to destination specified with command line argument message = "%s" % datetime.datetime.now() message_promise = producer.basic_publish(exchange='departures', routing_key=sys.argv[1], body=message) producer.wait(message_promise) print "Departure to %s at %s" % (sys.argv[1], message) producer.close()
按 :wq 保存文件并退出。
使用一个参数运行脚本应打印当前时间和使用的目的地。 输出应如下所示:
root@rabbitmq:~# python direct_notify.py Tooele Departure to Tooele at 2014-02-18 15:57:29.035000 root@rabbitmq:~#
让我们逐步浏览脚本:
- Producer 客户端被创建并连接到本地 RabbitMQ 实例。 从现在开始,它可以与 RabbitMQ 自由通信了。
- 创建了一个名为
departures
的直接交换。 它不需要在创建时指定路由键,因为发布到该交换的任何消息都可以分配不同的键。 在该步骤之后,交换存在于 RabbitMQ 服务器上,可用于将队列绑定到它并通过它发送消息。 - 使用命令行参数作为路由键将包含当前时间的消息发布到该交换。 在示例运行中,Tooele 用作参数,因此用作出发目的地 - 路由键。
注意: 为简单起见,脚本不会检查是否提供了强制命令行参数! 如果没有参数执行,它将无法正常工作。
消费者
此示例消费者应用程序将充当对从车站可到达的一个或多个目的地感兴趣的公共交通客户。
创建一个名为 direct_watch.py
的示例 python 脚本
vim direct_watch.py
并粘贴脚本内容:
import puka import sys # declare and connect a consumer consumer = puka.Client("amqp://localhost/") connect_promise = consumer.connect() consumer.wait(connect_promise) # create temporary queue queue_promise = consumer.queue_declare(exclusive=True) queue = consumer.wait(queue_promise)['queue'] # bind the queue to all routing keys specified by command line arguments for destination in sys.argv[1:]: print "Watching departure times for %s" % destination bind_promise = consumer.queue_bind(exchange='departures', queue=queue, routing_key=destination) consumer.wait(bind_promise) # start waiting for messages on the queue created beforehand and print them out message_promise = consumer.basic_consume(queue=queue, no_ack=True) while True: message = consumer.wait(message_promise) print "Departure for %s at %s" % (message['routing_key'], message['body']) consumer.close()
按 :wq 保存文件并退出。
使用一个参数 Tooele 运行脚本应该会宣布该脚本监视 Tooele 的出发时间,而使用多个参数运行它应该宣布监视许多目的地的出发时间。
root@rabbitmq:~# python direct_watch.py Tooele Watching departure times for Tooele (...) root@rabbitmq:~# python direct_watch.py Tooele Boston Watching departure times for Tooele Watching departure times for Boston (...) root@rabbitmq:~#
让我们逐步通过脚本来解释它的作用:
- Consumer 客户端被创建并连接到本地 RabbitMQ 实例。 从现在开始,它可以与 RabbitMQ 自由通信了。
- 为这个特定的消费者创建一个临时队列,由 RabbitMQ 自动生成名称。 脚本结束后队列将被销毁。
- 队列绑定到使用命令行参数指定的所有路由键(目的地)上的所有
departures
交换,在屏幕上打印每个目的地以获取信息。 - 脚本开始等待队列中的消息。 它将接收与绑定的路由键匹配的所有消息。 当使用 Tooele 作为单个参数运行时 - 只有那些,当同时使用 Tooele 和 Boston 运行时 - 在这两个参数上。 每个出发时间都会打印在屏幕上。
测试
要检查两个脚本是否按预期工作,请打开服务器的三个终端窗口。 一个将用作公共交通站发送通知。 另外两个将作为等待出发的客户。
在第一个终端中,使用任何参数运行一次 direct_notify.py
脚本:
root@rabbitmq:~# python direct_notify.py Tooele Departure to Tooele at 2014-02-18 15:57:29.035000 root@rabbitmq:~#
重要: direct_notify.py
脚本必须在任何消费者之前至少执行一次,因为必须在将队列绑定到它之前创建交换。 执行后,交换保留在 RabbitMQ 服务器上,可以自由使用。
在第二个终端中,使用一个参数运行 direct_watch.py
脚本 - Tooele:
root@rabbitmq:~# python direct_watch.py Tooele Watching departure times for Tooele (...) root@rabbitmq:~#
在第三个终端中,使用两个参数运行 direct_watch.py
脚本 - Tooele 和 Boston:
root@rabbitmq:~# python direct_watch.py Tooele Boston Watching departure times for Tooele Watching departure times for Boston (...) root@rabbitmq:~#
然后,回到第一个航站楼,发送三个出发通知。 一个到 Tooele,一个到 Boston 和一个到 Chicago:
root@rabbitmq:~# python direct_notify.py Tooele Departure to Tooele at 2014-02-18 15:57:29.035000 root@rabbitmq:~# python direct_notify.py Boston Departure to Tooele at 2014-02-18 15:57:31.035000 root@rabbitmq:~# python direct_notify.py Chicago Departure to Tooele at 2014-02-18 15:57:35.035000 root@rabbitmq:~#
只有等待出发前往图勒的两个消费者才能收到第一个通知。 第二个应该只发给等待离开波士顿的消费者。 这些消费者中的任何一个都不应该收到第三个,因为他们都没有等待出发前往芝加哥。
这是预期的行为。 这些简单的示例说明了如何发送只有由路由键指定的某些消费者才能接收的消息。
进一步阅读
直接路由不能完全控制消息的传递位置,但与以前的交换中使用的 fanout
交换相比,它是一个很大的进步,它在任何地方盲目地传递消息。 使用 direct
交换可以处理许多现实世界的消息传递场景,并且该过程并不是非常困难。
本文的主要目标是使用简单的真实世界情况介绍基本的直接路由。 官方 RabbitMQ 文档 详细介绍了许多其他用途,这是 RabbitMQ 用户和管理员的绝佳资源。