1 Star 0 Fork 0

珊瑚海SKY / RabbitMQ

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

1. 消息队列解决问题

  • 日志收集:系统日志不是主体逻辑,属于辅助性功能,日志系统即使挂了也不能影响主业务逻辑,所以需要单独处理;

  • 异步处理:对非实时性功能采用异步处理,例如系统需要发送优惠消息给客户,那么可以采用异步推送;

  • 异步解耦:两个系统对接,可以采用实时接口调用,也可以采用MQ中间层解耦;

  • 流量消费:在流量高峰时期将待处理内容发送到MQ,后台消费服务平滑处理,避免实时高峰流量造成系统崩溃,达到削峰填谷的目的;

2. RabbitMQ安装

  • 具体可以参考网络,先安装erlang,再安装RabbitMQ:https://blog.csdn.net/hzw19920329/article/details/53156015

  • 默认用户名、密码:guest

    1555119896515

  • 添加用户

    1555119759146

  • virtual hosts管理

    virtual hosts 相当于mysql 的 db

    1555120087927

    一般以/开头,然后对用户授权

    1555120125325

    可以看大授权后用户有对该virtual的权限

    1555120164045

3. RabbitMQ工作模式

3.1 工作模式简述

一共6种:简单队列模式、工作队列模式、发布订阅模式、路由模式、主题模式、RPC模式

1555120602211

1555120610824

3.2 简单队列

  • 模型:一对一的模型,即一个生产者发送消息到一个队列,一个消费者监听队列进行消息消费处理imgimgP:消息的生产者 ;红色:队列 ;C:消息的消费者

  • 代码示例:

    添加Maven依赖:

    <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
       <version>5.2.0</version>
    </dependency>
    
    <dependency>
       <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-core</artifactId>
       <version>3.2.4</version>
    </dependency>
    
    <dependency>
       <groupId>io.micrometer</groupId>
       <artifactId>micrometer-core</artifactId>
       <version>1.0.0</version>
    </dependency>
    
    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
       <version>1.7.25</version>
    </dependency>

    连接工厂:

    package com.example.simple;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * RabbitMQ连接工具类
     */
    public class ConnectionUtils {
        /**
         * 获取RabbitMQ连接
         */
        public static Connection getConnection() throws IOException, TimeoutException {
            //定义连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置服务地址
            connectionFactory.setHost("127.0.0.1");
            //设置AMQP监听端口
            connectionFactory.setPort(5672);
            //设置vhost
            connectionFactory.setVirtualHost("/example");
            //用户名
            connectionFactory.setUsername("admin");
            //密码
            connectionFactory.setPassword("admin");
            
            return connectionFactory.newConnection();
        }
    }

    生产者:

    package com.example.simple;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**
     * 生产者:发布消息
     */
    public class Producer {
        //定义队列名称
        private static final String QUEUE_NAME = "test_queue_name";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //获取一个连接
            Connection connection = ConnectionUtils.getConnection();
    
            //从连接中获取一个通道
            Channel channel = connection.createChannel();
            //创建队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            //定义要发送的消息
            String msg = "Hello RabbitMQ!";
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("----发送了一条消息:" + msg);
    
            //关闭资源连接
            channel.close();
            connection.close();
        }
    }

    查看发送的消息:

    1555121773572

    1555121514373

    消费者:

    获取队列消息(旧方法):通过循环监听(严重浪费性能)

    package com.example.simple;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    /**
     * 消费者:获取生产者发送的消息
     */
    public class Consumer {
        //获取消息的队列名称
        private static final String QUEUE_NAME = "test_queue_name";
    
        private static void oldMethod() throws IOException, TimeoutException, InterruptedException {
            //获取连接
            Connection connection = ConnectionUtils.getConnection();
    
            //创建通道
            Channel channel = connection.createChannel();
    
            //定义队列消费者 (3.* 方法使用,最新版已经废弃,要想使用需要降低maven相关版本)
            QueueingConsumer consumer = new QueueingConsumer(channel);
    
            //监听队列
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while (true){
                QueueingConsumer.Delivery delivery =  consumer.nextDelivery();
                String msg = new String(delivery.getBody());
    
                System.out.println("****收到了一条消息:" + msg);
            }
        }
    }

    新API方法:利用监听器机制

    //获取消息的队列名称
        private static final String QUEUE_NAME = "test_queue_name";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //获取连接
            Connection connection = ConnectionUtils.getConnection();
    
            //创建频道
            Channel channel = connection.createChannel();
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            //定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body);
                    System.out.println("****收到了一条消息:" + msg);
                }
            };
    
            //监听队列
            channel.basicConsume(QUEUE_NAME, consumer);
        }

    运行后便得到了消息:

    1555121730673

    控制台查看消息已经没有了:

    1555121792916

  • 缺点

    耦合性高 ,生产者一一对应消费者对列名变更,要同时变更代码

