1 Star 3 Fork 1

Paul / rabbitmq-python

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

[TOC]

1-1 消息队列概述

1. 队列

从v image-20210325234457786


2. 消息队列

消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,JSON,也可以很复杂,比如内嵌对象。

消息队列(Messaeg Queue)是一种使用队列(Queue)作为底层存储数据结构,可用于解决不同进程与应用之间通讯的分布式消息容器,也称为消息中间件。

从本质上说消息队列就是一个队列结构的中间件,也就是说消息放入这个中间件之后就可以直接返回,并不需要系统立即处理,而另外会有一个程序读取这些数据,并按顺序进行逐次处理。

目前使用得比较多的消息队列有ActiveMQRabbitMQKafkaRocketMQ等。

image-20210325234637207


3. 应用场景

消息队列常用的有五个场景:

  • 消息通讯
  • 异步处理
  • 服务解耦
  • 流量削峰

A. 消息通讯

消息队列最主要功能收发消息,其内部有高效的通讯机制,因此非常适合用于消息通讯。

可以基于消息队列开发点对点聊天系统,也可以开发广播系统,用于将消息广播给大量接收者。

image-20210325235320138


B. 异步处理

一般写的程序都是顺序执行(同步执行),比如一个用户注册函数,其执行顺序如下:

  • 写入用户注册数据
  • 发送注册邮件
  • 发送注册成功的短信通知
  • 更新统计数据

image-20210325235422684

按照上面的执行顺序,要全部执行完毕,才能返回成功,但其实在第1步执行成功后,其他的步骤完全可以异步执行,可以将后面的逻辑发给消息队列,再由其他程序异步执行,如下所示:

image-20210325235445762

使用消息队列进行异步处理,可以更快地返回结果,加快服务器的响应速度,提升了服务器的性能。


C. 服务解耦

在系统中,应用与应用之间的通讯是很常见的,一般应用之间直接调用,比如说应用A调用应用B的接口,这时候应用之间的关系是强耦合的。

如果应用B处于不可用的状态,那么应用A也会受影响。

在应用A与应用B之间引入消息队列进行服务解耦,如果应用B挂掉,也不会影响应用A的使用。

使用消息队列之后,生产者并不关心消费者是谁,消费者同样不关注发送者是谁,这就是解耦,消息队列常用来解决服务之间的调用依赖关系。


D. 流量削峰

对于高并发的系统来说,在访问高峰时,突发的流量就像洪水般向应用系统涌过来,尤其是一些高并发写操作,随时会导致数据库服务器瘫痪,无法继续提供服务。

而引入消息队列则可以减少突发流量对应用系统的冲击。消息队列就像“水库”一样,拦蓄上游的洪水,削减进入下游河道的洪峰流量,从而达到减免洪水灾害的目的。

这方面最常见的例子就是秒杀系统,一般秒杀活动瞬间流量很高,如果流量全部涌向秒杀系统,会压垮秒杀系统,通过引入消息队列,可以有效缓冲突发流量,达到“削峰填谷”的作用。


1-2. RabbitMQ概述

1. 概述

RabbitMQ是用Erlang语言开发的一个实现了AMQP协议的消息队列服务器,相比其他同类型的消息队列,最大的特点在保证可观的单机吞吐量的同时,延时方面非常出色。

RabbitMQ支持多种客户端,比如:PythonRuby.NETJavaJMSCPHPActionScriptXMPPSTOMP等。

RabbitMQ最初起源于进入系统,用于在分布式系统中存储转发消息。


RabbitMQ的特点:

  • 可靠性:RabbitMQ提供了多种技术可以在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制
  • 灵活的路由:消息在到达队列前是通过交换器进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换器类型
  • 集群:在相同局域网中的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用
  • 联合:对于服务器来说,它比集群需要更多的松散和非可靠链接。为此RabbitMQ提供了联合模型
  • 高可用队列:在同一个集群里,队列可以被镜像到多个机器中,以确保当其中某些节点出现故障后仍然可用
  • 多协议:支持多种消息协议的消息传递,比如AMQP、STOMP、MQTT等
  • 广泛的客户端:RabbitMQ 几乎支持所有常用语言,比如Java、.NET、Ruby、PHP、C#、JavaScript 等
  • 可视化管理工具:RabbitMQ附带了一个易于使用的可视化管理工具,它可以帮助你监控和管理消息、集群中的节点
  • 追踪:RabbitMQ提供了对异常行为的追踪的支持,能够发现问题所在
  • 插件系统:RabbitMQ附带了各种各样的插件来进行扩展,甚至可以写插件来使用

2. AMQP协议

RabbitMQ是一个实现了AMQP协议的消息队列服务器,这里先来介绍以下AMQP。

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。


消息代理(message brokers)从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers)。

由于AMQP是一个网络协议,所以这个过程中的发布者,消费者,消息代理可以存在于不同的设备上。


AMQP 0-9-1的工作过程如下图:

消息(message)被发布者(publisher)发送给交换器(exchange),交换器常常被比喻成邮局或者邮箱。然后交换器将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

image-20210327002338523

