如何使用ZeroMQ消息库
介绍
您可以选择多种方式来描述 ZeroMQ; 尽管如此,它仍然是它的本来面目:一个真正卓越的通信库,以其丰富而成熟的功能集极大地使开发人员受益。
在 DigitalOcean ZeroMQ 文章的第二部分中,继我们之前关于应用程序安装的文章之后,我们将深入研究它的用法并发现实际实现这个快速而强大的库的方法。 我们将通过划分为连续部分的各种示例,从进程之间的简单消息传递开始(即 使用请求/响应模式)。
注: 这篇文章是我们关于这个主题的第二篇文章。 如果您有兴趣了解更多信息(即 它是什么以及它如何与完整的消息代理进行比较),请在阅读本教程之前查看 ZeroMQ 介绍和安装方法 。
关于
零MQ
ZeroMQ 是一个用于在应用程序和进程之间实现消息传递和通信系统的库 - 快速和异步。
如果您过去曾使用过其他应用程序消息传递解决方案(例如 RabbitMQ),那么了解 ZeroMQ 的确切位置可能会有点挑战。
与提供企业消息传递的所有必要部分的一些更大的项目相比,ZeroMQ 仍然只是一个轻量级和快速的工具来制作你自己的。
本文
虽然技术上不是一个框架,但考虑到它的功能和它所解决的任务的关键位置,您可以将 ZeroMQ 视为实现应用程序实际通信层的主干。
在本文中,我们旨在为您提供一些示例,以激发您您可以做的所有事情。
注意: 在我们的示例中,我们将使用 Python 语言及其经典解释器(Python C 解释器)。 安装必要的语言绑定后,您应该能够简单地翻译代码并使用您喜欢的代码,而不会有任何问题。 如果您想了解如何在 CentOS VPS 上安装 Python,请查看我们的 如何在 CentOS 6.4 上设置 Python 2.7 教程。
使用 ZeroMQ 编程
ZeroMQ 作为一个库,通过遵循某些网络通信模式通过套接字工作。 它被设计为异步工作,这就是其名称的 MQ 后缀的来源 - 来自发送消息之前的线程排队消息。
ZeroMQ 套接字类型
ZeroMQ 的不同之处在于其套接字的工作方式。 与常规套接字工作的同步方式不同,ZeroMQ 的套接字实现“呈现了异步消息队列的抽象”。
这些套接字的工作方式取决于所选的套接字类型。 发送的消息流取决于选择的模式,其中有四种:
- 请求/回复模式: 用于发送请求并接收每个发送的后续回复。
- 发布/订阅模式: 用于从单个进程分发数据(例如 发布者)到多个接收者(例如 订户)。
- Pipeline Pattern: 用于将数据分发到连接的节点。
- Exclusive Pair Pattern: 用于将两个对等体连接在一起,形成一对。
ZeroMQ 传输类型
ZeroMQ 提供四种不同类型的通信传输。 这些都是:
- 进程内(INPROC):本地(进程内)通信传输。
- 进程间(IPC):本地(进程间)通信传输。
- TCP: 使用 TCP 的单播通信传输。
- PGM: 使用 PGM 的多播通信传输。
构建 ZeroMQ 应用程序
ZeroMQ 的工作方式与典型和传统的通信设置不同。 它可以有链接的任一侧(即 服务器或客户端)绑定并等待连接。 与标准套接字不同,ZeroMQ 的工作原理是知道连接可能会发生,因此可以很好地等待它。
客户端 - 服务器结构
为了构建您的客户端和服务器代码,最好决定并选择一个更稳定的作为 binding 端,另一个作为 connecting。
例子:
Server Application Client Application ---------------------[ < .. < .. < .. < .. ...................... Bound -> Port:8080 Connects <- Port:8080
客户端 - 代理 - 服务器结构
为了解决通信两端处于动态(因此不稳定)状态而导致的问题,ZeroMQ 提供了网络设备(即 开箱即用的餐具)。 这些设备连接到两个不同的端口并路由连接。
- Streamer: 流水线并行通信的流媒体设备。
- Forwarder: 用于发布/订阅通信的转发设备。
- 队列:请求/回复通信的转发设备。
例子:
Server App. Device | Forward Client App. ............ > .. > . ]------------------[ < .. < .. ......... Connects 2 Port Binding Connects
编程示例
利用我们在上一节中的知识,我们现在将开始利用它们来创建简单的应用程序。
注: 以下示例通常由同时运行的应用程序组成。 例如,要使客户端/服务器设置正常工作,您需要同时运行客户端和服务器应用程序。 一种方法是使用 Linux Screen 工具。 要了解更多信息,请查看此 DigitalOceanTutorial。 要在 CentOS 系统上安装 screen,请记住您可以简单地运行:yum install -y screen
。
使用请求/回复模式的简单消息传递
在应用程序之间的通信方面,请求/回复模式可能构成了绝对经典,并为我们提供了一个很好的机会从 ZeroMQ 的基础开始。
用例:
- 用于服务器和客户端之间的简单通信。
- 检查信息并请求更新。
- 向服务器发送 检查 和更新。
- Echo 或 ping/pong 实现。
使用的套接字类型:
- zmq.REP
- zmq.请求
服务器示例:server.py
使用 nano (nano server.py
) 创建一个“server.py”并粘贴以下不言自明的内容。
import zmq # ZeroMQ Context context = zmq.Context() # Define the socket using the "Context" sock = context.socket(zmq.REP) sock.bind("tcp://127.0.0.1:5678") # Run a simple "Echo" server while True: message = sock.recv() sock.send("Echo: " + message) print "Echo: " + message
完成编辑后,按 CTRL+X 然后按 Y 保存并退出。
客户端示例:client.py
使用 nano (nano client.py
) 创建一个“client.py”并粘贴以下内容。
import zmq import sys # ZeroMQ Context context = zmq.Context() # Define the socket using the "Context" sock = context.socket(zmq.REQ) sock.connect("tcp://127.0.0.1:5678") # Send a "message" using the socket sock.send(" ".join(sys.argv[1:])) print sock.recv()
完成编辑后,按 CTRL+X 然后按 Y 保存并退出。
注意: 使用 ZeroMQ 库时,请记住每个线程用于发送消息(即 .send(..)
) 期望 .recv(..)
跟随。 未能实现该对将导致异常。
用法
我们的 server.py
设置为“回声”应用程序。 无论我们选择发送给它什么,它都会将它发送回来(例如 “回声: 消息”)。
使用 Python 解释器运行服务器:
python server.py
在另一个窗口中,使用客户端应用程序发送消息:
python client.py hello world! # Echo: hello world!
注: 要关闭服务器,可以使用组合键:Ctrl+C
使用发布/订阅模式
在发布/订阅模式的情况下,ZeroMQ 用于建立一个或多个订阅者,连接到一个或多个发布者并持续接收发布者发送的内容(或 seeds)。
此模式可以选择指定前缀以仅接受以它开头的此类消息。
用例:
发布/订阅模式用于在各个消费者之间均匀分布消息。 记分牌和新闻的自动更新可被视为使用此解决方案的可能领域。
使用的套接字类型:
- zmq.PUB
- zmq.SUB
发布者示例:pub.py
使用 nano (nano pub.py
) 创建一个“pub.py”并粘贴以下内容。
import zmq import time # ZeroMQ Context context = zmq.Context() # Define the socket using the "Context" sock = context.socket(zmq.PUB) sock.bind("tcp://127.0.0.1:5680") id = 0 while True: time.sleep(1) id, now = id+1, time.ctime() # Message [prefix][message] message = "1-Update! >> #{id} >> {time}".format(id=id, time=now) sock.send(message) # Message [prefix][message] message = "2-Update! >> #{id} >> {time}".format(id=id, time=now) sock.send(message) id += 1
完成编辑后,按 CTRL+X 然后按 Y 保存并退出。
订阅者示例:sub.py
使用 nano (nano sub.py
) 创建一个“sub.py”并粘贴以下内容。
import zmq # ZeroMQ Context context = zmq.Context() # Define the socket using the "Context" sock = context.socket(zmq.SUB) # Define subscription and messages with prefix to accept. sock.setsockopt(zmq.SUBSCRIBE, "1") sock.connect("tcp://127.0.0.1:5680") while True: message= sock.recv() print message
完成编辑后,按 CTRL+X 然后按 Y 保存并退出。
注意: 使用 .setsockopt(..)
过程,我们订阅接收以 string 1
开头的消息。 要接收全部,请不要设置(即 ""
)。
用法
我们的 pub.py
设置为 publisher,同时发送两条不同的消息,用于不同的订阅者。
运行发布者发送消息:
python pub.py
在另一个窗口中,查看订阅内容的打印输出(即 1
):
python sub.py! # 1-Update! >> 1 >> Wed Dec 25 17:23:56 2013
注意: 要关闭订阅者和发布者应用程序,可以使用组合键:Ctrl+C
流水线化 Pub./Sub。 使用管道模式(推/拉)
与 Publish/Subscribe 模式的外观非常相似,第三种管道模式是针对不同类型问题的解决方案:按需分发消息。
用例:
流水线模式可用于需要路由排队项目列表的情况(即 pushed in line)用于请求它的人(即 拉的人)。
使用的套接字类型:
- zmq.PUSH
- zmq.PULL
PUSH 示例:manager.py
使用 nano (nano manager.py
) 创建一个“manager.py”并粘贴以下内容。
import zmq import time # ZeroMQ Context context = zmq.Context() # Define the socket using the "Context" sock = context.socket(zmq.PUSH) sock.bind("tcp://127.0.0.1:5690") id = 0 while True: time.sleep(1) id, now = id+1, time.ctime() # Message [id] - [message] message = "{id} - {time}".format(id=id, time=now) sock.send(message) print "Sent: {msg}".format(msg=message)
文件 manager.py
将充当 任务分配器 。
拉示例:worker_1.py
使用 nano (nano worker_1.py
) 创建一个“worker_1.py”并粘贴以下内容。
导入zmq
# ZeroMQ Context context = zmq.Context() # Define the socket using the "Context" sock = context.socket(zmq.PULL) sock.connect("tcp://127.0.0.1:5690") while True: message = sock.recv() print "Received: {msg}".format(msg=message)
文件 worker_1.py
将充当 任务进程 (消费者/工人)。
用法
我们的 manager.py
被设置为具有任务分配器的角色(即 经理),**PUSH**ing 项目。 同样,worker_1.py
设置为作为 worker 实例工作,当它通过 **PULL** 向下处理列表完成处理时。
运行发布者发送消息:
python manager.py
在另一个窗口中,查看订阅内容的打印输出(即 1
):
python worker_1.py! # 1-Update! >> 1 >> Wed Dec 25 17:23:56 2013
注意: 要关闭订阅者和发布者应用程序,可以使用组合键:Ctrl+C
独家配对模式
独占对模式意味着并允许使用 zmq/PAIR
套接字类型建立单音类型的通信通道。
绑定示例:bind.py
使用 nano (nano bind.py
) 创建一个“bind.py”并粘贴以下内容。
import zmq # ZeroMQ Context context = zmq.Context() # Define the socket using the "Context" socket = context.socket(zmq.PAIR) socket.bind("tcp://127.0.0.1:5696")
完成编辑后,按 CTRL+X 然后按 Y 保存并退出。
连接示例:connect.py
使用 nano (nano connect.py
) 创建一个“connect.py”并粘贴以下内容。
import zmq # ZeroMQ Context context = zmq.Context() # Define the socket using the "Context" socket = context.socket(zmq.PAIR) socket.connect("tcp://127.0.0.1:5696")
完成编辑后,按 CTRL+X 然后按 Y 保存并退出。
用法
您可以使用上面的示例创建任何双向单连接通信应用程序。
注: 任一关闭,都可以使用组合键:Ctrl+C