3.3 Work queues工作队列

  • 模型:同一个队列多个消费者

    1555122028961

3.3.1 轮询分发

定义多个消费者,如果每个消费者消费的消息都一样多,这叫做轮询分发(round-robin)

生产者:发送50个消息

package com.example.work;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 工作队列之轮询分发生产者
 */
public class Producer {
    //定义队列名称
    private static final String QUEUE_NAME = "test_queue_name";

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取一个连接
        Connection connection = ConnectionUtils.getConnection();

        //从连接中获取一个通道
        Channel channel = connection.createChannel();
        //创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for(int i = 0; i < 50; i++){
            //定义要发送的消息
            String msg = "Message [" + i + "]";
            //发送消息
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("----发送了一条消息:" + msg);

            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        //关闭资源连接
        channel.close();
        connection.close();
    }

}

消费者1:

package com.example.work;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 工作队列之轮询分发消费者1
 */
public class Consumer1 {//获取消息的队列名称
    private static final String QUEUE_NAME = "test_queue_name";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();

        //创建频道
        Channel channel = connection.createChannel();
        //队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("****收到了一条消息:" + msg);

                try {
                    //模拟业务耗时操作
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println(msg + ":处理完成");
                }
            }
        };

        //监听队列
        boolean autoAck = true; //自动应答
        channel.basicConsume(QUEUE_NAME, autoAck,consumer);
    }
}

消费者2:

package com.example.work;

import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 工作队列之轮询分发消费者2
 */
public class Consumer2 {//获取消息的队列名称
    private static final String QUEUE_NAME = "test_queue_name";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();

        //创建频道
        Channel channel = connection.createChannel();
        //队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("****收到了一条消息:" + msg);

                try {
                    //模拟业务耗时操作
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println(msg + ":处理完成");
                }
            }
        };

        //监听队列
        boolean autoAck = true; //自动应答
        channel.basicConsume(QUEUE_NAME, autoAck,consumer);
    }
}

注意先启动两个消费者,不然先启动生产者发送消息,再启动消费者时候,第一个消费者启动完成了会直接把所有的消息都消费掉,导致观察不到轮询分发的现象。现在我们先启动了两个消费者等待消息,再启动生产者发送消息:

1555122363508

消费者1控制台输出:

1555122663237

消费者2控制台输出:

1555122682785

可以看到两个消费者依次消费消息,且保证两个消费的的数量公平性;

3.3.2 公平分发

公平分发:采用手动应答的方式,即消费者处理完成通知队列处理完成,这样处理快的客户端可以分到更多的消息

生产者:

package com.example.workfair;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 工作队列之公平分发生产者
 */
public class Producer {
    //定义队列名称
    private static final String QUEUE_NAME = "test_queue_name";

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取一个连接
        Connection connection = ConnectionUtils.getConnection();

        //从连接中获取一个通道
        Channel channel = connection.createChannel();
        //创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //设置每次发送到队列的消息只有一个,需要等到消费者发送处理完的响应后才继续发送消息
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);

        for(int i = 0; i < 50; i++){
            //定义要发送的消息
            String msg = "Message [" + i + "]";
            //发送消息
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("----发送了一条消息:" + msg);

            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        //关闭资源连接
        channel.close();
        connection.close();
    }

}