发布者(publisher)发布消息时可以给消息指定各种消息属性(message meta-data)。有些属性有可能会被消息代理(brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。

从安全角度考虑,网络是不可靠的,接收消息的应用也有可能在处理消息的时候失败。

基于此原因,AMQP模块包含了一个消息确认(message acknowledgements)的概念:当一个消息从队列中投递给消费者后(consumer),消费者会通知一下消息代理(broker),这个可以是自动的也可以由处理消息的应用的开发者执行。当“消息确认”被启用的时候,消息代理不会完全将消息从队列中删除,直到它收到来自消费者的确认回执(acknowledgement)。

在某些情况下,例如当一个消息无法被成功路由时,消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。

队列,交换器和绑定统称为AMQP实体(AMQP entities)。

AMQP协议的设计模型如下:

image-20210620205524132


3. 相关概念

RabbitMQ有属于自己的一套核心概念,对这些概念的理解很重要,只有理解了这些核心概念,才有可能建立对RabbitMQ的全面理解。


A. 生产者,Producer

生产者连接到RabbitMQ服务器,然后将消息发送到RabbitMQ服务器的队列,是消息的发送方。

消息一般可以包含2个部分:

  • 消息体:消息体也可以称之为payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,可以很简单,比如文本字符串、json等,也可以是内嵌对象等。
  • 标签(Label)

生产者不需要知道消费者是谁。

另外,生产者不是直接将消息发送到队列的,而是将消息发送到交换器的,再由交换器转发到队列去。


B. 消费者,Consumer

消费者连接到RabbitMQ服务器,并订阅到队列上,是消息的接收方。

当消费者消费一条消息时,只是消费消息的消息体(payload)。

在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道。


C. Broker

Broker,是消息中间件的服务节点。

对于RabbitMQ来说,一个RabbitMQ Broker可以简单地看作一个RabbitMQ服务节点,或者 RabbitMQ服务实例 。


D. Queue,队列

队列,Queue,是RabbitMQ内部用于存储消息的对象,是真正用存储消息的结构。

在生产端,生产者的消息最终发送到指定队列,而消费者也是通过订阅某个队列,达到获取消息的目的。

RabbitMQ中消息都只能存储在队列中,队列的特性是先进先出。

队列收到的消息,必须是由Exchange转发过来的。

一个队列中的消息数据可能来自于多个Exchange,一个Exchange中的消息数据也可能推送给多个队列,它们之间的关系是多对多的。

多个消费者可以订阅同一个Queue,此时Queue中的消息会被平均分摊(即轮询)给多个消费者进行处理,而不是每个消息者都收到所有的消息并处理。

如图,红色的表示队列

image-20210405235412757


E. Exchange

Exchange,消息交换器,作用是接收来自生产者的消息,并根据路由键转发消息到所绑定的队列。

生产者发送的消息,每个消息都会带有一个路由键(RoutingKey),就是一个简单的字符串,消息先通过Exchange按照绑定(binding)规则转发到队列的。

一个队列中的消息数据可能来自于多个Exchange,一个Exchange中的消息数据也可能推送给多个队列,它们之间的关系是多对多的。

交换器拿到一个消息之后将它路由给一个或多个队列。它使用哪种路由算法是由交换器类型和被称作绑定(bindings)的规则所决定的。

如图:

image-20210523170719017

交换器类型(Exchange Type)共有4种:

  • fanout:扇型交换器,这种类型不处理路由键(RoutingKey),类似于广播,把所有发送到交换器上的消息都会发送给与该交换器绑定的所有队列上。该类型下发送消息是最快的。
  • direct:直连交换器,模式处理路由键(RoutingKey),需要路由键(RoutingKey)和BindingKey完全匹配的队列才能收到交换器的消息。这种模式使用最多。
  • topic:主题交换器,将路由键和某模式进行匹配。
  • headers:头交换器,一般很少用。

注意:交换器(Exchange)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列于交换器绑定,或者是没有符合路由规则的队列,那么消息会丢失。


F. Binding

Binding是一种操作,其作用是建立消息从Exchange转发到Queue的规则,在进行ExchangeQueue的绑定时,需要指定一个路由键BindingKey,Binding操作一般用于RabbitMQ的路由工作模式和主题工作模式。

如下图:

image-20210524115740462


G. vhosts

vhosts,虚拟主机(Virtual Host),Virutal host也叫虚拟主机,一个Virtual Host下面有一组不同ExchnageQueue,不同的Virtual hostExchnageQueue之间互相不影响。

每个vhosts本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。

应用隔离与权限划分,Virtual host是RabbitMQ中最小颗粒的权限单位划分。

如果要类比的话,可以把Virtual host比作MySQL中的数据库,通常我们在使用MySQL时,会为不同的项目指定不同的数据库,同样的,在使用RabbitMQ时,可以为不同的应用程序指定不同的Virtual host

RabbitMQ 当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止 A 组访问 B 组的交换器 / 队列 / 绑定,必须为 A 和 B 分别创建一个虚拟主机。

每一个 RabbitMQ 服务器都有一个默认的虚拟主机 “/” 。

一个RabbitMQ的Server上可以有多个vhosts,用户与权限设置就是依附于vhosts。

在同一个vhosts下的exchange和queue才能相互绑定。

建议:一般有多个项目需要使用RabbitMQ的时候,不需要每个项目都去部署RabbitMQ,这样非常浪费资源,只需要每个项目对应一个vhost即可,vhost 之间是绝对隔离的,不同的 vhost 对应不同的项目,互不影响。


H. Connection

ConnectionRabbitMQ内部对象之一,偏物理的概念,是一个TCP连接,用于管理每个到RabbitMQTCP网络连接。

生产者、消费者和Broker之间就是通过Connection进行连接的。


I. Channel

Channel,信道,是与RabbitMQ打交道的最重要的一个接口,是偏逻辑上的概念,在一个连接(Connection)中可以创建多个Channel。

大部分与RabbitMQ的相关操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定QueueExchange、发布消息等。

一旦连接(Connection)建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。

image-20210526004755638

为什么要引入Channel呢?

在某个场景下一个应用程序中有很多个线程需要从RabbitMQ中消费消息,或者生产消息,那么必然需要建立很多个Connection,也就是许多个TCP连接。然而对于操作系统而言,建立和销毁TCP连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。

RabbitMQ采用类似NIO(Non-blocking I/O)的做法,选择TCP连接复用,不仅可以减少性能开销,同时也便于管理。

每个线程把持一个信道,所以信道复用了Connection的TCP连接。同时RabbitMQ可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TCP连接资源。

但是当信道本身的流量很大时,这时候多个信道复用一个Connection就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个Connection,将这些信道均摊到这些Connection中。


4. 运转流程

可以参考如图:

image-20210331235126776

消息的运转过程如下图:

image-20210523165933865

生产者发送消息:

  1. 生产者连接到Broker,建立一个连接(Connection),开启一个信道(Channel)
  2. 生产者声明一个交换器,并设置相关属性,比如交换器类型、是否持久化等
  3. 生产者声明一个队列并设置相关属性,比如是否排他性、是否持久化、是否自动删除等
  4. 生产者通过路由键将交换器和队列绑定起来
  5. 生产者发送消息到Broker,其中包含路由键、交换器等信息
  6. 相应的交换器根据接收到的路由键查找相匹配的队列
  7. 如果找到则将从生产者发送来的消息存入对应的队列中
  8. 如果没找到则根据生产者配置的属性选择丢弃还是回退给生产者
  9. 关闭信道
  10. 关闭连接

消费者接收消息:

  1. 消费者连接到Broker,建立一个连接(Connection),开启一个信道(Channel)
  2. 消费者向Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
  3. 等待Broker回应并投递相应队列中的消息,消费者接收消息
  4. 消费者确认(ack)接收到的消息
  5. RabbitMQ从队列中删除相应已经被确认的消息
  6. 关闭信道
  7. 关闭连接

架构设计如下:

image-20210620205616500


2-1. 安装

这里只介绍Windows的安装和CentOS7下的安装,官网还提供了Ubuntu、Mac甚至docker下的安装等。


RabbitMQ服务器的代码是使用erlang语言编写的,所以是需要先安装erlang语言的。

注意:RabbitMQ的版本依赖于Erlang的版本,两者之间是有版本的兼容性要求的,一定要选择兼容的版本,具体可参考:https://www.rabbitmq.com/which-erlang.html


1. Windows安装

A. 安装erlang

先通过官网下载erlang:官网下载地址

下载exe文件然后安装


安装好了之后需要设置环境变量:

我的电脑 - 右击属性 - 高级系统设置 - 环境变量 - 用户变量/系统变量新建一个变量:

image-20210307001248266

变量名为:ERLANG_HOME,变量值为erlang的安装目录

还需要加入到Path中:%ERLANG_HOME%\bin


然后打开命令行,输入erl,如果显示erlang的版本信息即表示安装成功:

image-20210307001522360


B. 安装RabbitMQ

通过官网下载RabbitMQ:官网下载地址

双击安装即可


C. 安装Web管理【非必须】

该步骤非必须,RabbitMQ还提供了Web管理工具,而Web管理工具作为RabbitMQ的插件,相当于是一个后台管理页面,方便在浏览器中查看

进入到sbin目录下,打开命令行输入:

./rabbitmq-plugins.bat enable rabbitmq_management

安装成功之后,浏览器输入http://localhost:15672即可访问管理页面

image-20210307001918539

默认的账号和密码都是guest

注意:一般会创建个新的管理员用户,不使用默认的guest,guest用户只能在localhost下访问,如果在内网的其他机器访问的话,登录的时候会报错:

image-20210326235458012

Web管理页面的使用操作可见 6-1


2. Centos 7安装

A. 安装erlang

关于安装erlang,RabbitMQ官网提供了4种方式安装:

image-20210326232421547

我这里使用第一种方式,按照Github上的安装方式

新建一个文件:/etc/yum.repos.d/rabbitmq_erlang.repo

# vim /etc/yum.repos.d/rabbitmq_erlang.repo
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/$basearch
repo_gpgcheck=1
gpgcheck=1
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
       https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

[rabbitmq_erlang-source]
name=rabbitmq_erlang-source
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
       https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

保存之后,进行yum安装

yum install erlang

然后打开命令行,输入erl,如果显示erlang的版本信息即表示安装成功:

image-20210326232714498

注意:不能直接yum install erlang,会导致erlang的版本很低,后面安装rabbitMQ的时候会有版本冲突的


B. 安装其他依赖

除了erlang,RabbitMQ还需要安装:socat和logrotate

yum install -y socat logrotate

C. 安装RabbitMQ

这里有两种方式进行安装:

  • rpm安装
  • yum安装

第一种方式:rpm安装

下载RabbitMQ,具体的rpm包链接可参考:官网下载页面

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.14/rabbitmq-server-3.8.14-1.el7.noarch.rpm

然后rpm安装:

rpm -ivh rabbitmq-server-3.8.14-1.el7.noarch.rpm

第二种方式:yum安装

新建一个文件:/etc/yum.repos.d/rabbitmq_server.repo

# vim /etc/yum.repos.d/rabbitmq_server.repo
[rabbitmq_server]
name=rabbitmq_server
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/$basearch
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

[rabbitmq_server-source]
name=rabbitmq_server-source
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

然后yum安装:

yum install -y rabbitmq_server rabbitmq-server

通过rabbitmqctl来验证是否安装成功:

>>> rabbitmqctl version
3.8.14

D. 启动

以守护进程的方式来启动RabbitMQ:

# -detached为可选参数,表示后台开启
rabbitmq-server -detached

可以通过查看状态来验证是否启动:

rabbitmqctl status

如果要关闭的话,可以使用:

rabbitmqctl stop

E. 安装Web管理【非必须】

该步骤非必须,RabbitMQ还提供了Web管理工具,而Web管理工具作为RabbitMQ的插件,相当于是一个后台管理页面,方便在浏览器中查看

rabbitmq-plugins enable rabbitmq_management 

安装完成之后即可通过localhost:15672来进行访问

image-20210307001918539

默认的账号和密码都是guest

注意:一般会创建个新的管理员用户,不使用默认的guest,guest用户只能在本地(localhost)下访问,如果在内网的其他机器访问的话,登录的时候会报错:

image-20210326235458012

以下是添加管理员用户(这里是root):

# 添加用户
rabbitmqctl add_user root 123456

# 赋予管理员权限
rabbitmqctl set_user_tags root administrator

# 设置所有权限
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

# 查看用户列表
rabbitmqctl list_users

2-2 RabbitMQ常见命令

以下是RabbitMQ的场景命令:

# 查看版本
rabbitmqctl version

# 查看状态
rabbitmqctl status

# 停止
rabbitmqctl stop

# 添加用户
rabbitmqctl add_user root 123456

# 赋予管理员权限
rabbitmqctl set_user_tags root administrator

# 设置所有权限
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

# 查看用户列表
rabbitmqctl list_users

# 查看所有的交换器
rabbitmqctl list_exchanges

# 查看所有的队列
rabbitmqctl list_queues

# 查看未确认的消息
rabbitmqctl list_queues name messages_ready messages_unacknowledged

# 查看所有的绑定
rabbitmqctl list_bindings

3-1. 交换器类型

交换器类型(Exchange Type)共有4种:

  • fanout:扇型交换器,这种类型不处理路由键(RoutingKey),类似于广播,把所有发送到交换器上的消息都会发送给与该交换器绑定的所有队列上。该类型下发送消息是最快的。
  • direct:直连交换器,模式处理路由键(RoutingKey),需要路由键完全匹配的队列才能收到交换器的消息。这种模式使用最多。
  • topic:主题交换器,将路由键和某模式进行匹配。
  • headers:头交换器,一般很少用。

1. Fanout

扇型交换器(fanout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键,也就是说在Fanout模式下是不需要RoutingKey的

如果N个队列绑定到某个扇型交换器上,当有消息发送给此扇型交换器时,交换器会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换器处理消息的广播路由(broadcast routing)。

因为扇型交换器投递消息的拷贝到所有绑定到它的队列,所以应用场景都极其相似:

  • 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
  • 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
  • 分发系统使用它来广播各种状态和配置更新
  • 在群聊的时候,它被用来分发消息给参与群聊的用户

扇型交换器图例:

image-20210405215043450


2. Direct

直连型交换器(direct exchange)是根据消息携带的路由键(RoutingKey)将消息投递给对应队列的。直连交换器用来处理消息的单播路由(unicast routing)。下边介绍它是如何工作的:

  • 将一个队列绑定到某个交换器上,同时赋予该绑定一个路由键(RoutingKey)
  • 当一个携带着路由键为R的消息被发送给直连交换器时,交换器会把它路由给绑定值同样为R的队列。

直连交换器经常用来循环分发任务给多个工作者(workers)。当这样做的时候,我们需要明白一点,在AMQP 0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。

直连交换器是完全匹配、单播的模式,同时也是RabbitMQ默认的交换器模式,也是最简单的模式。

直连型交换器图例:

image-20210405215542280


3. Topic

主题交换器(topic exchanges)通过对消息的路由键和队列到交换器的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换器经常用来实现各种分发/订阅模式及其变种。主题交换器通常用来实现消息的多播路由(multicast routing)。

主题交换器拥有非常广泛的使用场景。无论何时,当一个问题涉及到那些想要有针对性的选择需要接收消息的 多消费者/多应用(multiple consumers/applications) 的时候,主题交换器都可以被列入考虑范围。

使用场景:

  • 分发有关于特定地理位置的数据,例如销售点
  • 由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务
  • 股票价格更新(以及其他类型的金融数据更新)
  • 涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)
  • 云端的不同种类服务的协调
  • 分布式架构/基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。

主题交换器在流程上和直连模式类似,但比直连模式更优化的地方在于支持了RoutingKey的通配符,通配符也并不是按照正则表达式的那种方式,只是简单支持了*和#,并且RoutingKey有严格的规划,单词之间必须用星号符(.)隔开

  • * 代表一个单词
  • # 代表零个或多个单词

如图:

image-20210405220255426

  • 路由键为“com.orange.rabbit”的消息会同时路由到Q1和Q2
  • 路由键为“lazy.orange.rabbit”的消息会同时路由到Q1和Q2
  • 路由键为“com.hidden.rabbit”的消息只会路由到Q2
  • 路由键为“com.orange.demo”的消息只会路由到Q1
  • 路由键为“java.hidden.rabbit”的消息只会路由到Q2
  • 路由键为“java.hidden.demo”的消息因为没有匹配任何路由键将会被丢弃或者是返回给生产者(需要mandatory参数)

4. Headers

有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头headers 交换器就是为此而生的。头交换器使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

我们可以绑定一个队列到头交换器上,并给他们之间的绑定使用多个用于匹配的头(header)。这个案例中,消息代理得从应用开发者那儿取到更多一段信息,换句话说,它需要考虑某条消息(message)是需要部分匹配还是全部匹配。上边说的“更多一段消息”就是"x-match"参数。当"x-match"设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当"x-match"设置为“all”的时候,就需要消息头的所有值都匹配成功。

头交换器可以视为直连交换器的另一种表现形式。头交换器能够像直连交换器一样工作,不同之处在于头交换器的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。


Headers交换器提供了另一种不同于主题交换器的策略,在发送消息的时候,给消息设置header,header是一系列的键值对,可以设置多个,配置绑定关系有两种选择:

  • match-any:header中的任意一个键值对能够匹配上就会被路由到对应的Queue
  • match-all:header中的所有键值对能够匹配上才会被路由到对应的Queue

注意:Headers类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。


5. 总结

从性能的角度来说,Fanout > Direct > Topic > Headers

在实际应用中,在满足工作场景的前提下,选择性能最高的那种模式,一般会使用Direct类型


3-2. 消息确认

消费者在消费消息的时候,可能是几秒钟,也可能是耗时很长比如几十分钟,这时候有可能消费者执行到了一半结果崩溃了或者是连接断开了或者是该消费者被人工kill掉等,那么该消息就有可能会丢失。

为了保证消息能够从队列可靠地到达消费者,RabbitMQ 提供了消息确认机制。

消息确认机制可分为两种:

  • 消费者确认
  • 生产者确认

如图,RabbitMQ的消息确认流程:

image-20210620234309975


消费者确认

消费者确认机制:

  • 消费者在处理完消息之后,发回一个 ack 来告诉 RabbitMQ 消息已经被接收、处理,此时 RabbitMQ 才去可以删除它
  • 消费者在处理消息的时候如果处理失败,也可以发回一个ack来告诉RabbitMQ拒绝接收,让RabbitMQ重新发送消息
  • 如果消费者在没有发送 ack 的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 将理解消息未完全处理并将重新排队。如果有其他消费者同时在线,RabbitMQ会将消息重新分配给另一个消费者,这样就可以确保不会丢失任何消息

消费者订阅队列的时候,可以指定autoAck参数:

  • 当autoAck为true的时候,RabbitMQ采用自动确认模式,RabbitMQ自动把发送出去的消息设置为确认,然后从内存或者硬盘中删除,而不管消费者是否真正消费到了这些消息。
  • 当autoAck为false的时候,RabbitMQ会等待消费者回复的确认信号,收到确认信号之后才从内存或者磁盘中删除消息。

默认情况下autoAck为false,即不自动确认,以Python为示例:

# 定义队列的消费回调,将消息传递给回调函数,消费完成手动进行消息确认
channel.basic_consume(queue=队列名, on_message_callback=callback, auto_ack=False)

消息确认机制是RabbitMQ消息可靠性投递的基础,只要设置autoAck参数为false,消费者就有足够的时间处理消息,不用担心处理消息的过程中消费者进程挂掉后消息丢失的问题。


注意:在实际应用中很容易忘记消息确认,这会导致堆积越来越多的未确认的消息,这种消息无法自动释放,可以通过以下命令来查看未确认的消息:

rabbitmqctl list_queues name messages_ready messages_unacknowledged

注意:实际项目中是会关闭自动确认的,但无论如何消费者必须发送ack响应,否则会导致堆积的未确认消息越来越多。


生产者确认

同样的,生产者发送消息到Broker的时候,如果消息由于网络原因无法达到,而生产者也不知道消息到底有没有到Broker,这有可能会造成问题,比如重复消费的问题等。

所以RabbitMQ同样的提供了生产者确认机制:

  • 消息到达Exchange:Exchange向生产者发送Confirm确认,成功或失败都会返回一个confirmCallback
  • 消息成功达到Exchange,但是从Exchange投递Queue失败:向生产者返回一个returnCallback。只有失败才会返回

3-3. 持久化

虽然可以通过消息确认机制来避免消费者一旦死亡而导致消息丢失,但还是存在消息丢失的可能性。

当RabbitMQ崩溃挂掉的时候,交换器和队列是会全部丢失:

  • 交换器丢失,消息不会丢失,但不能将消息发送到该交换器
  • 队列丢失,队列中的消息会丢失

所以为了保证消息不丢失,都会在建立交换器和队列的时候声明持久化存储,持久化之后即使RabbitMQ崩溃挂掉,那么在重启之后交换器和队列依然还是存在的不会丢失。

RabbitMQ默认都是不开启持久化的,默认建立的是临时交换器和队列。


交换器的持久化是通过durable=True来实现的:

# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable=True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange=交换器名, durable=True)

如果不设置交换器的持久化,那么在RabbitMQ服务重启之后,相关的交换器元数据会丢失,但消息不会丢失,只是不能将消息发送到该交换器中。

对一个长期使用的交换器来说,建议将其设置为持久化。


队列的持久化是通过durable=True来实现的:

# 声明消息队列,消息将在这个队列传递,如不存在,则创建。durable=True 代表消息队列持久化存储,False 非持久化存储
channel.queue_declare(queue=队列名, durable=True)

注意:如果已存在一个非持久化的队列或交换器 ,执行上述代码会报错,因为RabbitMQ不允许使用不同的参数重新声明一个队列或交换器,需要删除重建。另外如果队列和交换器中一个声明了持久化,另一个没有声明持久化,则不允许绑定。

如果队列不设置为持久化,那么在RabbitMQ服务重启之后,相关的队列元数据会丢失,此时数据也会丢失,即队列中的消息也会丢失的。


队列的持久化能保证其本身的元数据不因异常情况而丢失,但并不能保证内部所存储的消息不会丢失。要想保证消息不会丢失,需要将消息也设置持久化。

消息的持久化是通过在BasicProperties中设置deliveryMode设置为2来实现的:

# 向队列插入消息,delivery_mode=2:消息持久化,delivery_mode=1:消息非持久化
channel.basic_publish(exchange=交换器名, routing_key=路由键, body = message, properties=pika.BasicProperties(delivery_mode=2))

设置了队列和消息的持久化之后,当RabbitMQ服务重启之后,消息依旧存在。

只设置队列持久化,重启之后队列里面的消息会丢失。

只设置消息的持久化,重启之后队列消失,既而消息也丢失,设置消息持久化而不设置队列的持久化显得毫无意义。


注意:如果将所有的消息都进行持久化操作会严重影响RabbitMQ的性能,因为写入磁盘的速度比写入内存的速度慢很多,对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。鱼和熊掌不可兼得,关键在于选择和取舍。在实际中,需要根据实际情况在可靠性和吞吐量之间做一个权衡。


注意:将队列、交换器和消息都设置了持久化之后也并不能保证100%的消息不会丢失

  • 首先,如果开启了自动消息确认,那么消息传递给消费者的时候就已经从队列中删除,而消费者自身崩溃挂掉的话会导致该消息丢失,这可以通过手动消息确认机制来解决
  • 其次,消息在正确存入RabbitMQ之后,还需要有一段时间(这个时间很短,但不可忽视)才能存入磁盘之中,RabbitMQ并不是为每条消息都做fsync的处理,可能仅仅保存到cache中而不是物理磁盘上,在这段时间内RabbitMQ broker发生crash, 消息保存到cache但是还没来得及落盘,那么这些消息将会丢失。可引入RabbitMQ镜像队列机制来解决,具体可参考:《RabbitMQ实战指南》4.7章持久化

总结:

  • RabbitMQ默认都是不开启持久化的
  • 交换器不持久化,重启之后交换器丢失,消息不会丢失,但不能将消息发送到该交换器
  • 队列不持久化,重启之后队列丢失,队列中的消息会丢失
  • 只设置消息持久化而队列不持久化的做法无意义
  • 队列、交换器和消息都设置了持久化之后也并不能保证100%的消息不会丢失
  • 一般会设置交换器和队列持久化,而消息是否持久化则根据实际场景来

3-4. 公平调度

当RabbitMQ拥有多个消费者时,队列收到的消息将以轮询(round-robin)的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。这种方式非常适合扩展,而且它是专门为并发程序设计的。如果现在负载加重,那么只需创建更多的消费者来消费处理消息即可。

但很多时候轮询的分发机制也不是那么优雅。默认情况下,如果有n个消费者,那么rabbitmq会将第m条消息分发给第m%n(取余的方式)个消费者,RabbitMQ不管消费者是否消费并已经确认(Basic.Ack)了消息。

试想一下,如果某些消费者任务繁重,来不及消费那么多消息,而某些其他消费者由于某些原因很快处理完了所分配到的消息,进而空闲,这样就会造成整体应用吞吐量的下降。

RabbitMQ是可以设置公平分配消息任务,不会给某个消费者同时分配多个消息处理任务,换句话说,RabbitMQ在处理和确认消息之前,不会向消费者发送新的消息,而是将消息分发给下一个不忙的消费者。

RabbitMQ是使用channel.basic_qos(num)来保证公平调度的,该方法允许限制信道上的消费者所能保持的最大未确认消息的数量。

# 设置消费者允许的最大未确认消息数量为1
channel.basic_qos(prefetch_count=1)

举例说明,在订阅消费队列之前,消费端程序调用了channel.basic_qos(3),之后定义了某个队列进行消费。RabbitMQ会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,当达到了所设定的上限,那么RabbitMQ将不会向这个消费者再发送任何消息,直到消费者确认了某条消息之后,RabbitMQ会将相应的计数减1,之后消费者可以继续接收消息,直到再次达到计数上线。这种机制可以类比于TCP/IP的滑动窗口。


注意:如果channel.basic_qos(num)的num设置为0则表示没有上限。


4-1. 开发语言支持

RabbitMQ支持多种客户端,比如:PythonRuby.NETJavaJMSCPHPActionScriptXMPPSTOMP等。

这里主要是以Python和PHP为例进行演示。


Python

注意:Python是使用3.7版本,并且需要安装好pika

pip install pika

PHP

注意:需要PHP:7.x+,并且安装php-amqplib,可通过composer进行安装

composer.json(项目根目录下)

{
    "require": {
        "php-amqplib/php-amqplib": ">=3.0"
    }
}

项目目录下进行composer install即可

或者是直接不需要上面的composer.json,直接进行命令:

composer require php-amqplib/php-amqplib

4-2. 工作模式

RabbitMQ一共有7种工作模式,具体可见:RabbitMQ Tutorials

  • 简单模式
  • 工作模式
  • 发布/订阅模式
  • 路由模式
  • 主题模式
  • RPC模式
  • 生产者确认模式

1. 简单模式

简单(simple)模式,是几种工作模式中最简单的一种模式了,如下图:

image-20210622011733574

有以下特点:

  • 只有一个生产者、一个消费者和一个队列
  • 生产者和消费者在发送和接收消息时,只需要指定队列名,而不需要指定发送到哪个交换器,RabbitMQ会自动使用vhosts的默认交换器,默认交换器的type为直连(direct)

应用场景:将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人


生产者发送消息步骤:

  1. 连接RabbitMQ服务器,建立信道
  2. 创建队列,如果不创建队列的话,发送消息的时候队列不存在那么RabbitMQ会抛弃该消息,注意:重复创建队列并不会重复创建队列,建议生产端和消费端都需要创建队列
  3. 发送消息,注意:这里只是发送到交换器而已,并不会直接到队列,空交换器是默认交换器,这里只是测试所以用默认交换器
  4. 关闭连接

消费者接收消息步骤:

  1. 同发送消息,连接RabbitMQ服务器,建立信道
  2. 同发送消息,创建队列,如果不创建队列的话,发送消息的时候队列不存在那么RabbitMQ会抛弃该消息,注意:重复创建队列并不会重复创建队列,建议生产端和消费端都需要创建队列
  3. 定义回调函数,每接收到一条消息就会把消息传递给回调函数
  4. 定义队列的消费回调,将消息传递给回调函数,同时进行消息确认
  5. 循环消费/接收消息

Python

注意:Python是使用3.7版本,并且需要安装好pika

发送端,发送消息:1-simple-send.py

#!/usr/bin/env python
import pika


# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='hello')

# 发送消息,消息体为hello world,交换器为默认交换器(空交换器),路由键为hello
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")

# 关闭连接
connection.close()

接收端,接收消息:1-simple-receive.py

#!/usr/bin/env python
import pika, sys, os

def main():
    # 连接RabbitMQ服务器
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # 创建队列
    channel.queue_declare(queue='hello')

    # 定义回调函数
    def callback(ch, method, properties, body):
        print(f' [x] Received {body}')

    # 定义队列的消费回调,将消息传递给回调函数同时进行消息确认
    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    
    # 开始消费/接收消息,注意:这是一个死循环,相当于`while True`
    channel.start_consuming()

    
if __name__ == '__main__':
    try:
        main()
    # ctrl + c可以中断循环
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

运行:

先运行接收端:

python 1-simple-receive.py
# => [*] Waiting for messages. To exit press CTRL+C

再运行发送端:

python 1-simple-send.py
# => [x] Sent 'Hello World!'

此时接收端已经会接收到消息并且输出了:

# => [x] Received 'Hello World!'

PHP

注意:需要PHP:7.x+,并且安装php-amqplib

发送端,发送消息:1-simple-send.php

<?php
    
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$exchange = '';    // 默认交换器
$queue = 'hello';
$routing_key = 'hello';

// 连接RabbitMQ服务器
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 创建队列
$channel->queue_declare($queue, false, false, false, false);

// 定义消息,消息体为hello world
$msg = new AMQPMessage('Hello World!');

// 发送消息到交换器,交换器为默认交换器(空交换器),路由键为hello
$channel->basic_publish($msg, $exchange, $routing_key);

echo " [x] Sent 'Hello World!'\n";

// 关闭连接
$channel->close();
$connection->close();

接收端,接收消息:1-simple-receive.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$queue = 'task-queue';

// 连接RabbitMQ服务器
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 创建队列
$channel->queue_declare($queue, false, false, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

// 定义回调函数
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
};

// 定义队列的消费回调,将消息传递给回调函数同时进行自动消息确认
$channel->basic_consume($queue, '', false, true, false, false, $callback);

// 循环消费/接收消息
while ($channel->is_open()) {
    $channel->wait();
}

// 关闭连接
$channel->close();
$connection->close();

2. 工作模式

在简单(simple)模式下只有一个消费者,当生产者生产消息的速度大于消费者消费的速度时,可能会造成消息的堆积,这时需要添加一个或多个消费者来加快消费速度,这种模式称之为工作(work)模式,如下图:

image-20210622011820672

特点:

  • 可以有多个消费者,但一条消息只能被一个消费者获取,不能重复消费
  • 发送到队列中的消息,由RabbitMQ平均分配给不同消费者进行消费
  • 手动消息确认
  • 持久化
  • 消息的公平调度

应用场景:一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理,比如一个订单的处理需要10s,有多个订单可以同时放到消息队列,然后让多个消费者同时处理,这样就是并行了,而不是单个消费者的串行情况


步骤和简单模式下差不多,只不过是可以有多个消费者


Python

发送端,发送消息:2-work-send.py

# -*- encoding: utf-8 -*-
import pika
import sys

# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 创建队列,并且开启持久化
channel.queue_declare(queue='task-queue', durable=True)

# 使用命令的参数作为消息体,如无则hello world
message = ' '.join(sys.argv[1:]) or "Hello World!"

# 发送消息,并且将消息设置为持久化
channel.basic_publish(exchange='',
                      routing_key='task-queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2
                      ))
