1 Star 0 Fork 0

青天白云 / message

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

message

###消息队列高手课 ##一,基础篇(8讲) #02 | 该如何选择消息队列? 如果说,消息队列并不是你将要构建系统的主角之一,你对消息队列功能和性能都没有很高的要求,只需要一个开箱即用易于维护的产品,我建议你使用 RabbitMQ。 如果你的系统使用消息队列主要场景是处理在线业务,比如在交易系统中用消息队列传递订单,那 RocketMQ 的低延迟和金融级的稳定性是你需要的。 如果你需要处理海量的消息,像收集日志、监控信息或是前端的埋点这类数据,或是你的应用场景大量使用了大数据、流计算相关的开源产品,那 Kafka 是最适合你的消息队列。 #03 | 消息模型:主题和队列有什么区别?

RocketMQ 的消息模型

每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。

#04 | 如何利用事务消息实现分布式事务? 实现订单下单场景:

  1. 首先通过producer.sendMessageInTransaction()方法发送一个半消息给MQ.
  2. 此时会在TransactionListener中的executeLocalTransaction()方法阻塞,然后在这个方法里面进行订单创建并提交本地事务,如果commit成功,则返回COMMIT状态,否则是ROLLBACK状态,如果正常返回COMMIT或者ROLLBACK的话,不会存在第3步的反查情况。
  3. 如果上面的本地事务提交成功以后,此节点突然断电,那么checkLocalTransaction()反查方法就会在某个时候被MQ调用,此方法会根据消息中的订单号去数据库确认订单是否存在,存在就返回COMMIT状态,否则是ROLLBACK状态。
  4. 购物车在另外一个项目中,反正只要收到MQ的消息就将本次订单的商品从购物车中删除即可。

https://rocketmq.apache.org/docs/transaction-example/ producer.setNamesrvAddr("127.0.0.1:9876"); #05 | 如何确保消息不会丢失?

检测消息丢失的方法

在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。 如果没有消息丢失,Consumer 收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 +1。如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因。 首先,像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。如果你的系统中 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。 Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。

确保消息可靠传递 在生产阶段,你需要捕获消息发送的错误,并重发消息。 在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个 Broker 宕机或者磁盘损坏而丢失。 在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。 #06 | 如何处理消费过程中的重复消息? 用幂等性解决重复消息问题 At least once(至少一次) + 幂等消费 = Exactly once(恰好一次)。

  1. 利用数据库的唯一约束实现幂等
  2. 为更新的数据设置前置条件 “将账户 X 的余额增加 100 元”这个操作并不满足幂等性,我们可以把这个操作加上一个前置条件,变为:“如果账户 X 当前的余额为 500 元,将余额加 100 元”,这个操作就具备了幂等性。对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。 (或者版本号) 3.记录并检查操作 大部分消息队列都选择只提供 At least once 的服务质量,而不是级别更高的 Exactly once 呢? 解决一个问题,往往会引发别的问题。若消息队列实现了exactly once,会引发的问题有: ①消费端在pull消息时,需要检测此消息是否被消费,这个检测机制无疑会拉低消息消费的速度。可以预想到,随着消息的剧增,消费性能势必会急剧下降,导致消息积压; ②检查机制还需要业务端去配合实现,若一条消息长时间未返回ack,消息队列需要去回调看下消费结果(这个类似于事物消息的回查机制)。这样就会增加业务端的压力,与很多的未知因素。 所以,消息队列不实现exactly once,而是at least once + 幂等性,这个幂等性让给我们去处理。

#07 | 消息积压了该如何处理? 优化性能来避免消息积压 1.发送端性能优化 只需要注意设置合适的并发和批量大小,就可以达到很好的发送性能。 2.消费端性能优化 一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。 在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。每个分区上实际上只能支持单线程消费。 一台机器只能有一个分区吗???(感觉不是) 消息积压了该如何处理? 发送消息的速度还是消费消息的速度和原来都没什么变化,这时候你需要检查一下你的消费端,是不是消费失败导致的一条消息反复消费这种情况比较多。

空文件

简介

暂无描述 展开 收起
Java
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zyy2017/message.git
git@gitee.com:zyy2017/message.git
zyy2017
message
message
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891