From 2351b70c44d3f20e23dd32d4eb1c8fa082ba525a Mon Sep 17 00:00:00 2001 From: wbihua <13176965+wbihua@user.noreply.gitee.com> Date: Fri, 21 Jul 2023 15:10:44 +0800 Subject: [PATCH] =?UTF-8?q?BUG=E4=BF=AE=E6=94=B9=201.=E9=97=AE=E9=A2=98?= =?UTF-8?q?=E6=8F=8F=E8=BF=B0=EF=BC=9A=20mqtt=E5=AE=A2=E6=88=B7=E7=AB=AF?= =?UTF-8?q?=E4=B8=BB=E5=8A=A8=E5=85=B3=E9=97=AD=EF=BC=8C=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E7=AB=AF=E6=B2=A1=E6=9C=89=E6=94=B6=E5=88=B0=E5=85=B3=E9=97=AD?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E6=88=96=E6=96=AD=E5=BC=80=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=EF=BC=8C=E5=AF=BC=E8=87=B4=E6=9C=8D=E5=8A=A1=E7=AB=AF=E6=97=A0?= =?UTF-8?q?=E6=B3=95=E8=8E=B7=E5=8F=96=E5=AE=A2=E6=88=B7=E7=AB=AF=E7=9A=84?= =?UTF-8?q?=E7=8A=B6=E6=80=81=202.=E8=A7=A3=E5=86=B3=E6=96=B9=E6=A1=88?= =?UTF-8?q?=EF=BC=9A=20io.github.quickmsg.core.protocol.ConnectProtoco?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=B8=A4=E5=A4=84=EF=BC=9A=201.registry=20un?= =?UTF-8?q?read=20event=20close=20channel=202.registry=20close=20mqtt=20ch?= =?UTF-8?q?annel=20event?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../quickmsg/core/protocol/ConnectProtocol.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java b/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java index 2b2ee99a..9c0d3507 100644 --- a/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java +++ b/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java @@ -1,5 +1,11 @@ package io.github.quickmsg.core.protocol; +import java.util.ArrayList; +import java.util.Date; +import java.util.Optional; + +import org.apache.commons.lang3.time.DateFormatUtils; + import io.github.quickmsg.common.auth.AuthManager; import io.github.quickmsg.common.channel.MqttChannel; import io.github.quickmsg.common.context.ReceiveContext; @@ -11,6 +17,7 @@ import io.github.quickmsg.common.log.LogEvent; import io.github.quickmsg.common.log.LogManager; import io.github.quickmsg.common.log.LogStatus; import io.github.quickmsg.common.message.mqtt.ConnectMessage; +import io.github.quickmsg.common.message.mqtt.DisConnectMessage; import io.github.quickmsg.common.metric.CounterType; import io.github.quickmsg.common.protocol.Protocol; import io.github.quickmsg.common.utils.JacksonUtil; @@ -20,13 +27,8 @@ import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttVersion; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.time.DateFormatUtils; import reactor.util.context.ContextView; -import java.util.ArrayList; -import java.util.Date; -import java.util.Optional; - /** * @author luxurong */ @@ -86,6 +88,7 @@ public class ConnectProtocol implements Protocol { } private void logHeartClose(LogManager logManager, MqttChannel mqttChannel) { + mqttChannel.close(); logManager.printInfo(mqttChannel, LogEvent.HEART_TIMEOUT, LogStatus.SUCCESS, JacksonUtil.bean2Json(mqttChannel.getConnectCache())); } @@ -101,6 +104,8 @@ public class ConnectProtocol implements Protocol { IntegrateTopics topics = mqttReceiveContext.getIntegrate().getTopics(); topics.removeTopic(mqttChannel, new ArrayList<>(mqttChannel.getTopics())); mqttReceiveContext.getRetryManager().clearRetry(mqttChannel); + DisConnectMessage disConnectMessage = new DisConnectMessage(mqttChannel); + mqttReceiveContext.getIntegrate().getProtocolAdaptor().chooseProtocol(disConnectMessage); Optional.ofNullable(mqttChannel.getConnectCache().getWill()).ifPresent(will -> Optional.ofNullable(topics.getMqttChannelsByTopic(will.getWillTopic())) .ifPresent(subscribeTopics -> subscribeTopics.forEach(subscribeTopic -> { -- Gitee