print(" [x] Sent %r" % message)

# 关闭连接
connection.close()

注意:这里代码的队列名和简单模式里面的队列名是不一致的,这是因为执行简单模式下的代码之后已经创建了hello这个队列,是非持久化的,这里如果还是hello这个队列的话,那么执行的时候会报错,因为RabbitMQ不允许使用不同的参数重新声明一个队列或交换器,需要删除重建。

注意:这里代码的routing_key也需要改成和队列名一致,否则会丢失消息。


接收端,接收消息,2-work-receive.py

# -*- encoding: utf-8 -*-
import os
import pika
import time
import sys


def main():
    # 连接RabbitMQ服务器
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # 创建队列,并且开启持久化
    channel.queue_declare(queue='task-queue', durable=True)

    # 定义回调函数
    def callback(ch, method, properties, body):
        print(f" [x] Received {body.decode()!r}")

        # 消息里面有几个.就sleep几秒
        time.sleep(body.count(b'.'))
        print(" [x] Done")

        # 消息确认
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # 设置消费者允许的最大未确认消息数量为1
    channel.basic_qos(prefetch_count=1)

    # 定义队列的消费回调,将消息传递给回调函数同时关闭自动消息确认
    channel.basic_consume(queue='task-queue', on_message_callback=callback)

    print(' [*] Waiting for messages. To exit press CTRL+C')

    # 开始消费/接收消息,注意:这是一个死循环,相当于`while True`
    channel.start_consuming()


