1 Star 0 Fork 0

le1024 / le1024

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
4_消息持久化和非持久化.md 2.72 KB
一键复制 编辑 原始数据 按行查看 历史
le1024 提交于 2021-09-30 17:34 . redis

消息的可靠性

分为持久化和非持久化

通过setDeliveryMode设置

messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //非持久化:当服务器宕机,重启后消息会丢失

messageProducer.setDeliverMode(DeliveryMode.PERSISTENT); //持久化:当服务器宕机,重启后消息依然存在

持久queue

配置setDeliveryMode

持久topic

消费者

/**
         * 持久化主题消息订阅,类似于微信公众号订阅
         * 需要先启动消费者,订阅上主题之后,后续生产主题消息,消费者(订阅者)就会接收到消息
         * 消费者(订阅者)订阅主题之后,不管是在线还是离线状态,只要保持正常订阅状态,期间生产的消息都会接收到。离线的会在再次在线后接收到之前的消息
         */
        System.out.println("consumer-1"); //模拟订阅用户
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.setClientID("consumer-1"); // 设置clientId,表明订阅者

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "订阅用户:consumer-1");
        connection.start();

        Message message = topicSubscriber.receive();
        while (null != message) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("收到的持久化topic消息:" + textMessage.getText());
            message = topicSubscriber.receive();
         }

        session.close();
        connection.close();

生产者

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer messageProducer = session.createProducer(topic);
        // connection启动之前必须设置持久化主题
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

        connection.start();

        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("persist-msg:" + i);
            messageProducer.send(textMessage);
        }

        messageProducer.close();
        session.close();
        connection.close();
        System.out.println(" **** 持久化消息发送到MQ完成 **** ");
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/le1024/le1024.git
git@gitee.com:le1024/le1024.git
le1024
le1024
le1024
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891