22 Star 40 Fork 9

bits-chen / NextMQTT

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

Next-MQTT

基于MQTT的封装库,实现点对点消息通讯:

  • Req-Rep请求-响应模式;
  • Pub-Sub发布-订阅模式

一、Topic规则

NextMQTT在Topic设计上,已经区分不同服务之间和不同客户端之间的消息路由。基于MQTT的Topic分层设计,NextMQTT设计的Topic各个分层类目的规则说明如下:

/${string:domain}/${string:target-node-id}/${string:pattern-type}/${long:request-id}/${string:sender-node-id}/${string:tag}

  • domain String类型值,消息目标服务名称。
  • target-node-id String类型值,消息目标的客户端节点ID。
  • pattern-type String类型值,模式类型。目前有以下三个模式值:
    • requests 请求消息Request的模式值;
    • replies 响应消息Reply的模式值;
    • pubsub 通知类消息PubSub的模式值;
  • request-id Long类型值,在Request和Reply模式中,request-id是一个基于时间变化的Long类型ID。在PubSub模式中为固定值,默认为0。
  • sender-node-id String类型值,发送消息来源的终端客户端节点ID;
  • tag String类型值,用于扩展:可以实现基于Topic实现一级消息路由,无须在消息payload层次增加路由标识。

二、消息模式

NextMQTT在Topic设计上区分每个终端的域,由domainnode-id组成。

Topic规则上的/${domain}/${node-id}路径是发送消息给指定终端的重要规则。由此,NextMQTT内置实现以下二种模式。

2.1 Req-Rep 点对点通讯模式

Req-Rep模式类似HTTP协议,由一个终端作为客户端,另一终端作为服务端,实现请求-响应模式。 在Topic规则上,使用${string:pattern-type}段来区分Request和Reply消息。

客户端发送消息过程

客户端向服务端发送消息,是通过作为客户端一方,向指定node-id的服务端发送request消息。 而作为服务端一方,监听发送给自己的request消息。

第一步:服务端监听Request消息

服务端监听发送给自己的request消息。对应地,终端订阅消息Topic为:

/${domain}/${SERVER.node-id}/requests/#

第二步:客户端发送Request消息

客户端向服务端发送Request消息,需要知道服务端的NodeId。其发送Request消息的Topic为:

/${domain}/${SERVER.node-id}/requests/${request-id}/${CLIENT.node-id}/${tag}

服务端响应消息过程

客户端监听发送给自己的replies消息。 服务端返回响应时,根据收到的Request消息,生成Reply消息,发送消息给客户端。

第一步:客户端监听Reply消息

接收响应的客户端,订阅消息Topic为:

/${domain}/${THIS.node-id}/replies/#

第二步:服务端发送Reply消息

服务端发送Reply消息的Topic为:

/${domain}/${CLIENT.node-id}/replies/${request-id}/${THIS.node-id}/${tag}

演示代码:

final MQTTSocket server = MQTTSocket.context()
            .domain("next-mqtt")
            .nodeId("SERVER")
            .address("tcp://iot.eclipse.org:1883")
            .socket();

final MQTTSocket client = MQTTSocket.context()
            .domain("next-mqtt")
            .nodeId("CLIENT")
            .address("tcp://iot.eclipse.org:1883")
            .socket();

server.connect();
client.connect();

// 订阅发送给本服务端终端的Request消息,其订阅Topic为 "/next-mqtt/SERVER/requests/#"
final long reqrepId = server.addRequestMessageHandler(new MessageHandler() {
    @Override
    public void onMessage(MQTTSocket socket, Message request) {
        // 返回响应消息给客户端终端。发送消息的主题为 "/next-mqtt/CLIENT/replies/${req-id}/SERVER" 的消息
        server.send(server.newReplyMessageOf(request, request.payload));
    }
});

// 发送给服务端,并同步接收响应消息
System.out.println("客户端收到同步响应消息: " +
        client.sendCall(client.newRequestMessageFor("SERVER", "ECHO-SYNC".getBytes()).builder()
                .tag("ping-tag")
                .build()).execute());

// 发送给服务端,并异步接收响应消息
final Latched<Message> reply = new Latched<>();
client.sendCall(client.newRequestMessageFor("SERVER", "ECHO-ASYNC".getBytes())).enqueue(new MessageCallback() {
    @Override
    public void onError(Exception error) {
        error.printStackTrace();
        reply.set(null);
    }

    @Override
    public void onMessage(MQTTSocket socket, Message message) {
        reply.set(message);
    }
});

System.out.println("客户端收到异步响应消息: " + reply.get());

server.removeRequestMessageHandler(reqrepId);

client.disconnect();
server.disconnect();

2.2 Pub-Sub 点对点通讯模式

Pub-Sub模式为事件广播模式,由客户端向服务端终端发布事件广播消息,服务端订阅消息并不作响应。

服务端终端订阅消息的Topic为:

/${domain}/${SERVER.node-id}/pubsub/#

作为发送事件消息的客户端,其发送事件消息的Topic为:

/${domain}/${SERVER.node-id}/pubsub/0/${CLIENT.node-id}/${tag}

需要说明的是,Pub-Sub消息的request-id是固定值,默认为为0

在Pub-Sub模式中,客户端只发送事件消息,不等待响应;服务端只处理事件消息,不作响应。

演示代码:

final MQTTSocket server = MQTTSocket.context()
            .domain("next-mqtt")
            .nodeId("SERVER")
            .address("tcp://iot.eclipse.org:1883")
            .socket();

final MQTTSocket client = MQTTSocket.context()
            .domain("next-mqtt")
            .nodeId("CLIENT")
            .address("tcp://iot.eclipse.org:1883")
            .socket();

server.connect();
client.connect();

// 订阅指定发送给本服务端终端的PubSub类消息
final long pubsubId = server.addSubMessageHandler(new MessageHandler() {
    @Override
    public void onMessage(MQTTSocket socket, Message receivedMessage) {
        System.out.println("服务端收到Pub消息:" + receivedMessage);
    }
});

// 客户端终端向指定NodeId的服务端发送Pub消息可以实现两个终端之间的点对点广播通讯。
client.send(client.newPubMessageFor(NODE_SERVER, "YOOJIA".getBytes()).builder()
        .tag("fun-tag") // 扩展用:可以基于Topic的tag来实现消息路由
        .build());
client.send(client.newPubMessageFor(NODE_SERVER, "CHEN".getBytes()));
server.removeSubMessageHandler(pubsubId);

client.disconnect();
server.disconnect();

空文件

简介

基于MQTT的封装库,实现点对点消息通讯: Req-Rep的请求-响应模式; Pub-Sub的发布-订阅模式 展开 收起
Java
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/bitschen/NextMQTT.git
git@gitee.com:bitschen/NextMQTT.git
bitschen
NextMQTT
NextMQTT
master

搜索帮助