if __name__ == '__main__':
    try:
        main()
    # ctrl + c可以中断循环
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

运行多个接收端,即有多个消费者同时进行同个队列的消费,默认情况下,RabbitMQ 将按顺序将每条消息平均发送给下一个消费者,每个消费者将获得相似数量的消息。这种分发消息的方式称为轮询。


运行:

先运行一个接收端:

python 2-work-receive.py
# => Waiting for messages. To exit press CTRL+C

再开一个控制台再运行一个同样的接收端

最后开一个控制台来多次运行发送端:

python 2-work-send.py First Message
# => [x] Sent 'First Message'
python 2-work-send.py Second Message
# => [x] Sent 'Second Message'
python 2-work-send.py Third Message
# => [x] Sent 'Third Message'
python 2-work-send.py Forth Message
# => [x] Sent 'Forth Message'
python 2-work-send.py Fifth Message
# => [x] Sent 'Fifth Message'

分别看两个接收端的输出:

# 第一个
python 2-work-receive.py 
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First Message'
# => [x] Done
# => [x] Received 'Third Message'
# => [x] Done
# => [x] Received 'Fifth Message'
# => [x] Done

# 第二个
python 2-work-receive.py 
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second Message'
# => [x] Done
# => [x] Received 'Forth Message'
# => [x] Done