重点在于消息的再次发送等待前一个消费完成:

1555126208038

消费者1:

package com.example.workfair;

import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 工作队列之公平分发消费者1
 */
public class Consumer1 {//获取消息的队列名称
    private static final String QUEUE_NAME = "test_queue_name";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //创建频道
        final Channel channel = connection.createChannel();
        //队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //保证队列一次只分发一个
        channel.basicQos(1);

        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("****收到了一条消息:" + msg);

                try {
                    //模拟业务耗时操作
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println(msg + ":处理完成");
                    //处理完成手动应答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        //监听队列
        boolean autoAck = false; //自动应答关闭
        channel.basicConsume(QUEUE_NAME, autoAck,consumer);
    }
}

消费者2:

package com.example.workfair;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.simple.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 工作队列之公平分发消费者2
 */
public class Consumer2 {//获取消息的队列名称
    private static final String QUEUE_NAME = "test_queue_name";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //获取连接
        Connection connection = ConnectionUtils.getConnection();

        //创建频道
        final Channel channel = connection.createChannel();
        //队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //保证队列一次只分发一个
        channel.basicQos(1);

        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("****收到了一条消息:" + msg);

                try {
                    //模拟业务耗时操作
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println(msg + ":处理完成");
                    //处理完成手动应答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        //监听队列
        boolean autoAck = false; //自动应答关闭
        channel.basicConsume(QUEUE_NAME, autoAck,consumer);
    }
}

启动消费者再启动生产者后:

消费者1消费:

1555126386319

消费者2消费:

1555126403923

可以看到两个消费者消费消息并不是公平的,谁消费的快谁处理的消息就多;

3.4 消息应答ack与消息持久化durable

  • boolean autoAck = true

    自动确认模式:一旦rabbitmq将消息分发给消费者,就会从内存中删除;

    缺点:如果杀死正在执行的消费者,就会丢失正在处理的消息;

  • boolean autoAck = false

    手动确认模式:如果有一个消费者挂掉,就会交付给其他消费者;

    rabbitmq支持消息应答,消费者发送一个消息应答,告诉rabbitmq这个消息我已经处理完成,你可以删除了,然后rabbitmq就会删除内存中的消息;

  • 消息应答默认是打开的,但是如果rabbitmq的服务器挂了,消息会消失,所以需要持久化消息 消息持久化:

    //声明队列
    boolean durable = false;
    channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

    对于已经定义的队列queue,不允许重新定义;

