代码拉取完成,页面将自动刷新
延迟和定时消息投递
在activemq.xml
文件里激活schedulerSupport
的属性:
Property name | type | description |
---|---|---|
AMQ_SCHEDULED_DELAY | long | 延迟消息投递时间 |
AMQ_SCHEDULED_PERIOD | long | 重复投递的时间间隔 |
AMQ_SCHEDULED_REPEAT | int | 重复投递次数 |
AMQ_SCHEDULED_CRON | String | 使用cron表达式 |
配置schedulerSupport=true
属性:
代码
生产者
public class JmsProducerDelayAndSheduler {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer messageProducer = session.createProducer(queue);
int delay = 5000; //延迟投递时间
int period = 3000; //重复投递间隔时间
int repeat = 3; //重复投递次数
for (int i = 1; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("delay msg:" + UUID.randomUUID().toString());
// 通过前面了解的消息属性来设置 延迟投递和定时投递
// 注意 message设置的属性类型要对应上,Long, Int
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
// 投递次数的用setIntProperty
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** 消息发送到MQ完成 **** ");
}
}
消费者
public class JmsConsumerDelayAndScheduler {
// 代码同正常的消费者代码
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("**** 消费者接收到消息 ****:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read(); // 必须加行代码,不然程序会直接往下执行结束了
messageConsumer.close();
session.close();
connection.close();
System.out.println("**** 消费者消费消息完成 ****");
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。