可以看到消息是平均分配的


PHP

发送端,发送消息:2-work-send.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$exchange = '';    // 默认交换器
$queue = 'task-queue';
$routing_key = 'task-queue';

// 连接RabbitMQ服务器
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 创建队列,并且开启持久化
$channel->queue_declare($queue, false, true, false, false);

// 使用命令的参数作为消息体,如无则hello world
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = 'Hello World';
}

// 定义消息,并且将消息设置为持久化
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

// 发送消息到交换器,交换器为默认交换器(空交换器)
$channel->basic_publish($msg, $exchange, $routing_key);

echo " [x] Sent '" . $data . "\n";

// 关闭连接
$channel->close();
$connection->close();

接收端,接收消息:2-work-receive.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$queue = 'task-queue';

// 连接RabbitMQ服务器
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 创建队列,并且开启持久化
$channel->queue_declare($queue, false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

// 定义回调函数
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";

    // 消息里面有几个.就sleep几秒
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done\n";

    // 消息确认
    $msg->ack();
};

// 设置消费者允许的最大未确认消息数量为1
$channel->basic_qos(null, 1, null);

// 定义队列的消费回调,将消息传递给回调函数,并且关闭自动消息确认
$channel->basic_consume($queue, '', false, false, false, false, $callback);

// 循环消费/接收消息
while ($channel->is_open()) {
    $channel->wait();
}

// 关闭连接
$channel->close();
$connection->close();

3. 发布/订阅模式

工作(work)模式可以将消息平均分配给多个消费者,但每条消息只能由一个消费者获取,如果想要一条消息同时被多个不同消费者消费的话可以用发布/订阅(pub/sub)模式,如下图:

image-20210622011401704

在发布/订阅模式下,需要指定发送到哪个交换器中,上面的X表示交换器

特点:

  • 发布/订阅模式下,交换器的type为fanout
  • 生产者在发送消息时不需要指定队列名,交换器会将收到的消息转发到所绑定的所有队列
  • 消息被交换器转发到多个队列时,一条消息可以同时被多个消费者获取

应用场景:更新商品库存后需要通知多个缓存和多个数据库,这里的结构应该是:

  1. 一个fanout类型交换机扇出两个个消息队列,分别为缓存消息队列、数据库消息队列
  2. 一个缓存消息队列对应着多个缓存消费者
  3. 一个数据库消息队列对应着多个数据库消费者

Python

发送端,发送消息,3-pub-sub-send.py

# -*- encoding: utf-8 -*-
import pika
import sys

# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 创建交换器,类型为扇形
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 使用命令的参数作为消息体,如无则hello world
message = ' '.join(sys.argv[1:]) or "info: Hello World!"

# 发送消息到交换器
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)

# 关闭连接
connection.close()

接收端,接收消息:3-pub-sub-receive.py

# -*- encoding: utf-8 -*-
"""
@Time    : 2021/6/12 23:42
@Author  : boli.hong
"""
import os
import pika
import sys


def main():
    # 连接RabbitMQ服务器
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # 创建交换器,类型为扇形
    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    # 创建临时队列(临时队列:一旦没有消费者绑定该队列,该队列会自动删除)
    result = channel.queue_declare(queue='', exclusive=True)

    # 临时队列的队列名随机生成
    queue_name = result.method.queue

    # 绑定交换器和队列
    channel.queue_bind(exchange='logs', queue=queue_name)

    # 定义回调函数
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)

    # 定义队列的消费回调,将消息传递给回调函数同时自动消息确认
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')

    # 开始消费/接收消息,注意:这是一个死循环,相当于`while True`
    channel.start_consuming()


if __name__ == '__main__':
    try:
        main()
    # ctrl + c可以中断循环
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

注意:

  • 这里使用了临时队列,这是因为发布/订阅下需要模拟多个队列来绑定交换器实现广播,所以使用了临时队列。
  • 临时队列是由系统自动生成的,队列名是随机的,一旦消费者连接关闭,队列会被删除掉,exclusive=True表示创建的是临时队列。

运行:

多个终端下来运行多个接收端:

python 3-pub-sub-receive.py
# [*] Waiting for messages. To exit press CTRL+C

运行一个发送端:

python 3-pub-sub-send.py
# [x] Sent 'info: Hello World!'

所有的接收端都会收到消息的:

 [x] b'info: Hello World!'

PHP

发送端,发送消息,3-pub-sub-send.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$exchange = 'boli-exchange';
$exchange_type = 'fanout';

// 连接RabbitMQ服务器
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 创建交换器,类型为扇形
$channel->exchange_declare($exchange, $exchange_type, false, false, false);

// 使用命令的参数作为消息体,如无则hello world
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = 'Hello World';
}

// 定义消息
$msg = new AMQPMessage($data);

// 发送消息到交换器
$channel->basic_publish($msg, $exchange);

echo " [x] Sent '" . $data . "\n";

// 关闭连接
$channel->close();
$connection->close();

接收端,接收消息:3-pub-sub-receive.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$exchange = 'boli-exchange';
$exchange_type = 'fanout';

// 连接RabbitMQ服务器
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 创建交换器,类型为扇形
$channel->exchange_declare($exchange, $exchange_type, false, false, false);