3.5 publish_subscribe订阅模式

  • 模型

    1555140348050

    1、一个生产者,多个消费者;

    2、每个消费者都有自己的队列;

    3、生产者没有直接把消息发送到队列,而是发送到了交换机 转发器 exchange;

    4、每个队列都要绑定到交换机上;

    5、生产者发送的消息经过交换机 到达队列 就能实现 一个消息被多个消费者消费;

  • 代码示例

    生产者:

    package com.example.ps;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.example.simple.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**
     * 订阅模式生产者:只负责把消息发送到交换机
     */
    public class Producer {
        //定义交换机名称
        private static final String EXCHANGE_NAME = "test_exchange_name";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //获取一个连接
            Connection connection = ConnectionUtils.getConnection();
    
            //从连接中获取一个通道
            Channel channel = connection.createChannel();
    
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //分发
            //发送消息
            String msg = "Hello Publish_Subscribe !";
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
            System.out.println("****发送了一条消息:" + msg);
    
            //关闭资源连接
            channel.close();
            connection.close();
        }
    }

    控制台查看交换机:

    1555140629138

    1555140649068

    但是却不存在消息,因为消息已经丢失了,交换机是没有存储消息的能力的,只有队列queue能存储消息,所以我们需要消费者产生队列绑定到交换机exchange

    消费者1:

    package com.example.ps;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.example.simple.ConnectionUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    /**
     * 订阅模式消费者1:产生一个队列,绑定到交换机,获取消息
     */
    public class Consumer1 {
        //定义交换机名称
        private static final String EXCHANGE_NAME = "test_exchange_name";
        //设置消息的队列名称,例如发送邮件的队列
        private static final String QUEUE_NAME = "test_queue_email";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //获取连接
            Connection connection = ConnectionUtils.getConnection();
            //创建频道
            final Channel channel = connection.createChannel();
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            //保证队列一次只分发一个
            channel.basicQos(1);
    
            //定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("****收到了一条消息:" + msg);
    
                    try {
                        //模拟业务耗时操作
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println(msg + ":处理完成");
                        //处理完成手动应答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
            //监听队列
            boolean autoAck = false; //自动应答关闭
            channel.basicConsume(QUEUE_NAME, autoAck,consumer);
        }
    }

    消费者2:

    package com.example.ps;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.example.simple.ConnectionUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    /**
     * 订阅模式消费者2:产生一个队列,绑定到交换机,获取消息
     */
    public class Consumer2 {
        //定义交换机名称
        private static final String EXCHANGE_NAME = "test_exchange_name";
        //设置消息的队列名称,例如发送短信的队列
        private static final String QUEUE_NAME = "test_queue_sms";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //获取连接
            Connection connection = ConnectionUtils.getConnection();
            //创建频道
            final Channel channel = connection.createChannel();
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            //保证队列一次只分发一个
            channel.basicQos(1);
    
            //定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("****收到了一条消息:" + msg);
    
                    try {
                        //模拟业务耗时操作
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println(msg + ":处理完成");
                        //处理完成手动应答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
            //监听队列
            boolean autoAck = false; //自动应答关闭
            channel.basicConsume(QUEUE_NAME, autoAck,consumer);
        }
    }

    现在生产者发送一条消息试试:

    1555140774384

    消费者1和消费者2都接收到了这条消息:

    1555140822107

  • 转发器

    Exchange(交换机 转发器):一方面接受生产者的消息,另一方面向队列推送消息匿名转发; 上面例子指定了fanout模式Fanout(不处理路由键);

    1555140880454

3.6 路由模式

  • 模型

Direct (处理路由键):将消息发送到指定的、匹配的队列相当于身份标识,根据标识匹配;

相当于一堆队列绑定到路由,路由发送消息并不是直接发送到所有队列,而是根据设置的匹配标识来将消息发送到指定队列;

1555141110578

