编码测试
一定要开启持久化!!!
messageProducer.setDeliverMode(DeliveryMode.PERSISTENT);
生产者
public class JmsProduceJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws Exception {
//1.按照给定的url创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,ACTIVEMQ_URL);
// 2.通过工厂连接connection 和启动
Connection connection = activeMQConnectionFactory.createConnection();
// 3.启动
connection.start();
// 4.创建会话session
//两个参数,第一个事务,第二个签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建目的地,队列、主题,这里用队列
Queue queue = session.createQueue(QUEUE_NAME);
// 6.创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
/**
* 持久化必须设置,一定要开启!!!
*/
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 7.通过MessageProducer生产3条消息发送到消息队列中
for (int i = 1; i <= 6; i++) {
//8.创建消息
TextMessage textMessage = session.createTextMessage("msg:" + LocalDateTime.now());
//9.发送消息
messageProducer.send(textMessage);
}
// 10.关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** 消息发送到MQ完成 **** ");
}
}
生产6条消息:
在数据库ACTIVEMQ_MSGS
表中,会生成6条数据,就是上一步生产的消息
消费者
public class JmsConsumerJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws Exception {
//创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
// 创建连接connection
Connection connection = activeMQConnectionFactory.createConnection();
//开启连接
connection.start();
//创建会话session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建队列,同生产者一致
Queue queue = session.createQueue(QUEUE_NAME);
//创建消息消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
/**
* 方法2:通过监听器的方式
*/
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("**** 消费者接收到消息 ****:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read(); // 必须加行代码,不然程序会直接往下执行结束了
messageConsumer.close();
session.close();
connection.close();
System.out.println("**** 消费者消费消息完成 ****");
}
}
启动消费者,会消息掉已生产的消息,mq控制台和数据库数据都会消费
队列消费总结:
队列中的消息一旦被consumer消费就从Broker中删除
一定是先启动消费者订阅主题
消费者
public class JmsConsumerTopicJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String TOPIC_NAME = "topic-jdbc";
public static void main(String[] args) throws Exception {
/**
* 持久化主题消息订阅,类似于微信公众号订阅
* 需要先启动消费者,订阅上主题之后,后续生产主题消息,消费者(订阅者)就会接收到消息
* 消费者(订阅者)订阅主题之后,不管是在线还是离线状态,只要保持正常订阅状态,期间生产的消息都会接收到。离线的会在再次在线后接收到之前的消息
*/
System.out.println("jdbc-1"); //模拟订阅用户
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("jdbc-1"); // 设置clientId,表明订阅者
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "jdbc-1");
connection.start();
Message message = topicSubscriber.receive();
while (null != message) {
TextMessage textMessage = (TextMessage) message;
System.out.println("收到的持久化topic消息:" + textMessage.getText());
message = topicSubscriber.receive();
}
session.close();
connection.close();
}
}
启动消费者:
查看数据库,ACTIVEMQ_ACKS
表中新增一条记录,为当前订阅者的信息
生产者
public class JmsProduceTopicJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String TOPIC_NAME = "topic-jdbc";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
// connection启动之前必须设置持久化主题
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 1; i <= 3; i++) {
TextMessage textMessage = session.createTextMessage("jdbc-msg:" + i);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** 持久化消息发送到MQ完成 **** ");
}
}
启动生产者:
查看数据库:ACTIVEMQ_MSGS
会新增消费的数据,ACTIVEMQ_ACKS
的LAST_ACKED_ID会更新为最后消费消息的ID
ACTIVEMQ_MSGS
里的topic消息在消费后是不会删除的,而queue在消费后自动删除
queue
生产的消息在没有消费的情况下,消息会存在activemq_msgs
表中,只要任意一个消费者消费这些消息后,这些消息就会立即删除
topic
一般是先启动消费者订阅之后,再通过生产者生产消息,之后消息也会存在activemq_msgs
表中,activemq_acks
表存的是消费者订阅信息
开发注意事项
1.mysql驱动包(或者其他数据库)和对应的数据库连接池jar包需要放到activemq目录下的lib中
2.初次配置完成,数据库生成表之后,activemq.xml中配置createTablesOnStartup=false
3.BeanFactory not initialized or already closed异常
将操作系统的机器名带有的"_"符号去掉,重启操作系统
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。