时间:2024-12-30 12:00
人气:
作者:admin
在一些复杂的项目中,往往会由不同功能的程序组成,且在程序运行期间,各个程序还需要进行互相通信,实现进程间通信的方式有很多种,最常用的就是通过消息中间件,比如RabbitMQ,Kafka,以及ZeroMQ等,而RabbitMQ和Kafka这两款中间件往往都需要独立安装步骤才能使用,ZeroMQ却不需要独立安装部署,而是作为动态库直接在程序中引用即可。今天以一个简单的小例子,简述ZeroMQ的常见用法,仅供学习分享使用,如有不足之处,还请指正。

ZeroMQ (又被称为 ØMQ, 0MQ, or zmq),虽然看起来像是可嵌入的网络组件,实际上却是一款并发框架。ZeroMQ作为一款开源通用消息组件,通过Socket可以将原子消息通过不同协议(进程内,进程间,TCP和广播等)进行传输。基于ZeroMQ,可以由多种模式进行选择,如:fan-out,发布-订阅,任务分发,请求-应答等,并且支持1-N,N-N等多端通信。对于C#开发人员,ZeroMQ有两种方式可供选择,1. NetMQ,提供一个端口给C#;2. clrzmq4通过C#绑定到libzmq。而NetMQ正是ZeroMQ推荐的使用方式

ZeroMQ提供了多种通信模式,主要有以下几种:
本文主要讲解请求应答模式和发布订阅模式,其他通信模式,如果感兴趣可以参考官方文档。
请求应答(Request-Response),此模式是ZeroMQ所有通信方式中最简单的一种模式,当客户端发出请求时,期望得到应答,且必须得到应答,才算一个完整的通信。请求应答模式是同步阻塞模式,如果发送消息顺序错误,会抛出异常。正确的请求应答顺序如下:
请求应答模式,主要由RequestSocket和ResponseSocket组成,实现消息的请求和应答。
请求端发送消息之前,需要先进行连接Connect,然后才能发送消息。示例代码如下所示:
public class ZeroMQRequest:IDisposable
{
public Action<string> Received;
public Action<string> Sended;
private string url = string.Empty;
private RequestSocket request;
public ZeroMQRequest(string url)
{
this.request = new RequestSocket();
this.url = url;
}
public void Connect()
{
request.Connect(this.url);
}
public void BeginReceive()
{
string msg = this.request.ReceiveFrameString();
Received?.Invoke(msg);
}
public void SendMsg(string msg)
{
this.request.SendFrame(msg);
if (Sended != null)
{
Sended.Invoke(msg);
}
}
public void Disconnect()
{
request.Disconnect(this.url);
}
public void Dispose()
{
request.Close();
request.Dispose();
}
}
响应端接收消息之前,需要先进行绑定(Bind)到对应的网络端口,然后才能接收消息。示例代码如下所示:
public class ZeroMQResponse:IDisposable
{
public Action<string> Received;
public Action<string> Sended;
private string url = string.Empty;
private ResponseSocket response;
public ZeroMQResponse(string url)
{
this.url = url;
this.response = new ResponseSocket();
this.response.Bind(this.url);
}
public void BeginReceive()
{
Task.Run(() =>
{
while (true)
{
string msg = this.response.ReceiveFrameString();
Received?.Invoke(msg);
//收到回复
Send("Ok");
}
});
}
public void Send(string msg)
{
this.response.SendFrame(msg);
if (Sended != null)
{
Sended.Invoke(msg);
}
}
public void Dispose()
{
this.response.Dispose();
}
}
上述代码是将ReuqestSocket和ResponseSocket进行封装,并通过委托Action公开了接收和发送后响应接口,在使用时进行调用即可。
请求应答模式示例截图如下所示:

由于请求应答模式是阻塞模式,如果没有发送就调用接收方法,或连续调用接收方法,或连续发送(发送没有响应就再次发送),则会抛出异常。
连续两次发送,异常信息如下所示:

连续两次接收,异常信息如下所示:

发布订阅模式,将要发送的信息按照主题(Topic)进行分类,哪个接收端订阅了这个主题,就接收对应的消息,而不是直接发发送给接收者,这样有助于对消息进行分类处理。所以发布订阅模式并非阻塞模式,也不是一对一的请求响应,而是按需分类,异步响应模式。此模式主要有PublisherSocket和SubscriberSocket两个类,分别用于处理消息的发布和订阅。正确的发布订阅顺序,如下所示:
消息发布类(PublisherSocket),在消息发送之前,首先绑定一个端口,然后才能发送主题和消息,示例代码如下所示:
public class ZeroMQPublisher : IDisposable
{
private string url=string.Empty;
private PublisherSocket publisher;
public Action<string> Sended;
public ZeroMQPublisher(string url)
{
this.url = url;
this.publisher = new PublisherSocket();
this.publisher.Bind(url);
}
public void Send(string topic,string msg)
{
this.publisher.SendMoreFrame(topic);
this.publisher.SendFrame(msg);
if(Sended != null)
{
Sended.Invoke($"send msg,topic:{topic},msg:{msg},time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
}
}
public void Dispose()
{
this.publisher.Close();
this.publisher.Dispose();
}
}
消息订阅类(SubscriberSocket),在消息接收之前,首先连接端口,订阅主题(Subscribe方法),然后才能进行消息的接收,示例代码如下所示:
public class ZeroMQSubscriber : IDisposable
{
private string url=string.Empty;
private SubscriberSocket subscriber;
public Action<string> Received;
private bool isRunning = false;
public ZeroMQSubscriber(string url)
{
this.url = url;
this.subscriber = new SubscriberSocket();
this.subscriber.Connect(url);
this.subscriber.Subscribe(string.Empty);
this.isRunning = true;
}
public void BeginReceive()
{
Task.Run(() =>
{
while(isRunning)
{
var topic = this.subscriber.ReceiveFrameString();
var msg = this.subscriber.ReceiveFrameString();
if(Received != null)
{
Received.Invoke($"received msg,topic:{topic},msg:{msg},time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
}
}
});
}
public void DisConnect()
{
isRunning = false;
this.subscriber.Disconnect(this.url);
}
public void Dispose()
{
this.isRunning=false;
this.subscriber.Close();
this.subscriber?.Dispose();
}
}
注意,发布订阅模式是单向触发的,即消息发布者,不可以接收消息;消息接收者,也不可以发布消息。接收端在调用Subscribe方法时,如果主题为空,则表示可以订阅任何主题。
发布订阅模式示例截图如下所示:

关注老码识途公众号,回复关键字ZeroMQ,即可获取示例源码,如下图所示:

以上就是《进程间通信组件库ZeroMQ详解》的全部内容。
作者:老码识途
出处:http://www.cnblogs.com/hsiang/
本文版权归作者和博客园共有,写文不易,支持原创,欢迎转载【点赞】,转载请保留此段声明,且在文章页面明显位置给出原文连接,谢谢。
关注个人公众号,定时同步更新技术及职场文章