1555141117596

  • 代码示例

    生产者:

    package com.example.routing;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.example.simple.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**
     *  路由模式生产者
     */
    public class Producer {
        //定义交换机名称
        private static final String EXCHANGE_NAME = "test_exchange_direct";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //获取一个连接
            Connection connection = ConnectionUtils.getConnection();
            //从连接中获取一个通道
            Channel channel = connection.createChannel();
            //声明交换机,设置为路由模式
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            //发送消息
            String msg = "Hello Publish_Subscribe !";
            //定义路由键
            String routingKey = "info";
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
            System.out.println("****发送了一条消息:" + msg);
    
            //关闭资源连接
            channel.close();
            connection.close();
        }
    
    }

    消费者1:

    package com.example.routing;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.example.simple.ConnectionUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    /**
     *  路由模式消费者1:产生一个队列,绑定到交换机,从队列中获取指定路由键类型的消息
     */
    public class Consumer1 {
        //定义交换机名称
        private static final String EXCHANGE_NAME = "test_exchange_direct";
        //设置消息的队列名称,例如发送邮件的队列
        private static final String QUEUE_NAME = "test_queue_direct_1";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //获取连接
            Connection connection = ConnectionUtils.getConnection();
            //创建频道
            final Channel channel = connection.createChannel();
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            //绑定队列到交换机
            String routingKey = "error";
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //指定队列路由键
    
            //保证队列一次只分发一个
            channel.basicQos(1);
    
            //定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("****收到了一条消息:" + msg);
    
                    try {
                        //模拟业务耗时操作
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println(msg + ":处理完成");
                        //处理完成手动应答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
            //监听队列
            boolean autoAck = false; //自动应答关闭
            channel.basicConsume(QUEUE_NAME, autoAck,consumer);
        }
    }

    消费者2:

    package com.example.routing;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.example.simple.ConnectionUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    /**
     * 路由模式消费者2:产生一个队列,绑定到交换机,从队列中获取指定路由键类型的消息
     */
    public class Consumer2 {
        //定义交换机名称
        private static final String EXCHANGE_NAME = "test_exchange_direct";
        //设置消息的队列名称,例如发送邮件的队列
        private static final String QUEUE_NAME = "test_queue_direct_2";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //获取连接
            Connection connection = ConnectionUtils.getConnection();
            //创建频道
            final Channel channel = connection.createChannel();
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");    //指定队列路由键
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning"); //指定队列路由键
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");   //指定队列路由键
    
            //保证队列一次只分发一个
            channel.basicQos(1);
    
            //定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("****收到了一条消息:" + msg);
    
                    try {
                        //模拟业务耗时操作
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println(msg + ":处理完成");
                        //处理完成手动应答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
            //监听队列
            boolean autoAck = false; //自动应答关闭
            channel.basicConsume(QUEUE_NAME, autoAck,consumer);
        }
    }

    测试结果:

    Producer生产者发送 error类型路由键的消息:

    1555147092501

    消费者1和消费者2都能接收到消息:

    1555147120227

    生产者发送一条info类型路由键消息:

    1555147156227

    只有消费者2能接收到:

    1555147194993

    因为消费者2设置接收的路由键类型是包含info的:

    1555147234314

    而消费者1只有error:

    1555147253037

3.7 主题模式

  • 模型

    Topic exchange :将路由键和某模式匹配(根据规则匹配查找对应的队列)

    # 匹配一个或多个

    * 匹配一个

    例如 发送 goods.add.one ,goods.# 能匹配 goods.* 不能匹配,但是发送 goods.add 都能匹配;

    相当于正则表达式匹配了;

    1555206609199

1555206615768

  • 代码示例

    生产者:

    package com.example.topic;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.example.simple.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**
     * 主题模式生产者:例如发布一个商品信息消息
     */
    public class Producer {
        //定义交换机名称
        private static final String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //获取一个连接
            Connection connection = ConnectionUtils.getConnection();
    
            //从连接中获取一个通道
            Channel channel = connection.createChannel();
    
            //声明交换机,设置为主题模式
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
            //发送消息
            String msg = "Hello Topic !";
    
            //发布主题消息
            String type = "goods.delete";
            channel.basicPublish(EXCHANGE_NAME, type, null, msg.getBytes());
    
            System.out.println("****发送了一条消息:" + msg + "  ;类型:" + type);
    
            //关闭资源连接
            channel.close();
            connection.close();
        }
    
    }

    消费者1:

    package com.example.topic;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.example.simple.ConnectionUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    /**
     * 主题模式消费者1:产生一个队列,绑定到交换机,根据主题匹配规则接受消息
     */
    public class Consumer1 {
        //定义交换机名称
        private static final String EXCHANGE_NAME = "test_exchange_topic";
        //设置消息的队列名称
        private static final String QUEUE_NAME = "test_queue_topic_1";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //获取连接
            Connection connection = ConnectionUtils.getConnection();
            //创建频道
            final Channel channel = connection.createChannel();
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#"); //规则定义为接收goods. 所有类型消息
    
            //保证队列一次只分发一个
            channel.basicQos(1);
    
            //定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("****收到了一条消息:" + msg);
    
                    try {
                        //模拟业务耗时操作
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println(msg + ":处理完成");
                        //处理完成手动应答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
            //监听队列
            boolean autoAck = false; //自动应答关闭
            channel.basicConsume(QUEUE_NAME, autoAck,consumer);
        }
    }

    消费者2:

    package com.example.topic;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.example.simple.ConnectionUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    /**
     * 主题模式消费者2:产生一个队列,绑定到交换机,根据主题匹配规则接受消息
     */
    public class Consumer2 {
        //定义交换机名称
        private static final String EXCHANGE_NAME = "test_exchange_topic";
        //设置消息的队列名称
        private static final String QUEUE_NAME = "test_queue_topic_2";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //获取连接
            Connection connection = ConnectionUtils.getConnection();
            //创建频道
            final Channel channel = connection.createChannel();
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add"); //只接收类型为goods.add的消息
    
            //保证队列一次只分发一个
            channel.basicQos(1);
    
            //定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("****收到了一条消息:" + msg);
    
                    try {
                        //模拟业务耗时操作
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println(msg + ":处理完成");
                        //处理完成手动应答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
            //监听队列
            boolean autoAck = false; //自动应答关闭
            channel.basicConsume(QUEUE_NAME, autoAck,consumer);
        }
    }

    生产者生产一条 goods.delete 的消息:

    1555206834343

    消费者1消费了消息:

    1555206876544

    因为消费者1设置的模式能够匹配发送的消息格式:

    1555206911742

    生产者生产一条goods.add的消息:

    1555206956355

    消费1和消费者2都消费到了消息:

    1555206985716

    消费者1的goods.#能够匹配上述两条消息,消费2的goods.add只能消费goods.add消息,所以能接收到第二条消息;

