1 Star 0 Fork 0

le1024 / le1024

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
18_activemq可持久化机制-JDBC代码实现.md 9.62 KB
一键复制 编辑 原始数据 按行查看 历史
le1024 提交于 2022-04-28 10:27 . m

编码测试

一定要开启持久化!!!

messageProducer.setDeliverMode(DeliveryMode.PERSISTENT);

队列

生产者

public class JmsProduceJDBC {
    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";

    public static final String USERNAME = "admin";

    public static final String PASSWORD = "hll123";

    public static final String QUEUE_NAME = "jdbc01";

    public static void main(String[] args) throws Exception {
        //1.按照给定的url创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,ACTIVEMQ_URL);
        // 2.通过工厂连接connection 和启动
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3.启动
        connection.start();
        // 4.创建会话session
        //两个参数,第一个事务,第二个签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5.创建目的地,队列、主题,这里用队列
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6.创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);

        /**
         * 持久化必须设置,一定要开启!!!
         */
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

        // 7.通过MessageProducer生产3条消息发送到消息队列中
        for (int i = 1; i <= 6; i++) {
            //8.创建消息
            TextMessage textMessage = session.createTextMessage("msg:" + LocalDateTime.now());
            //9.发送消息
            messageProducer.send(textMessage);
        }

        // 10.关闭资源
        messageProducer.close();
        session.close();
        connection.close();

        System.out.println(" **** 消息发送到MQ完成 **** ");
    }
}

生产6条消息:

image-20210818103613062

在数据库ACTIVEMQ_MSGS表中,会生成6条数据,就是上一步生产的消息

image-20210818103737289

消费者

public class JmsConsumerJDBC {
    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";

    public static final String USERNAME = "admin";

    public static final String PASSWORD = "hll123";

    public static final String QUEUE_NAME = "jdbc01";

    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
        // 创建连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        //开启连接
        connection.start();
        //创建会话session
        Session session = connection.createSession(false,  Session.AUTO_ACKNOWLEDGE);
        //创建队列,同生产者一致
        Queue queue = session.createQueue(QUEUE_NAME);
        //创建消息消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);

        /**
         * 方法2:通过监听器的方式
         */
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (null != message && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("**** 消费者接收到消息 ****:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.in.read(); // 必须加行代码,不然程序会直接往下执行结束了
        messageConsumer.close();
        session.close();
        connection.close();
        System.out.println("**** 消费者消费消息完成 ****");


    }
}

启动消费者,会消息掉已生产的消息,mq控制台和数据库数据都会消费

image-20210818104137767

image-20210818104159631

队列消费总结:

  • 当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中
  • 当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中

队列中的消息一旦被consumer消费就从Broker中删除

主题

一定是先启动消费者订阅主题

消费者

public class JmsConsumerTopicJDBC {

    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";

    public static final String USERNAME = "admin";

    public static final String PASSWORD = "hll123";

    public static final String TOPIC_NAME = "topic-jdbc";

    public static void main(String[] args) throws Exception {
        /**
         * 持久化主题消息订阅,类似于微信公众号订阅
         * 需要先启动消费者,订阅上主题之后,后续生产主题消息,消费者(订阅者)就会接收到消息
         * 消费者(订阅者)订阅主题之后,不管是在线还是离线状态,只要保持正常订阅状态,期间生产的消息都会接收到。离线的会在再次在线后接收到之前的消息
         */
        System.out.println("jdbc-1"); //模拟订阅用户
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.setClientID("jdbc-1"); // 设置clientId,表明订阅者

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "jdbc-1");
        connection.start();

        Message message = topicSubscriber.receive();
        while (null != message) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("收到的持久化topic消息:" + textMessage.getText());
            message = topicSubscriber.receive();
         }

        session.close();
        connection.close();
    }
}

启动消费者:

image-20210818122452450 image-20210818122440212

查看数据库,ACTIVEMQ_ACKS表中新增一条记录,为当前订阅者的信息

image-20210818122600951

生产者

public class JmsProduceTopicJDBC {
    public static final String ACTIVEMQ_URL = "tcp://localhost61616";

    public static final String USERNAME = "admin";

    public static final String PASSWORD = "hll123";

    public static final String TOPIC_NAME = "topic-jdbc";


    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer messageProducer = session.createProducer(topic);
        // connection启动之前必须设置持久化主题
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

        connection.start();

        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("jdbc-msg:" + i);
            messageProducer.send(textMessage);
        }

        messageProducer.close();
        session.close();
        connection.close();
        System.out.println(" **** 持久化消息发送到MQ完成 **** ");
    }
}

启动生产者:

image-20210818122800439

image-20210818122820240

查看数据库:ACTIVEMQ_MSGS会新增消费的数据,ACTIVEMQ_ACKSLAST_ACKED_ID会更新为最后消费消息的ID

ACTIVEMQ_MSGS里的topic消息在消费后是不会删除的,而queue在消费后自动删除

image-20210818122946218

image-20210818123130574

小总结

  • queue

    生产的消息在没有消费的情况下,消息会存在activemq_msgs表中,只要任意一个消费者消费这些消息后,这些消息就会立即删除

  • topic

    一般是先启动消费者订阅之后,再通过生产者生产消息,之后消息也会存在activemq_msgs表中,activemq_acks表存的是消费者订阅信息

  • 开发注意事项

    1.mysql驱动包(或者其他数据库)和对应的数据库连接池jar包需要放到activemq目录下的lib中

    2.初次配置完成,数据库生成表之后,activemq.xml中配置createTablesOnStartup=false

    3.BeanFactory not initialized or already closed异常

    ​ 将操作系统的机器名带有的"_"符号去掉,重启操作系统

1
https://gitee.com/le1024/le1024.git
git@gitee.com:le1024/le1024.git
le1024
le1024
le1024
master

搜索帮助