1 Star 0 Fork 0

le1024 / le1024

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
23_activemq延迟投递和定时投递.md 4.33 KB
一键复制 编辑 原始数据 按行查看 历史
le1024 提交于 2022-04-28 10:27 . m

延迟和定时消息投递

activemq.xml文件里激活schedulerSupport的属性:

Property name type description
AMQ_SCHEDULED_DELAY long 延迟消息投递时间
AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔
AMQ_SCHEDULED_REPEAT int 重复投递次数
AMQ_SCHEDULED_CRON String 使用cron表达式

配置schedulerSupport=true属性:

image-20210821213419530

代码

生产者

public class JmsProducerDelayAndSheduler {
    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";

    public static final String USERNAME = "admin";

    public static final String PASSWORD = "hll123";

    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);

        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Queue queue = session.createQueue(QUEUE_NAME);

        MessageProducer messageProducer = session.createProducer(queue);

        int delay = 5000; //延迟投递时间
        int period = 3000; //重复投递间隔时间
        int repeat = 3; //重复投递次数

        for (int i = 1; i < 3; i++) {
            TextMessage textMessage = session.createTextMessage("delay msg:" + UUID.randomUUID().toString());

            // 通过前面了解的消息属性来设置 延迟投递和定时投递
            // 注意 message设置的属性类型要对应上,Long, Int
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
            // 投递次数的用setIntProperty
            textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);

            messageProducer.send(textMessage);
        }

        messageProducer.close();
        session.close();
        connection.close();

        System.out.println(" **** 消息发送到MQ完成 **** ");
    }
}

image-20210821221303528


消费者

public class JmsConsumerDelayAndScheduler {

    // 代码同正常的消费者代码

    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";

    public static final String USERNAME = "admin";

    public static final String PASSWORD = "hll123";

    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false,  Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (null != message && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("**** 消费者接收到消息 ****:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.in.read(); // 必须加行代码,不然程序会直接往下执行结束了
        messageConsumer.close();
        session.close();
        connection.close();
        System.out.println("**** 消费者消费消息完成 ****");
    }
}

image-20210821221421295

马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/le1024/le1024.git
git@gitee.com:le1024/le1024.git
le1024
le1024
le1024
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891