// 创建临时队列,队列名是随机的(临时队列:一旦没有消费者绑定该队列,该队列会自动删除)
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);

// 绑定交换器和队列
$channel->queue_bind($queue_name, $exchange);

echo " [*] Waiting for $exchange. To exit press CTRL+C\n";

// 定义回调函数
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
};

// 定义队列的消费回调,将消息传递给回调函数,并且自动消息确认
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

// 循环消费/接收消息
while ($channel->is_open()) {
    $channel->wait();
}

// 关闭连接
$channel->close();
$connection->close();

4. 路由模式

前面的几种模式,生产者是无法指定到具体的目标队列的,而在路由(routing)模式下,消息的目标队列是可以由生产者来指定的,如下图:

image-20210622011109952

特点:

  • 路由模式下交换器的type为直连(direct)
  • 消息的目标队列可以由生产者按照RoutingKey规则来指定
  • 消费者通过BindingKey来绑定自己的队列
  • 一条消息队列可以被多个消费者获取
  • 只有RoutingKey和BindingKey相匹配的队列才会收到消息

RoutingKey用于生产者指定交换器最终将消息路由到哪个队列,而BindingKey用于消费者绑定到某个队列。

应用场景:如在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息


Python

发送端,发送消息,4-routing-send.py

# -*- encoding: utf-8 -*-
import pika
import sys

# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 创建交换器,类型为直连
channel.exchange_declare(exchange='direct_exchange', exchange_type='direct')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'info'

# 使用命令的参数作为消息体,如无则hello world
message = ' '.join(sys.argv[2:]) or "Hello World!"

# 发送消息到交换器
channel.basic_publish(exchange='direct_exchange', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))

# 关闭连接
connection.close()

接收端,接收消息,4-routing-receive.py

# -*- encoding: utf-8 -*-
import pika
import sys
import os


def main():
    # 连接RabbitMQ服务器
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # 创建交换器,类型为直连
    channel.exchange_declare(exchange='direct_exchange', exchange_type='direct')

    # 创建临时队列(临时队列:一旦没有消费者绑定该队列,该队列会自动删除)
    result = channel.queue_declare(queue='', exclusive=True)

    # 临时队列的队列名随机生成
    queue_name = result.method.queue

    routing_keys = sys.argv[1:]
    if not routing_keys:
        sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
        sys.exit(1)

    # 绑定交换器和队列
    for routing_key in routing_keys:
        channel.queue_bind(exchange='direct_exchange', queue=queue_name, routing_key=routing_key)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    # 定义回调函数
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))

    # 定义队列的消费回调,将消息传递给回调函数同时自动消息确认
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

    # 开始消费/接收消息,注意:这是一个死循环,相当于`while True`
    channel.start_consuming()


if __name__ == '__main__':
    try:
        main()
    # ctrl + c可以中断循环
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

PHP

发送端,发送消息,4-routing-send.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/config.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$host = RABBITMQ_CONF['host'];
$port = RABBITMQ_CONF['port'];
$user = RABBITMQ_CONF['user'];
$password = RABBITMQ_CONF['password'];
$exchange = 'direct-exchange';
$exchange_type = 'direct';

// 连接RabbitMQ服务器
$connection = new AMQPStreamConnection($host, $port, $user, $password);
$channel = $connection->channel();

// 创建交换器,类型为直连
$channel->exchange_declare($exchange, $exchange_type, false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

// 使用命令的参数作为消息体,如无则hello world
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = 'Hello World';
}

// 定义消息
$msg = new AMQPMessage($data);

// 发送消息到交换器
$channel->basic_publish($msg, $exchange, $routing_key);

echo " [x] Sent '" . $data . "\n";

// 关闭连接
$channel->close();
$connection->close();

接收端,接收消息,4-routing-receive.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/config.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$host = RABBITMQ_CONF['host'];
$port = RABBITMQ_CONF['port'];
$user = RABBITMQ_CONF['user'];
$password = RABBITMQ_CONF['password'];
$exchange = 'direct-exchange';
$exchange_type = 'direct';

// 连接RabbitMQ服务器
$connection = new AMQPStreamConnection($host, $port, $user, $password);
$channel = $connection->channel();

// 创建交换器,类型为直连
$channel->exchange_declare($exchange, $exchange_type, false, false, false);

// 创建临时队列,队列名是随机的
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);

$routing_keys = array_slice($argv, 1);
if (empty($routing_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

// 绑定交换器和队列
foreach ($routing_key as $routing_keys) {
    $channel->queue_bind($queue_name, $exchange, $routing_key);
}

echo " [*] Waiting for $exchange. To exit press CTRL+C\n";

// 定义回调函数
$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

// 定义队列的消费回调,将消息传递给回调函数,并且自动消息确认
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

// 循环消费/接收消息
while ($channel->is_open()) {
    $channel->wait();
}

// 关闭连接
$channel->close();
$connection->close();

5. 主题模式

主题模式是在路由模式的基础上,根据主题(Topics)来将路由键和某模式进行匹配。

其中#表示匹配多个词,*表示匹配一个词,消费者可以通过某种模式的BindingKey来达到订阅某个主题消息的目的,如下:

image-20210526212437408

特点:

  • 主题模式下的交换器的type取值为topic
  • 一条消息可以被多个消费者同时获取

应用场景:同上,Iphone促销活动可以接收主题为Iphone的消息,如Iphone12、Iphone13等


Python

发送端,发送消息,5-topic-send.py

# -*- encoding: utf-8 -*-
import pika
import sys

# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 创建交换器,类型为主题
channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'info'

# 使用命令的参数作为消息体,如无则hello world
message = ' '.join(sys.argv[2:]) or "Hello World!"

# 发送消息到交换器
channel.basic_publish(exchange='topic_exchange', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))

# 关闭连接
connection.close()

接收端,接收消息,5-topic-send.py

# -*- encoding: utf-8 -*-
import pika
import sys
import os


def main():
    # 连接RabbitMQ服务器
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # 创建交换器,类型为主题
    channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')

    # 创建临时队列(临时队列:一旦没有消费者绑定该队列,该队列会自动删除)
    result = channel.queue_declare(queue='', exclusive=True)

    # 临时队列的队列名随机生成
    queue_name = result.method.queue

    routing_keys = sys.argv[1:]
    if not routing_keys:
        sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
        sys.exit(1)

    # 绑定交换器和队列
    for routing_key in routing_keys:
        channel.queue_bind(exchange='topic_exchange', queue=queue_name, routing_key=routing_key)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    # 定义回调函数
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))

    # 定义队列的消费回调,将消息传递给回调函数同时自动消息确认
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

    # 开始消费/接收消息,注意:这是一个死循环,相当于`while True`
    channel.start_consuming()


if __name__ == '__main__':
    try:
        main()
    # ctrl + c可以中断循环
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

PHP

发送端,发送消息,5-topic-send.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/config.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$host = RABBITMQ_CONF['host'];
$port = RABBITMQ_CONF['port'];
$user = RABBITMQ_CONF['user'];
$password = RABBITMQ_CONF['password'];
$exchange = 'topic-exchange';
$exchange_type = 'topic';

// 连接RabbitMQ服务器
$connection = new AMQPStreamConnection($host, $port, $user, $password);
$channel = $connection->channel();

// 创建交换器,类型为主题
$channel->exchange_declare($exchange, $exchange_type, false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

// 使用命令的参数作为消息体,如无则hello world
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = 'Hello World';
}

// 定义消息
$msg = new AMQPMessage($data);

// 发送消息到交换器
$channel->basic_publish($msg, $exchange, $routing_key);

echo ' [x] Sent ', $routing_key, ':', $data, "\n";

// 关闭连接
$channel->close();
$connection->close();

接收端,接收消息,5-topic-send.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/config.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$host = RABBITMQ_CONF['host'];
$port = RABBITMQ_CONF['port'];
$user = RABBITMQ_CONF['user'];
$password = RABBITMQ_CONF['password'];
$exchange = 'topic-exchange';
$exchange_type = 'topic';

// 连接RabbitMQ服务器
$connection = new AMQPStreamConnection($host, $port, $user, $password);
$channel = $connection->channel();

// 创建交换器,类型为主题
$channel->exchange_declare($exchange, $exchange_type, false, false, false);

// 创建临时队列,队列名是随机的
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);

