代码拉取完成,页面将自动刷新
同步操作将从 程序员大彬/Java-learning 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
死信队列:消费失败的消息存放的队列。消息消费失败的原因:
设置死信队列的 exchange 和 queue,然后进行绑定:
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(RabbitMqConfig.DLX_EXCHANGE);
}
@Bean
public Queue dlxQueue() {
return new Queue(RabbitMqConfig.DLX_QUEUE, true);
}
@Bean
public Binding bindingDeadExchange(Queue dlxQueue, DirectExchange deadExchange) {
return BindingBuilder.bind(dlxQueue).to(deadExchange).with(RabbitMqConfig.DLX_QUEUE);
}
在普通队列加上两个参数,绑定普通队列到死信队列。当消息消费失败时,消息会被路由到死信队列。
@Bean
public Queue sendSmsQueue() {
Map<String,Object> arguments = new HashMap<>(2);
// 绑定该队列到私信交换机
arguments.put("x-dead-letter-exchange", RabbitMqConfig.DLX_EXCHANGE);
arguments.put("x-dead-letter-routing-key", RabbitMqConfig.DLX_QUEUE);
return new Queue(RabbitMqConfig.MAIL_QUEUE, true, false, false, arguments);
}
生产者完整代码:
@Component
@Slf4j
public class MQProducer {
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
RandomUtil randomUtil;
@Autowired
UserService userService;
final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {
log.info("correlationData: " + correlationData);
log.info("ack: " + ack);
if(!ack) {
log.info("异常处理....");
}
};
final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) ->
log.info("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
public void sendMail(String mail) {
//貌似线程不安全 范围100000 - 999999
Integer random = randomUtil.nextInt(100000, 999999);
Map<String, String> map = new HashMap<>(2);
String code = random.toString();
map.put("mail", mail);
map.put("code", code);
MessageProperties mp = new MessageProperties();
//在生产环境中这里不用Message,而是使用 fastJson 等工具将对象转换为 json 格式发送
Message msg = new Message("tyson".getBytes(), mp);
msg.getMessageProperties().setExpiration("3000");
//如果消费端要设置为手工 ACK ,那么生产端发送消息的时候一定发送 correlationData ,并且全局唯一,用以唯一标识消息。
CorrelationData correlationData = new CorrelationData("1234567890"+new Date());
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
rabbitTemplate.convertAndSend(RabbitMqConfig.MAIL_QUEUE, msg, correlationData);
//存入redis
userService.updateMailSendState(mail, code, MailConfig.MAIL_STATE_WAIT);
}
}
消费者完整代码:
@Slf4j
@Component
public class DeadListener {
@RabbitListener(queues = RabbitMqConfig.DLX_QUEUE)
public void onMessage(Message message, Channel channel) throws IOException {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//手工ack
channel.basicAck(deliveryTag,false);
System.out.println("receive--1: " + new String(message.getBody()));
}
}
当普通队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的死信交换机去,然后被路由到死信队列。可以监听死信队列中的消息做相应的处理。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。