##ActiveMQ 嵌入Tomcat
在一些项目中,单独开启一个ActiveMQ,对于项目实施来说有时略显繁琐。所以我们将ActiveMQ内嵌到Tomcat,Tomcat启动同时就顺带启动了ActiveMQ。由此我们需要掌握三个个重要的知识点
在pom.xml添加ActiveMQ依赖,本次代码实例采用5.7版本,记住只需要activemq-core
就行。
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
在编写BrokerService
代码部分,主要注意三个点
jconsole
中显示监控信息 broker.setUseJmx(true)
所以需要启动一个连接地址 tcp://localhost:61616
,用户名为admin
,密码为admin
,需要持久化,持久化数据文件存储地址为 /activemq
,需要启动jconsole
监控的BrokerService的代码如下:
// author:herbert qq:464884492
BrokerService broker = new BrokerService();
broker.setUseJmx(true); // 开启监控
broker.setPersistent(true); // 持久化
broker.setBrokerName("Test");
SimpleAuthenticationPlugin sap = new SimpleAuthenticationPlugin();
AuthenticationUser au = new AuthenticationUser("admin", "admin","users");
ArrayList<AuthenticationUser> d = new ArrayList<AuthenticationUser>();
d.add(au);
sap.setUsers(d); // 用户验证
broker.setPlugins(new BrokerPlugin[] { sap });
String mqDataPath = "/activemq"; // 存储位置
broker.getPersistenceAdapter().setDirectory(new File(mqDataPath));
broker.addConnector("tcp://localhost:61616"); // 连接地址
broker.start();
ActiveMQ中,通用的消息传递方式有两种
不管是生产者还是消费者代码编写,主要是4个步骤
failover:()
方式,自动断线重连Session
,获取发送或接收目标Destination
,指定是队列(session.createQueue(queueName)
),还是主题(session.createTopic(topicName)
)Session
获取生产者或消费者我们现在编写一个生产者的代码,并循环产生10条消息
// author:herbert qq:464884492
String mqConnUrl = "tcp://localhost:61616";
String connUrl = "failover:(" + mqConnUrl.trim()+ ")?initialReconnectDelay=1000&maxReconnectDelay=30000";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin", connUrl);
javax.jms.Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("system");
MessageProducer messageProducer = session.createProducer(destination);
for (int i = 0; i < 10; i++) {
javax.jms.TextMessage message = session.createTextMessage("ActiveMQ 发送的消息" + i);
System.out.println("发送消息:" + "ActiveMQ 发送的消息" + i);
messageProducer.send(message);
}
编写一个消费,消费上边的10条消息
// author:herbert qq:464884492
String mqConnUrl = "tcp://localhost:61616";
String connUrl = "failover:(" + mqConnUrl.trim()+ ")?initialReconnectDelay=1000&maxReconnectDelay=30000";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", connUrl);
javax.jms.Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("system");
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
ActiveMQTextMessage m = (ActiveMQTextMessage) message;
try {
System.out.println("接收到:" + m.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
运行效果
可见,我们生产者,产生的10条消息,已成功被消费者处理了。
对于嵌入的ActiveMQ,在BrokerService启动前需要设置 broker.setUseJmx(true);然后找到你的JAVA_HOME,切换到bin,输入jconsole命令。
待jconsole启动后,选择ActiveMQ所在的进程。连接后选择Mbean页签
对于Tomcat7.x版本之后Tomcat,Selvelt都可以通过直接在代码中通过注解的方式配置URl连接,一起是否自启动loadOnStartup
这个值>=0表示需要自启动,值越小优先级越高
// author:herbert qq:464884492
@WebServlet(urlPatterns = "/initmq", loadOnStartup = 1)
public class InitMqServlet extends HttpServlet {
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
// 这里编写启动ActiveMQ代码
}
}
这次以ActiveMQ作为消息队列使用切入点,总体上说还比较顺利。其中唯一出现问题的地方就是对于activeMQ依赖过多,多依赖了jaractivemq-broker
,导致消息能连接,但不能发送消息。后边直接换成 activemq-all
,有出现slf4j日志冲突,使用exclusions
依然不能解决问题。最终只依赖 activemq-core
,完美解决所有问题。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。