$routing_keys = array_slice($argv, 1);
if (empty($routing_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

// 绑定交换器和队列
foreach ($routing_keys as $routing_key) {
    $channel->queue_bind($queue_name, $exchange, $routing_key);
}

echo " [*] Waiting for $exchange. To exit press CTRL+C\n";

// 定义回调函数
$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

// 定义队列的消费回调,将消息传递给回调函数,并且自动消息确认
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

// 循环消费/接收消息
while ($channel->is_open()) {
    $channel->wait();
}

// 关闭连接
$channel->close();
$connection->close();

6. RPC模式

MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。 但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC,具体流程可以看图。

image-20210622011206723

一般来说通过RabbitMQ来实现RPC是很容易的。一个客户端发送请求信息,服务器端将其应用到一个回复信息中。为了接收到回复信息,客户端需要在发送请求的时候同时发送一个回调队列(callback queue)的地址。


RabbitMQ中实现RPC的机制是:

  • 当客户端启动的时候,它创建一个匿名独享的回调队列。
  • 在RPC请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
  • 将请求发送到一个 rpc_queue 队列中。
  • RPC工作者(又名:服务器)等待请求发送到这个队列中来,当请求出现的时候,它执行它的工作并且将带有执行结果的消息发送给reply_to字段指定的队列。
  • 客户端等待回调队列里的数据。当有消息出现的时候,它会检查correlation_id属性。如果此属性的值与请求匹配,将它返回给应用。

消息属性

AMQP协议给消息预定义了一系列的14个属性。大多数属性很少会用到,除了以下几个:

  • delivery_mode(投递模式):将消息标记为持久的(值为2)或暂存的(除了2之外的其他任何值)。第二篇教程里接触过这个属性,记得吧?
  • content_type(内容类型):用来描述编码的mime-type。例如在实际使用中常常使用application/json来描述JOSN编码类型。
  • reply_to(回复目标):通常用来命名回调队列。
  • correlation_id(关联标识):用来将RPC的响应和请求关联起来。

上边介绍中,我们建议给每一个RPC请求新建一个回调队列。这不是一个高效的做法,幸好这儿有一个更好的办法 —— 我们可以为每个客户端只建立一个独立的回调队列。

这就带来一个新问题,当此队列接收到一个响应的时候它无法辨别出这个响应是属于哪个请求的。correlation_id 就是为了解决这个问题而来的。我们给每个请求设置一个独一无二的值。稍后,当我们从回调队列中接收到一个消息的时候,我们就可以查看这条属性从而将响应和请求匹配起来。如果我们接手到的消息的correlation_id是未知的,那就直接销毁掉它,因为它不属于我们的任何一条请求。

为什么我们接收到未知消息的时候不抛出一个错误,而是要将它忽略掉?

这是为了解决服务器端有可能发生的竞争情况。尽管可能性不大,但RPC服务器还是有可能在已将应答发送给我们但还未将确认消息发送给请求的情况下死掉。如果这种情况发生,RPC在重启后会重新处理请求。这就是为什么我们必须在客户端优雅的处理重复响应,同时RPC也需要尽可能保持幂等性。


应用场景:需要等待接口返回数据,如订单支付


关于RPC的注意事项:

尽管RPC在计算领域是一个常用模式,但它也经常被诟病。当一个问题被抛出的时候,程序员往往意识不到这到底是由本地调用还是由较慢的RPC调用引起的。同样的困惑还来自于系统的不可预测性和给调试工作带来的不必要的复杂性。跟软件精简不同的是,滥用RPC会导致不可维护的面条代码.

考虑到这一点,牢记以下建议:

确保能够明确的搞清楚哪个函数是本地调用的,哪个函数是远程调用的。给你的系统编写文档。保持各个组件间的依赖明确。处理错误案例。明了客户端改如何处理RPC服务器的宕机和长时间无响应情况。

当对避免使用RPC有疑问的时候。如果可以的话,你应该尽量使用异步管道来代替RPC类的阻塞。结果被异步地推送到下一个计算场景。


Python

服务器端,6-rpc_server.py

# -*- encoding: utf-8 -*-
import pika
import sys

# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='rpc_queue')


def fib(n):
    """
    斐波纳契数列
    注意:由于该方法是递归,这里是用来测试一些耗时任务
    """
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    """
    回调方法,执行实际的操作并做出响应
    """
    n = int(body)

    print(" [.] fib(%s)" % (n,))
    response = fib(n)

    # 发送消息,交换器为默认交换器(空交换器)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))

    # 消息确认
    ch.basic_ack(delivery_tag=method.delivery_tag)


# 设置消费者允许的最大未确认消息数量为1
channel.basic_qos(prefetch_count=1)

# 定义队列的消费回调,将消息传递给回调函数同时关闭自动消息确认
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")

# 开始消费/接收消息,注意:这是一个死循环,相当于`while True`
channel.start_consuming()

服务器端的代码很简单:

  • 像往常一样建立连接,声明队列
  • 声明fibonacci函数,它假设只有合法的正整数当作输入
  • 为 basic_consume 声明了一个回调函数,这是RPC服务器端的核心。它执行实际的操作并且作出响应。
  • 如果希望能在服务器上多开几个线程,为了能将负载平均地分摊到多个服务器,需要将 prefetch_count 设置好。

客户端,6-rpc-client.py

# -*- encoding: utf-8 -*-
import pika
import uuid


class FibonacciRpcClient(object):
    def __init__(self):
        # 连接RabbitMQ服务器
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        # 创建临时队列(临时队列:一旦没有消费者绑定该队列,该队列会自动删除)
        result = self.channel.queue_declare(queue='', exclusive=True)

        # 临时队列的队列名随机生成
        self.callback_queue = result.method.queue

        # 定义队列的消费回调,将消息传递给回调函数同时自动消息确认
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())

        # 发送消息,消息带有reply_to和correlation_id属性
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n)
        )
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

客户端的代码比较复杂:

  • 建立连接、通道并且声明一个用于回调的独享的回调队列(callback_queue)。
  • 我们订阅这个回调队列(callback_queue),以便可以接收RPC的响应。
  • “on_response”回调函数对每一个响应会执行一个非常简单的操作,会检查每一个响应的消息的correlation_id属性是否与我们期待的一致,如果一致,将响应结果赋给self.response,然后跳出consuming循环。
  • 接下来,我们定义我们的主要方法 call 方法。它执行真正的RPC请求。
  • 在这个call方法中,首先我们生成一个唯一的 correlation_id 值并且保存起来,'on_response'回调函数会用它来获取符合要求的响应。
  • 同样在call方法中,我们还将带有 reply_to 和 correlation_id 属性的消息发布出去
  • 最后,等待正确的响应到来并将响应返回给用户。

运行:

启动服务器端:

python 6-rpc-server.py
 [x] Awaiting RPC requests

运行客户端,客户端会发起一个fibonacci请求

python 6-rpc-client.py
 [x] Requesting fib(30)
 [.] Got 832040

同时服务器端会响应:

 [.] fib(30)

注意:此处呈现的设计并不是实现RPC服务的唯一方式,但是他有一些重要的优势:

  • 如果RPC服务器运行的过慢的时候,你可以通过运行另外一个服务器端轻松扩展它。试试在控制台中运行第二个 rpc_server.php 。
  • 在客户端,RPC请求只发送或接收一条消息。不需要像 queue_declare 这样的异步调用,所以RPC客户端的单个请求只需要一个网络往返。

我们的代码非常简单,而且没有试图去解决一些复杂的问题,如:

  • 当没有服务器运行时,客户端如何作出反映。
  • 客户端是否需要实现类似RPC超时的东西。
  • 如果服务器发生故障,并且抛出异常,应该被转发到客户端吗?
  • 在处理前,防止混入无效的信息(例如检查边界)

PHP

服务器端,6-rpc-server.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/config.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$host = RABBITMQ_CONF['host'];
$port = RABBITMQ_CONF['port'];
$user = RABBITMQ_CONF['user'];
$password = RABBITMQ_CONF['password'];
$exchange = '';
$queue = 'rpc_queue';

// 连接RabbitMQ服务器
$connection = new AMQPStreamConnection($host, $port, $user, $password);
$channel = $connection->channel();

// 创建队列
$channel->queue_declare($queue, false, false, false, false);

/**
 * 斐波纳契数列
 * 注意:由于该方法是递归,这里是用来测试一些耗时任务
 */
function fib($n)
{
    if ($n == 0) {
        return 0;
    }
    if ($n == 1) {
        return 1;
    }
    return fib($n - 1) + fib($n - 2);
}

echo " [x] Awaiting RPC requests\n";
$callback = function ($req) {
    $n = intval($req->body);
    echo ' [.] fib(', $n, ")\n";

    $msg = new AMQPMessage(
        (string)fib($n),
        array('correlation_id' => $req->get('correlation_id'))
    );

    // 发送消息到交换器,交换器为默认交换器(空交换器)
    $req->delivery_info['channel']->basic_publish(
        $msg,
        $exchange,
        $req->get('reply_to')
    );

    // 消息确认
    $req->ack();
};

// 设置消费者允许的最大未确认消息数量为1
$channel->basic_qos(null, 1, null);

// 定义队列的消费回调,将消息传递给回调函数同时关闭自动消息确认
$channel->basic_consume($queue, '', false, false, false, false, $callback);

// 循环消费/接收消息
while ($channel->is_open()) {
    $channel->wait();
}

// 关闭连接
$channel->close();
$connection->close();

客户端,6-rpc-client.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/config.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;


class FibonacciRpcClient
{
    private $connection;
    private $channel;
    private $callback_queue;
    private $response;
    private $corr_id;

