代码拉取完成,页面将自动刷新
同步操作将从 胖叔讲java/rabbit-mq-plus 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
<dependency>
<groupId>com.cfpamf.inf</groupId>
<artifactId>rabbit-mq-plus</artifactId>
<version>0.0.1-opensource</version>
</dependency>
例:@ComponentScan(basePackages = { "com.cfpamf.athean.rabbit.mq.plus"})
##配置mybatis扫描目录
@MapperScan("com.cfpamf.athean.rabbit.mq.plus.mapper")
##配置mybatisPlus扫描目录
typeAliasesPackage: com.cfpamf.athean.rabbit.mq.plus.domain.po
typeEnumsPackage: com.cfpamf.athean.rabbit.mq.plus.common.enums
##参考代码
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSource);
sqlSessionFactoryBean.setTypeAliasesPackage("com.cfpamf.athean.rabbit.mq.plus.domain.po");
sqlSessionFactoryBean.setTypeEnumsPackage("com.cfpamf.athean.rabbit.mq.plus.common.enums");
注意rabbitmq要开启ack,开启发送确认和发送失败回退
spring:
rabbitmq:
host: ${host}
port: 5672
username: ${username}
password: ${password}
publisher-confirm-type: correlated
publisher-confirms: true #开启发送确认,开启publisher-confirm,且选择correlated,则MQ发送结果,会异步回调ConfirmCallback方法
publisher-returns: true #开启发送失败回退
#template:
#mandatory: true #定义当消息从交换机路由到队列失败时的策略。【true,则调用ReturnCallback;false:则直接丢弃消息】
#开启ack
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual #采取手动应答
# concurrency: 1 #指定最小的消费者数量
# max-concurrency: 1 #指定最大的消费者数量
retry:
enabled: true # 是否支持重试
CREATE TABLE `rabbit_mq_plus_event_task` (
`id` bigint(20) NOT NULL COMMENT '编号',
`event_no` varchar(64) NOT NULL COMMENT '事件编号',
`event_type_code` varchar(20) NOT NULL COMMENT '事件类型代码:',
`event_type_desc` varchar(20) NOT NULL COMMENT '事件类型描述:',
`status_code` int(11) NOT NULL COMMENT '状态:10、待处理 20、处理中 30、已处理',
`status_desc` varchar(10) NOT NULL COMMENT '状态描述:10、待处理 20、处理中 30、已处理',
`max_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '最大重试次数',
`retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
`next_execute_time` datetime NOT NULL COMMENT '下一次执行时间',
`message_body` varchar(1024) NOT NULL COMMENT '数据',
`event_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '事件时间',
`create_by` varchar(30) NOT NULL DEFAULT 'system' COMMENT '创建人',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_by` varchar(30) NOT NULL DEFAULT 'system' COMMENT '修改人',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`enabled_flag` tinyint(4) NOT NULL DEFAULT '1' COMMENT '是否可用 1:是 0:否(已删除)',
PRIMARY KEY (`id`),
KEY `idx_event_no_event_type_code` (`event_no`,`event_type_code`),
KEY `idx_event_no_event_type_code_status_code` (`event_no`,`event_type_code`,`status_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='rabbitMqPlus事件任务表';
CREATE TABLE `rabbit_mq_plus_consumer_task` (
`id` bigint(20) NOT NULL COMMENT '编号',
`queue_name` varchar(64) NOT NULL COMMENT '消息编号',
`consumer_type_code` varchar(20) NOT NULL COMMENT '消费类型代码',
`consumer_type_desc` varchar(20) NOT NULL COMMENT '消费类型描述',
`consumer_tag` varchar(20) DEFAULT NULL COMMENT '消费标记',
`status_code` int(11) NOT NULL COMMENT '状态:10、待处理 20、处理中 30、已处理',
`status_desc` varchar(10) NOT NULL COMMENT '状态描述:10、待处理 20、处理中 30、已处理',
`max_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '最大重试次数',
`retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
`consumer_listener_impl_class_name` varchar(128) NOT NULL COMMENT '消费者监听实现类',
`next_execute_time` datetime NOT NULL COMMENT '下一次执行时间',
`first_consumer_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '首次消费时间',
`consumer_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '消费时间',
`message_body` varchar(2048) NOT NULL COMMENT '数据',
`create_by` varchar(30) NOT NULL DEFAULT 'system' COMMENT '创建人',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_by` varchar(30) NOT NULL DEFAULT 'system' COMMENT '修改人',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`enabled_flag` tinyint(4) NOT NULL DEFAULT '1' COMMENT '是否可用 1:是 0:否(已删除)',
PRIMARY KEY (`id`),
KEY `idx_queue_name_consumer_type_code_status_code` (`queue_name`,`consumer_type_code`,`status_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='rabbitMqPlus消费任务表';
@Resource
RabbitMqPlusProducer rabbitMqPlusProducer;
rabbitMqPlusProducer.sendTransactionalMessage(RabbitMqMessageDTO);
补偿机制:
com.cfpamf.athean.rabbit.mq.plus.producer.core.schedule.RabbitMqPlusProducerMessageRetryJob.runAutoRetryCheck()方法
1.项目启动运行,消费者监听器
public class XxxxRabbitMqPlusConsumerRun implements ApplicationRunner {
@Resource
RabbitMqConsumer rabbitMqConsumer;
@Override
public void run(ApplicationArguments args) throws Exception {
rabbitMqConsumer.runConsumeListener(RabbitMqPlusConsumerDTO);
}
2.消费者实现RabbitMqPlusConsumerListener接口onMessage方法
3.失败补偿 定时调度com.cfpamf.athean.rabbit.mq.plus.consumer.core.schedule.RabbitMqPlusConsumerFailureRetryJob.runAutoRetryCheck()方法
系统默认,删除30天后的发送成功消息记录,可以自定义时间
rabbitMqPlus:
delete:
invalid:
data:
day:60
参考 demo目录中的demo.zip文件 link
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。