基于MQTT的封装库,实现点对点消息通讯:
Req-Rep
的请求-响应模式;Pub-Sub
的发布-订阅模式
NextMQTT在Topic设计上,已经区分不同服务之间和不同客户端之间的消息路由。基于MQTT的Topic分层设计,NextMQTT设计的Topic各个分层类目的规则说明如下:
/${string:domain}/${string:target-node-id}/${string:pattern-type}/${long:request-id}/${string:sender-node-id}/${string:tag}
domain
String类型值,消息目标服务名称。target-node-id
String类型值,消息目标的客户端节点ID。pattern-type
String类型值,模式类型。目前有以下三个模式值:
requests
请求消息Request的模式值;replies
响应消息Reply的模式值;pubsub
通知类消息PubSub的模式值;request-id
Long类型值,在Request和Reply模式中,request-id是一个基于时间变化的Long类型ID。在PubSub模式中为固定值,默认为0。sender-node-id
String类型值,发送消息来源的终端客户端节点ID;tag
String类型值,用于扩展:可以实现基于Topic实现一级消息路由,无须在消息payload层次增加路由标识。NextMQTT在Topic设计上区分每个终端的域,由domain
和node-id
组成。
Topic规则上的/${domain}/${node-id}
路径是发送消息给指定终端的重要规则。由此,NextMQTT内置实现以下二种模式。
Req-Rep模式
类似HTTP协议,由一个终端作为客户端,另一终端作为服务端,实现请求-响应
模式。
在Topic规则上,使用${string:pattern-type}
段来区分Request和Reply消息。
客户端向服务端发送消息,是通过作为客户端一方,向指定
node-id
的服务端发送request消息。 而作为服务端一方,监听发送给自己的request消息。
第一步:服务端监听Request消息
服务端监听发送给自己的request消息。对应地,终端订阅消息Topic为:
/${domain}/${
SERVER
.node-id}/requests
/#
第二步:客户端发送Request消息
客户端向服务端发送Request消息,需要知道服务端的NodeId。其发送Request消息的Topic为:
/${domain}/${
SERVER
.node-id}/requests
/${request-id}/${CLIENT
.node-id}/${tag}
客户端监听发送给自己的replies消息。 服务端返回响应时,根据收到的Request消息,生成Reply消息,发送消息给客户端。
第一步:客户端监听Reply消息
接收响应的客户端,订阅消息Topic为:
/${domain}/${
THIS
.node-id}/replies
/#
第二步:服务端发送Reply消息
服务端发送Reply消息的Topic为:
/${domain}/${
CLIENT
.node-id}/replies
/${request-id}/${THIS
.node-id}/${tag}
演示代码:
final MQTTSocket server = MQTTSocket.context()
.domain("next-mqtt")
.nodeId("SERVER")
.address("tcp://iot.eclipse.org:1883")
.socket();
final MQTTSocket client = MQTTSocket.context()
.domain("next-mqtt")
.nodeId("CLIENT")
.address("tcp://iot.eclipse.org:1883")
.socket();
server.connect();
client.connect();
// 订阅发送给本服务端终端的Request消息,其订阅Topic为 "/next-mqtt/SERVER/requests/#"
final long reqrepId = server.addRequestMessageHandler(new MessageHandler() {
@Override
public void onMessage(MQTTSocket socket, Message request) {
// 返回响应消息给客户端终端。发送消息的主题为 "/next-mqtt/CLIENT/replies/${req-id}/SERVER" 的消息
server.send(server.newReplyMessageOf(request, request.payload));
}
});
// 发送给服务端,并同步接收响应消息
System.out.println("客户端收到同步响应消息: " +
client.sendCall(client.newRequestMessageFor("SERVER", "ECHO-SYNC".getBytes()).builder()
.tag("ping-tag")
.build()).execute());
// 发送给服务端,并异步接收响应消息
final Latched<Message> reply = new Latched<>();
client.sendCall(client.newRequestMessageFor("SERVER", "ECHO-ASYNC".getBytes())).enqueue(new MessageCallback() {
@Override
public void onError(Exception error) {
error.printStackTrace();
reply.set(null);
}
@Override
public void onMessage(MQTTSocket socket, Message message) {
reply.set(message);
}
});
System.out.println("客户端收到异步响应消息: " + reply.get());
server.removeRequestMessageHandler(reqrepId);
client.disconnect();
server.disconnect();
Pub-Sub模式为事件广播模式,由客户端向服务端终端发布事件广播消息,服务端订阅消息并不作响应。
服务端终端订阅消息的Topic为:
/${domain}/${
SERVER
.node-id}/pubsub
/#
作为发送事件消息的客户端,其发送事件消息的Topic为:
/${domain}/${
SERVER
.node-id}/pubsub
/0/${CLIENT
.node-id}/${tag}
需要说明的是,Pub-Sub消息的request-id是固定值,默认为为0
。
在Pub-Sub模式中,客户端只发送事件消息,不等待响应;服务端只处理事件消息,不作响应。
演示代码:
final MQTTSocket server = MQTTSocket.context()
.domain("next-mqtt")
.nodeId("SERVER")
.address("tcp://iot.eclipse.org:1883")
.socket();
final MQTTSocket client = MQTTSocket.context()
.domain("next-mqtt")
.nodeId("CLIENT")
.address("tcp://iot.eclipse.org:1883")
.socket();
server.connect();
client.connect();
// 订阅指定发送给本服务端终端的PubSub类消息
final long pubsubId = server.addSubMessageHandler(new MessageHandler() {
@Override
public void onMessage(MQTTSocket socket, Message receivedMessage) {
System.out.println("服务端收到Pub消息:" + receivedMessage);
}
});
// 客户端终端向指定NodeId的服务端发送Pub消息可以实现两个终端之间的点对点广播通讯。
client.send(client.newPubMessageFor(NODE_SERVER, "YOOJIA".getBytes()).builder()
.tag("fun-tag") // 扩展用:可以基于Topic的tag来实现消息路由
.build());
client.send(client.newPubMessageFor(NODE_SERVER, "CHEN".getBytes()));
server.removeSubMessageHandler(pubsubId);
client.disconnect();
server.disconnect();
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。
1. 开源生态
2. 协作、人、软件
3. 评估模型