    public function __construct($config)
    {
        // 连接RabbitMQ服务器
        $this->connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password']);
        $this->channel = $this->connection->channel();

        $this->exchange = $config['exchange'];
        $this->queue = $config['queue'];

        // 创建临时队列,队列名是随机的
        list($this->callback_queue, ,) = $this->channel->queue_declare('', false, false, true, false);

        // 定义队列的消费回调,将消息传递给回调函数同时自动消息确认
        $this->channel->basic_consume($this->callback_queue, '', false, true, false, false, array($this, 'onResponse'));
    }

    public function onResponse($rep)
    {
        if ($rep->get('correlation_id') == $this->corr_id) {
            $this->response = $rep->body;
        }
    }

    public function call($n)
    {
        $this->response = null;
        $this->corr_id = uniqid();

        // 定义消息,消息带有reply_to和correlation_id属性
        $msg = new AMQPMessage(
            (string) $n,
            array(
                'correlation_id' => $this->corr_id,
                'reply_to' => $this->callback_queue
            )
        );

        // 发送消息
        $this->channel->basic_publish($msg, $this->exchange, $this->queue);

        // 循环消费/接收消息
        while (!$this->response) {
            $this->channel->wait();
        }
        return intval($this->response);
    }
}

$config = RABBITMQ_CONF;
$config['exchange'] = '';
$config['queue'] = 'rpc_queue';
$fibonacci_rpc = new FibonacciRpcClient($config);
$response = $fibonacci_rpc->call(30);
echo ' [.] Got ', $response, "\n";

7. 生产者确认模式

生产者进行可靠的发布确认,生产者确认是RabbitMQ扩展,可以实现可靠的发布。在通道上启用生产者确认后,RabbitMQ将异步确认生产者发送的消息,这意味着它们已在服务器端处理。

发布者确认是 AMQP 0.9.1 协议的 RabbitMQ 扩展,因此默认情况下不启用它们。使用 confirm_select 方法在频道级别启用发布者确认:

$channel = $connection->channel();
$channel->confirm_select();

注意:不需要所有的消息都开启,为特定的需要保证高可靠的消息才开启该模式。该模式会降低性能,因为消息的确认会阻塞后续消息的发布。


应用场景:对于消息可靠性要求较高,比如钱包扣款


PHP

开启发布者确认

$channel = $connection->channel();
$channel->confirm_select();

有三种方式进行消息的确认:

  • 发布单个消息并同步等待其确认
  • 发布多个消息并同步等待其确认
  • 异步确认消息

发布单个消息并同步等待其确认(同步机制)

while (thereAreMessagesToPublish()) {
    $data = "Hello World!";
    $msg = new AMQPMessage($data);
    $channel->basic_publish($msg, 'exchange');
    // uses a 5 second timeout
    $channel->wait_for_pending_acks(5.000);
}

使用 $channel::wait_for_pending_acks(int|float) 方法等待其确认。一旦消息得到确认,该方法就会返回。如果消息在超时内没有得到确认或者它被 nacked(意味着代理由于某种原因无法处理它),该方法将抛出异常。异常的处理通常包括记录错误消息和/或重试发送消息。


发布多个消息并同步等待其确认(同步机制)

$batch_size = 100;
$outstanding_message_count = 0;
while (thereAreMessagesToPublish()) {
    $data = ...;
    $msg = new AMQPMessage($data);
    $channel->basic_publish($msg, 'exchange');
    $outstanding_message_count++;
    if ($outstanding_message_count === $batch_size) {
        $channel->wait_for_pending_acks(5.000);
        $outstanding_message_count = 0;
    }
}
if ($outstanding_message_count > 0) {
    $channel->wait_for_pending_acks(5.000);
}

与等待单个消息的确认相比,等待一批消息的确认这种方式大大提高了吞吐量。

缺点是:

  • 在失败的情况下并不知道是哪个消息失败了
  • 这个方案是同步的,会阻塞消息的发布

异步确认消息(异步机制)

$channel = $connection->channel();
$channel->confirm_select();

$channel->set_ack_handler(
    function (AMQPMessage $message){
        // code when message is confirmed
    }
);

$channel->set_nack_handler(
    function (AMQPMessage $message){
        // code when message is nack-ed
    }
);

这里有两个回调:

  • 一个用于确认消息
  • 另一个用于 nacked 消息(可以被broker视为丢失的消息)

每个回调都有带有返回消息的 AMQPMessage $message 参数,因此无需处理序列号(交付标记)即可了解此回调属于哪个消息。

这种方式的性能是最好的


5-1 常见消息队列的对比


除了RabbitMQ之外,还有其他比较常见的消息队列中间件,如KafkaRocketMQActiveMQ等,下面的表格中列举了这几种消息队列的差异:

消息队列 RabbitMQ ActiveMQ RocketMQ Kafka
所属公司/社区 Mozilla Public License Apache Ali Apache
成熟度 成熟 成熟 比较成熟 成熟
授权方式 开源 开源 开源 开源
开发语言 Erlang Java Java Scala & Java
客户端支持语言 官方支持Erlang,java,Ruby等,社区产出多种语言API,几乎支持所有常用语言 JAVA,C++,pyhton,php,perl,net等 java,c++ 官方支持java,开源社区有多种语言版本,如PHP,python,go,c/c++,ruby,node.js等语言
协议支持 多协议支持AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP/AMOP 自定义的一套 自定义协议
消息批量操作 不支持 支持 支持 支持
消息推拉模式 多协议,pull/push均有支持 多协议pull/push均有支持 多协议,pull/push均有支持 pull
HA master/slave模式 基于zookeeper+LevelDB的master-slave 支持多master模式,多master多slave模式,异步复制模式 支持replica机制,leader宕掉后,备份自动顶替,并重新选择leader
数据可靠性 可以保证数据不丢,有slave备份 master/slave 支持异步实时刷盘,同步刷盘,同步复制,民步复制 数据可靠,并且有replica机制,有容错容灾能力
单机吞吐量 其次(万级) 最差(万级) 最高(十万级) 次之(十万级)
消息延迟 微秒级 \ 比Kafka快 毫秒级
持久化能力 内存、文件、支持数据堆积,但数据堆积反过来影响生产速率 内存、文件、数据库 碰盘文件 磁盘文件 磁盘文件,只要磁盘容量够,可以做到无限消息堆积
是否有序 若想有序,只能使用一个client 可以支持有序 有序 多client保证有序
事务 支持 支持 支持 不支持
集群 支持 支持 支持 支持
负载均衡 支持 支持 支持 支持
管理界面 较好 一般 命令行界面 官方只提供了命令行版
部署方式 独立 独立 独立 独立

6-1. Web页面操作

进入 web 管理界面之后,可以很清晰的看到分了 6 个菜单目录,分别是:Overview、Connections、Channels、Exchanges、Queues、Admin


1. 页面介绍

  • Overview:总览页面,主要介绍RabbitMQ的一些基础汇总信息

image-20210622003214978


  • Connections:连接池管理,主要介绍客户端连接等信息

image-20210622003400652


  • Channel:信道管理,主要是介绍信道连接等信息

image-20210622003509951

点击某个具体的信道,可以看到对应的消费队列等信息

image-20210622003603159


  • Exchanges:交换器管理,主要是介绍交换器等信息

image-20210622003643068


  • Queues:队列管理,主要是介绍队列等信息

image-20210622003731251


  • Admin:系统管理,主要是用户管理、虚拟主机、权限等信息

image-20210622003846383


2. 交换器管理

点击进入Exchange页面,最下面有个Add a new exchange标签:

image-20210622004229711

  • Name:交换器名称
  • Type:交换器类型
  • Durable:是否持久化,Durable:持久化,Transiend:不持久化
  • Auto delete:是否自动删除,如是的话当最后一个绑定(队列或其他Exchange)解绑之后,该Exchange会自动被删除
  • Internal:是否是内部专用Exchange,如果是则意味着不能往这个Exchange发送消息
  • Argument:参数,AMQP协议实现做扩展使用的

3. 队列管理

点击进入 Queues 菜单,最下面也有一个Add a new queue标签:

image-20210622004742541

  • Name:队列名称
  • Durability:是否持久化,Durable:持久化,Transient:不持久化
  • Auto delete:是否自动删除,是的话,当队列内容为空时,会自动删除队列
  • Arguments:参数,AMQP协议实现做扩展使用的

4. 绑定

建立绑定关系,可以从Queues页面进去也可以从Exchange页面进去

如果是从Exchange页面进入,那么被关联的对象就是队列,点击对应的Exchange,进入详情页:

image-20210622005124336

如果是从Queues页面进入,那么被关联的对象就是交换器,点击对应的Queue,进入详情页:

image-20210622005301780

建立好了之后就可以看到对应的绑定关系了:

image-20210622010014137


5. 发送消息

发送消息,可以从Queues页面进去也可以从Exchange页面进去

在Queue详情页或Exchange详情页,点击Publish message标签,填写对应的路由键,发送数据:

image-20210622010138546

然后点击Get messages标签,可以获取队列中的消息:

image-20210622010321093


参考

空文件

简介

RabbitMQ的测试代码 展开 收起
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Python
1
https://gitee.com/paultest/rabbitmq-python.git
git@gitee.com:paultest/rabbitmq-python.git
paultest
rabbitmq-python
rabbitmq-python
master

搜索帮助