日志收集:系统日志不是主体逻辑,属于辅助性功能,日志系统即使挂了也不能影响主业务逻辑,所以需要单独处理;
异步处理:对非实时性功能采用异步处理,例如系统需要发送优惠消息给客户,那么可以采用异步推送;
异步解耦:两个系统对接,可以采用实时接口调用,也可以采用MQ中间层解耦;
流量消费:在流量高峰时期将待处理内容发送到MQ,后台消费服务平滑处理,避免实时高峰流量造成系统崩溃,达到削峰填谷的目的;
具体可以参考网络,先安装erlang,再安装RabbitMQ:https://blog.csdn.net/hzw19920329/article/details/53156015
默认用户名、密码:guest
添加用户
virtual hosts管理
virtual hosts 相当于mysql 的 db
一般以/开头,然后对用户授权
可以看大授权后用户有对该virtual的权限
一共6种:简单队列模式、工作队列模式、发布订阅模式、路由模式、主题模式、RPC模式
模型:一对一的模型,即一个生产者发送消息到一个队列,一个消费者监听队列进行消息消费处理P:消息的生产者 ;红色:队列 ;C:消息的消费者
代码示例:
添加Maven依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.2.4</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
连接工厂:
package com.example.simple;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* RabbitMQ连接工具类
*/
public class ConnectionUtils {
/**
* 获取RabbitMQ连接
*/
public static Connection getConnection() throws IOException, TimeoutException {
//定义连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置服务地址
connectionFactory.setHost("127.0.0.1");
//设置AMQP监听端口
connectionFactory.setPort(5672);
//设置vhost
connectionFactory.setVirtualHost("/example");
//用户名
connectionFactory.setUsername("admin");
//密码
connectionFactory.setPassword("admin");
return connectionFactory.newConnection();
}
}
生产者:
package com.example.simple;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 生产者:发布消息
*/
public class Producer {
//定义队列名称
private static final String QUEUE_NAME = "test_queue_name";
public static void main(String[] args) throws IOException, TimeoutException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义要发送的消息
String msg = "Hello RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("----发送了一条消息:" + msg);
//关闭资源连接
channel.close();
connection.close();
}
}
查看发送的消息:
消费者:
获取队列消息(旧方法):通过循环监听(严重浪费性能)
package com.example.simple;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 消费者:获取生产者发送的消息
*/
public class Consumer {
//获取消息的队列名称
private static final String QUEUE_NAME = "test_queue_name";
private static void oldMethod() throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建通道
Channel channel = connection.createChannel();
//定义队列消费者 (3.* 方法使用,最新版已经废弃,要想使用需要降低maven相关版本)
QueueingConsumer consumer = new QueueingConsumer(channel);
//监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("****收到了一条消息:" + msg);
}
}
}
新API方法:利用监听器机制
//获取消息的队列名称
private static final String QUEUE_NAME = "test_queue_name";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建频道
Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("****收到了一条消息:" + msg);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME, consumer);
}
运行后便得到了消息:
控制台查看消息已经没有了:
缺点
耦合性高 ,生产者一一对应消费者对列名变更,要同时变更代码
模型:同一个队列多个消费者
定义多个消费者,如果每个消费者消费的消息都一样多,这叫做轮询分发(round-robin)
生产者:发送50个消息
package com.example.work;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 工作队列之轮询分发生产者
*/
public class Producer {
//定义队列名称
private static final String QUEUE_NAME = "test_queue_name";
public static void main(String[] args) throws IOException, TimeoutException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for(int i = 0; i < 50; i++){
//定义要发送的消息
String msg = "Message [" + i + "]";
//发送消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("----发送了一条消息:" + msg);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//关闭资源连接
channel.close();
connection.close();
}
}
消费者1:
package com.example.work;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 工作队列之轮询分发消费者1
*/
public class Consumer1 {//获取消息的队列名称
private static final String QUEUE_NAME = "test_queue_name";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建频道
Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("****收到了一条消息:" + msg);
try {
//模拟业务耗时操作
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(msg + ":处理完成");
}
}
};
//监听队列
boolean autoAck = true; //自动应答
channel.basicConsume(QUEUE_NAME, autoAck,consumer);
}
}
消费者2:
package com.example.work;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作队列之轮询分发消费者2
*/
public class Consumer2 {//获取消息的队列名称
private static final String QUEUE_NAME = "test_queue_name";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建频道
Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("****收到了一条消息:" + msg);
try {
//模拟业务耗时操作
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(msg + ":处理完成");
}
}
};
//监听队列
boolean autoAck = true; //自动应答
channel.basicConsume(QUEUE_NAME, autoAck,consumer);
}
}
注意先启动两个消费者,不然先启动生产者发送消息,再启动消费者时候,第一个消费者启动完成了会直接把所有的消息都消费掉,导致观察不到轮询分发的现象。现在我们先启动了两个消费者等待消息,再启动生产者发送消息:
消费者1控制台输出:
消费者2控制台输出:
可以看到两个消费者依次消费消息,且保证两个消费的的数量公平性;
公平分发:采用手动应答的方式,即消费者处理完成通知队列处理完成,这样处理快的客户端可以分到更多的消息
生产者:
package com.example.workfair;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 工作队列之公平分发生产者
*/
public class Producer {
//定义队列名称
private static final String QUEUE_NAME = "test_queue_name";
public static void main(String[] args) throws IOException, TimeoutException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//设置每次发送到队列的消息只有一个,需要等到消费者发送处理完的响应后才继续发送消息
int prefetchCount = 1;
channel.basicQos(prefetchCount);
for(int i = 0; i < 50; i++){
//定义要发送的消息
String msg = "Message [" + i + "]";
//发送消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("----发送了一条消息:" + msg);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//关闭资源连接
channel.close();
connection.close();
}
}
重点在于消息的再次发送等待前一个消费完成:
消费者1:
package com.example.workfair;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作队列之公平分发消费者1
*/
public class Consumer1 {//获取消息的队列名称
private static final String QUEUE_NAME = "test_queue_name";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建频道
final Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//保证队列一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("****收到了一条消息:" + msg);
try {
//模拟业务耗时操作
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(msg + ":处理完成");
//处理完成手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//监听队列
boolean autoAck = false; //自动应答关闭
channel.basicConsume(QUEUE_NAME, autoAck,consumer);
}
}
消费者2:
package com.example.workfair;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 工作队列之公平分发消费者2
*/
public class Consumer2 {//获取消息的队列名称
private static final String QUEUE_NAME = "test_queue_name";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建频道
final Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//保证队列一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("****收到了一条消息:" + msg);
try {
//模拟业务耗时操作
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(msg + ":处理完成");
//处理完成手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//监听队列
boolean autoAck = false; //自动应答关闭
channel.basicConsume(QUEUE_NAME, autoAck,consumer);
}
}
启动消费者再启动生产者后:
消费者1消费:
消费者2消费:
可以看到两个消费者消费消息并不是公平的,谁消费的快谁处理的消息就多;
boolean autoAck = true
自动确认模式:一旦rabbitmq将消息分发给消费者,就会从内存中删除;
缺点:如果杀死正在执行的消费者,就会丢失正在处理的消息;
boolean autoAck = false
手动确认模式:如果有一个消费者挂掉,就会交付给其他消费者;
rabbitmq支持消息应答,消费者发送一个消息应答,告诉rabbitmq这个消息我已经处理完成,你可以删除了,然后rabbitmq就会删除内存中的消息;
消息应答默认是打开的,但是如果rabbitmq的服务器挂了,消息会消失,所以需要持久化消息 消息持久化:
//声明队列
boolean durable = false;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
对于已经定义的队列queue,不允许重新定义;
模型
1、一个生产者,多个消费者;
2、每个消费者都有自己的队列;
3、生产者没有直接把消息发送到队列,而是发送到了交换机 转发器 exchange;
4、每个队列都要绑定到交换机上;
5、生产者发送的消息经过交换机 到达队列 就能实现 一个消息被多个消费者消费;
代码示例
生产者:
package com.example.ps;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 订阅模式生产者:只负责把消息发送到交换机
*/
public class Producer {
//定义交换机名称
private static final String EXCHANGE_NAME = "test_exchange_name";
public static void main(String[] args) throws IOException, TimeoutException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //分发
//发送消息
String msg = "Hello Publish_Subscribe !";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println("****发送了一条消息:" + msg);
//关闭资源连接
channel.close();
connection.close();
}
}
控制台查看交换机:
但是却不存在消息,因为消息已经丢失了,交换机是没有存储消息的能力的,只有队列queue能存储消息,所以我们需要消费者产生队列绑定到交换机exchange
消费者1:
package com.example.ps;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 订阅模式消费者1:产生一个队列,绑定到交换机,获取消息
*/
public class Consumer1 {
//定义交换机名称
private static final String EXCHANGE_NAME = "test_exchange_name";
//设置消息的队列名称,例如发送邮件的队列
private static final String QUEUE_NAME = "test_queue_email";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建频道
final Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//保证队列一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("****收到了一条消息:" + msg);
try {
//模拟业务耗时操作
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(msg + ":处理完成");
//处理完成手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//监听队列
boolean autoAck = false; //自动应答关闭
channel.basicConsume(QUEUE_NAME, autoAck,consumer);
}
}
消费者2:
package com.example.ps;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 订阅模式消费者2:产生一个队列,绑定到交换机,获取消息
*/
public class Consumer2 {
//定义交换机名称
private static final String EXCHANGE_NAME = "test_exchange_name";
//设置消息的队列名称,例如发送短信的队列
private static final String QUEUE_NAME = "test_queue_sms";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建频道
final Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//保证队列一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("****收到了一条消息:" + msg);
try {
//模拟业务耗时操作
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(msg + ":处理完成");
//处理完成手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//监听队列
boolean autoAck = false; //自动应答关闭
channel.basicConsume(QUEUE_NAME, autoAck,consumer);
}
}
现在生产者发送一条消息试试:
消费者1和消费者2都接收到了这条消息:
转发器
Exchange(交换机 转发器):一方面接受生产者的消息,另一方面向队列推送消息匿名转发; 上面例子指定了fanout模式Fanout(不处理路由键);
Direct (处理路由键):将消息发送到指定的、匹配的队列相当于身份标识,根据标识匹配;
相当于一堆队列绑定到路由,路由发送消息并不是直接发送到所有队列,而是根据设置的匹配标识来将消息发送到指定队列;
代码示例
生产者:
package com.example.routing;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 路由模式生产者
*/
public class Producer {
//定义交换机名称
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明交换机,设置为路由模式
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//发送消息
String msg = "Hello Publish_Subscribe !";
//定义路由键
String routingKey = "info";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("****发送了一条消息:" + msg);
//关闭资源连接
channel.close();
connection.close();
}
}
消费者1:
package com.example.routing;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 路由模式消费者1:产生一个队列,绑定到交换机,从队列中获取指定路由键类型的消息
*/
public class Consumer1 {
//定义交换机名称
private static final String EXCHANGE_NAME = "test_exchange_direct";
//设置消息的队列名称,例如发送邮件的队列
private static final String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建频道
final Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机
String routingKey = "error";
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //指定队列路由键
//保证队列一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("****收到了一条消息:" + msg);
try {
//模拟业务耗时操作
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(msg + ":处理完成");
//处理完成手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//监听队列
boolean autoAck = false; //自动应答关闭
channel.basicConsume(QUEUE_NAME, autoAck,consumer);
}
}
消费者2:
package com.example.routing;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 路由模式消费者2:产生一个队列,绑定到交换机,从队列中获取指定路由键类型的消息
*/
public class Consumer2 {
//定义交换机名称
private static final String EXCHANGE_NAME = "test_exchange_direct";
//设置消息的队列名称,例如发送邮件的队列
private static final String QUEUE_NAME = "test_queue_direct_2";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建频道
final Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); //指定队列路由键
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning"); //指定队列路由键
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); //指定队列路由键
//保证队列一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("****收到了一条消息:" + msg);
try {
//模拟业务耗时操作
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(msg + ":处理完成");
//处理完成手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//监听队列
boolean autoAck = false; //自动应答关闭
channel.basicConsume(QUEUE_NAME, autoAck,consumer);
}
}
测试结果:
Producer生产者发送 error类型路由键的消息:
消费者1和消费者2都能接收到消息:
生产者发送一条info类型路由键消息:
只有消费者2能接收到:
因为消费者2设置接收的路由键类型是包含info的:
而消费者1只有error:
模型
Topic exchange :将路由键和某模式匹配(根据规则匹配查找对应的队列)
# 匹配一个或多个
* 匹配一个
例如 发送 goods.add.one ,goods.# 能匹配 goods.* 不能匹配,但是发送 goods.add 都能匹配;
相当于正则表达式匹配了;
代码示例
生产者:
package com.example.topic;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 主题模式生产者:例如发布一个商品信息消息
*/
public class Producer {
//定义交换机名称
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明交换机,设置为主题模式
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//发送消息
String msg = "Hello Topic !";
//发布主题消息
String type = "goods.delete";
channel.basicPublish(EXCHANGE_NAME, type, null, msg.getBytes());
System.out.println("****发送了一条消息:" + msg + " ;类型:" + type);
//关闭资源连接
channel.close();
connection.close();
}
}
消费者1:
package com.example.topic;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 主题模式消费者1:产生一个队列,绑定到交换机,根据主题匹配规则接受消息
*/
public class Consumer1 {
//定义交换机名称
private static final String EXCHANGE_NAME = "test_exchange_topic";
//设置消息的队列名称
private static final String QUEUE_NAME = "test_queue_topic_1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建频道
final Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#"); //规则定义为接收goods. 所有类型消息
//保证队列一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("****收到了一条消息:" + msg);
try {
//模拟业务耗时操作
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(msg + ":处理完成");
//处理完成手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//监听队列
boolean autoAck = false; //自动应答关闭
channel.basicConsume(QUEUE_NAME, autoAck,consumer);
}
}
消费者2:
package com.example.topic;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 主题模式消费者2:产生一个队列,绑定到交换机,根据主题匹配规则接受消息
*/
public class Consumer2 {
//定义交换机名称
private static final String EXCHANGE_NAME = "test_exchange_topic";
//设置消息的队列名称
private static final String QUEUE_NAME = "test_queue_topic_2";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建频道
final Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add"); //只接收类型为goods.add的消息
//保证队列一次只分发一个
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("****收到了一条消息:" + msg);
try {
//模拟业务耗时操作
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(msg + ":处理完成");
//处理完成手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//监听队列
boolean autoAck = false; //自动应答关闭
channel.basicConsume(QUEUE_NAME, autoAck,consumer);
}
}
生产者生产一条 goods.delete 的消息:
消费者1消费了消息:
因为消费者1设置的模式能够匹配发送的消息格式:
生产者生产一条goods.add的消息:
消费1和消费者2都消费到了消息:
消费者1的goods.#能够匹配上述两条消息,消费2的goods.add只能消费goods.add消息,所以能接收到第二条消息;
在rabbitmq中我们可以通过持久化数据解决rabbitmq服务器异常导致数据丢失问题;
问题:生产者将消息发送出去之后,消息到底有没有成功的到达rabbitmq服务器,默认情况下是不知道的; 解决方案:
两者方式:
I. AMQP 实现了事务机制
II. Confirm模式
基本操作
txSelect : 用户将当前channel设置成transaction模式;
txCommit : 用于提交事务;
txRollback : 回滚事务;
代码示例
生产者:
package com.example.tx;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 事务管理
*/
public class Producer {
//定义队列名称
private static final String QUEUE_NAME = "test_queue_transaction";
public static void main(String[] args) throws IOException, TimeoutException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
try{
//开启事务
channel.txSelect();
//定义要发送的消息
String msg = "Hello Transaction!";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("----发送了一条消息:" + msg);
//提交事务
channel.txCommit();
}catch (Exception e){
//事务回滚
channel.txRollback();
System.out.println("产生异常,消息未成功发送!");
}
//关闭资源连接
channel.close();
connection.close();
}
}
消费者:
package com.example.tx;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 简单队列消费者
*/
public class Consumer {
//获取消息的队列名称
private static final String QUEUE_NAME = "test_queue_transaction";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建频道
Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("****收到了一条消息:" + msg);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME, consumer);
}
}
我们先开启消费者等待消费,然后生产者发送消息,正常情况下生产者发送消息,消费者接收到消息;
但是期间产生了异常,不过我们用事务进行处理,保证消息并未发送出去:
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。