代码拉取完成,页面将自动刷新
提供MQ通信支持的微服务组件,在微服务节点中引入此starter以扩展消息队列通信能力。
支持功能:
topic
异步发送;topic
订阅;支持消息队列:
<dependency>
<groupId>com.gallop</groupId>
<artifactId>gallop-mq-spring-boot-starter</artifactId>
<version>1.0.0</version>
</dependency>
gallop:
mq:
# 可不填scheme直接使用格式[ip:port],MQ集群时支持填写多个服务地址,用,分隔
# scheme:
# nats=nats://
# rabbitMQ/ActiveMQ= amqp://
# ActiveMQ 支持:amqp/mqtt/udp/http/nio/tcp... (参考 https://activemq.apache.org/configuring-transports.html)
# 组件会为地址默认追加failover
# kafka= ip1:port1,ip2:port2...
hosts: amqp://192.168.10.218:5672
# 连接MQ的用户名
user: guest
# 连接MQ的密码
secret: guest
# 将要使用的MQ类型
type: rabbit_mq
# 消息编解码器
codec: gson
# rpc调用默认超时(毫秒)。默认15s,<=0时无超时,阻塞等待直至响应
request-timeout: 20000
# 断线重连时间间隔(毫秒)。默认5s
reconnect-interval: 5000
import com.gallop.mq.MQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
@Component
public class ProducerService {
@Autowired
private MQTemplate mqTemplate;
/**
* 异步发送消息到指定topic
* @param msg
*/
public void send(FooBarMessage msg) {
mqTemplate.send(msg, "FOO_BAR_TOPIC_SEND");
}
/**
* 发起RPC调用,失败时或无响应返回null
* @param msg 将要发送的消息
* @return 远程响应
*/
public FooBarResponse request(FooBarMessage msg) {
return mqTemplate.request(msg, "FOO_BAR_TOPIC_REQUEST1");
}
/**
* 发起RPC调用,使用函数式响应
* @param msg 请求消息
* @param onResponse 响应方法
*/
public void request(FooBarMessage msg, Consumer<FoobarResponse> onResponse) {
mqTemplate.req(msg, "FOO_BAR_TOPIC_REQUEST2", onResponse,
// onError非必需
(msg, exception) -> exception.printStackTrace()
);
}
}
import com.gallop.mq.annotation.MQMessageListener;
import com.gallop.mq.core.dispatcher.DispatchRpcCallable;
import org.springframework.stereotype.Component;
// 在类上增加注解,启动时扫描此bean
@MQMessageListener
@Component
public class FooBarMessageListener {
/**
* 监听消息send
* @param sent 目前必须放在参数第一位
*/
@MQMessageListener(topic = "FOO_BAR_TOPIC_SEND")
public void onMessage(FooBarMessage sent) {
// 响应mqTemplate#send
}
/**
* 响应RPC请求,并返回结果(MQTemplate会提供默认的响应逻辑)
* @param request 请求,目前必须放在参数第一位
* @return 响应结果
*/
@MQMessageListener(topic = "FOO_BAR_TOPIC_REQUEST1")
public FooBarResponse response(FooBarMessage request) {
// 响应mqTemplate#request
return new FooBarResponse();
}
/**
* 响应RPC请求,自定义RPC响应方式
* @param request RPC请求信息
* @param customCallback 自定义响应方式
* @return 响应结果
*/
@MQMessageListener(topic = "FOO_BAR_TOPIC_REQUEST_FUNC")
public FooBarResponse responseFunction(FooBarMessage request, DispatchRpcCallable customCallback) {
// 响应mqTemplate#request
return new FooBarResponse();
}
}
作用:在消息到达服务节点的分发前后触发;
在注解MQMessageListener
中填写实现了MQListenerInterceptor
接口的bean class
。按声明顺序调用。
例:
import com.gallop.mq.core.MQListenerInterceptor;
import org.springframework.stereotype.Component;
@Component
public class Test {
@MQMessageListener(topic = "FOO_BAR_TOPIC_REQUEST_FUNC",
interceptor = {A.class, B.class, C.class})
public void onMessage(FooBarMessage sent) {
//
}
public static class A implements MQListenerInterceptor {
// ...
}
public static class B implements MQListenerInterceptor {
// ...
}
public static class C implements MQListenerInterceptor {
// ...
}
}
方法 | 描述 |
---|---|
beforeInvoke(Object message) | 消息分发前调用,参数为收到的MQ消息 |
afterInvoke(Object message,Object result) | 消息分发处理后调用,参数message: 收到的消息;result:返回的结果 |
onError(Object message, Exception exception) | 消息分发时出现异常触发 |
gallop.mq.type
选择custom
;MQTemplateInitializer
接口的spring bean
;MQTemplateInitializer
实现类的init
方法中提供实例化并返回自定义MQTemplate
;例:
import com.gallop.mq.MQTemplate;
import com.gallop.mq.core.MQTemplateInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CustomConfigExample {
@Bean
public MQTemplateInitializer customInitializer() {
// 参数: MQ配置,消息分发器,消息编解码器,MQ工作线程池(可选)
return (properties, dispatcher, codec, executorProvider)
-> new CustomMQTemplate();
}
public static class CustomMQTemplate implements MQTemplate {
// 实现略
}
}
提供实现MQMessageCodec
接口的spring bean
,以覆盖优先级更低的gallop.mq.codec
配置指定的编解码器。
提供实现了MQExecutorProvider
接口的spring bean
,支持FunctionalInterface
。
缺省使用 corePoolSize = 当前核数 * 2 + 1
的线程池
提供实现了MQMessageDispatcher
接口的spring bean
,以覆盖缺省设置。
组件内除了扫描MQMessageListener
注解注册成为消息监听以外,
还可以通过提供实现ExtendListenerScanner
接口的spring bean
批量注册监听容器;
与注解形式相比,此途径更偏向为运行时动态创建消息监听。
扫描注解获得的监听集合,与ExtendListenerScanner
提供的监听集合都会加入MQMessageDispatcher
消息分发器中。
...
v1.0.0
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。