4. 消息确认机制(事务+confirm)

在rabbitmq中我们可以通过持久化数据解决rabbitmq服务器异常导致数据丢失问题;

问题:生产者将消息发送出去之后,消息到底有没有成功的到达rabbitmq服务器,默认情况下是不知道的; 解决方案:

两者方式:

I. AMQP 实现了事务机制

II. Confirm模式

4.1 事务机制

  • 基本操作

    txSelect : 用户将当前channel设置成transaction模式;

    txCommit : 用于提交事务;

    txRollback : 回滚事务;

  • 代码示例

    生产者:

    package com.example.tx;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.example.simple.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**
     *  事务管理
     */
    public class Producer {
        //定义队列名称
        private static final String QUEUE_NAME = "test_queue_transaction";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //获取一个连接
            Connection connection = ConnectionUtils.getConnection();
    
            //从连接中获取一个通道
            Channel channel = connection.createChannel();
            //创建队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            try{
                //开启事务
                channel.txSelect();
    
                //定义要发送的消息
                String msg = "Hello Transaction!";
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                System.out.println("----发送了一条消息:" + msg);
    
                //提交事务
                channel.txCommit();
            }catch (Exception e){
                //事务回滚
                channel.txRollback();
                System.out.println("产生异常,消息未成功发送!");
            }
    
            //关闭资源连接
            channel.close();
            connection.close();
        }
    }

    消费者:

    package com.example.tx;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.example.simple.ConnectionUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    /**
     *  简单队列消费者
     */
    public class Consumer {
        //获取消息的队列名称
        private static final String QUEUE_NAME = "test_queue_transaction";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //获取连接
            Connection connection = ConnectionUtils.getConnection();
    
            //创建频道
            Channel channel = connection.createChannel();
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            //定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body);
                    System.out.println("****收到了一条消息:" + msg);
                }
            };
    
            //监听队列
            channel.basicConsume(QUEUE_NAME, consumer);
        }
    
    }

    我们先开启消费者等待消费,然后生产者发送消息,正常情况下生产者发送消息,消费者接收到消息;

    但是期间产生了异常,不过我们用事务进行处理,保证消息并未发送出去:

    1555223616128

空文件

简介

暂无描述 展开 收起
Java
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Java
1
https://gitee.com/xun963/RabbitMQ.git
git@gitee.com:xun963/RabbitMQ.git
xun963
RabbitMQ
RabbitMQ
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891