1 Star 0 Fork 0

le1024 / le1024

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
24_activemq消费重试机制.md 4.39 KB
一键复制 编辑 原始数据 按行查看 历史
le1024 提交于 2022-04-28 10:27 . m

消息重试机制

官网地址

  1. 什么情况下会引起消息重发

    • 消费者端用了transactions且在session中调用了rollback
    • 消费者端用了transactions且在调用commit之前关闭或者未commit
    • 消费者端在SESSION_ACKNOWLEDGE的传递模式下,在session中调用了recover
  2. 消息重发时间间隔和重发次数

    • 时间间隔:1
    • 重发次数:6
  3. 有毒消息Poison ACK

    一个消息被redelivedred超过默认最大的重复次数(默认6次)时,消费端会给MQ发送一个“Posion ACK”,表示这个消息有毒,通知broker不要再发了。这时候broker会把这个消息放到DLQ(死信队列)。

属性说明

collisionAvoidanceFactor:设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。也就是在延迟时间上再加上一个时间波动范围。默认值0.15

maximumRedeliveries:最大重传次数,达到最大重传次数后抛异常。为-1时不限制次数,为0时表示不进行重传。默认值6

maximumRedeliveryDealy:最大传送延迟,只在useExponentialBackOff为true时有效(V5.5)。假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。

initialRedeliveryDelay初始重发延迟时间,默认1000L

redeliveryDelay:重发延迟时间,当initialRedeliveryDelay=0时生效,默认1000L

useCollissionAvoidance:启用防冲突功能,默认false

useExponentialBackOff:启用指数倍数递增的方式增加延迟时间,默认false

backOffMultiplier:重连时间间隔递增,只有值大于1和启用useExponentialBackOff参数时生效。默认5

验证

  • 消费者端开启事务
Session session = connection.createSession(true,  Session.AUTO_ACKNOWLEDGE);

#不要进行事务提交
// session.commit()
  • 生产者生产一次消息,启动消费者

    消费端不要用MessageListener监听器的方式去测试

1.第一次启动消费者,会正常消费一次
2.之后再启动6次消费者,都会消费到数据(默认6次)
3.第7次,无消息消费

在第7次后,消息标记为有毒消息,会进入DLQ,死信队列

image-20210823225716019

相关配置

如果不想重试6次后,再将消息标记为有毒消息,可以配置为3次:topicPolicy.setMaximumRedeliveries(3)

其他配置可参考官网如下:

ActiveMQConnection connection ...  // Create a connection

RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
queuePolicy.setInitialRedeliveryDelay(0);
queuePolicy.setRedeliveryDelay(1000);
queuePolicy.setUseExponentialBackOff(false);
queuePolicy.setMaximumRedeliveries(2);

RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
topicPolicy.setInitialRedeliveryDelay(0);
topicPolicy.setRedeliveryDelay(1000);
topicPolicy.setUseExponentialBackOff(false);
topicPolicy.setMaximumRedeliveries(3);

// Receive a message with the JMS API
RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
map.put(new ActiveMQTopic(">"), topicPolicy);
map.put(new ActiveMQQueue(">"), queuePolicy);

springboot配置重试机制

@Component
@EnableJms
public class ActivemqConfig {
    
    //.....

	@Bean
    public RedeliveryPolicy redeliveryPolicy(){
        RedeliveryPolicy  redeliveryPolicy=   new RedeliveryPolicy();
        //是否在每次尝试重新发送失败后,增长这个等待时间
        redeliveryPolicy.setUseExponentialBackOff(true);
        //重发次数,默认为6次   这里设置为10次
        redeliveryPolicy.setMaximumRedeliveries(10);
        //重发时间间隔,默认为1秒
        redeliveryPolicy.setInitialRedeliveryDelay(1);
        //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
        redeliveryPolicy.setBackOffMultiplier(2);
        //是否避免消息碰撞
        redeliveryPolicy.setUseCollisionAvoidance(false);
        //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
        redeliveryPolicy.setMaximumRedeliveryDelay(-1);
        return redeliveryPolicy;
    }
}
1
https://gitee.com/le1024/le1024.git
git@gitee.com:le1024/le1024.git
le1024
le1024
le1024
master

搜索帮助