1 Star 0 Fork 1

gallop-project / gallop-mq-spring-boot-starter

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
MIT

gallop-mq-spring-boot-starter

license jdk springcloud springboot release

介绍

提供MQ通信支持的微服务组件,在微服务节点中引入此starter以扩展消息队列通信能力。

支持功能:

  • 消息按topic异步发送;
  • 消息按topic订阅;
  • 基于消息队列的远程调用(rpc模式)

支持消息队列:

开始使用

引入

<dependency>
    <groupId>com.gallop</groupId>
    <artifactId>gallop-mq-spring-boot-starter</artifactId>
    <version>1.0.0</version>
</dependency>

配置


gallop:
  mq:
    # 可不填scheme直接使用格式[ip:port],MQ集群时支持填写多个服务地址,用,分隔
    # scheme:
    # nats=nats://
    # rabbitMQ/ActiveMQ= amqp://
    # ActiveMQ 支持:amqp/mqtt/udp/http/nio/tcp... (参考 https://activemq.apache.org/configuring-transports.html)
    # 组件会为地址默认追加failover
    # kafka= ip1:port1,ip2:port2...
    hosts: amqp://192.168.10.218:5672
    # 连接MQ的用户名
    user: guest
    # 连接MQ的密码
    secret: guest
    # 将要使用的MQ类型
    type: rabbit_mq
    # 消息编解码器
    codec: gson
    # rpc调用默认超时(毫秒)。默认15s,<=0时无超时,阻塞等待直至响应
    request-timeout: 20000
    # 断线重连时间间隔(毫秒)。默认5s
    reconnect-interval: 5000

发送消息


import com.gallop.mq.MQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

@Component
public class ProducerService {

    @Autowired
    private MQTemplate mqTemplate;

    /**
     * 异步发送消息到指定topic
     * @param msg
     */
    public void send(FooBarMessage msg) {
        mqTemplate.send(msg, "FOO_BAR_TOPIC_SEND");
    }

    /**
     * 发起RPC调用,失败时或无响应返回null
     * @param msg 将要发送的消息
     * @return 远程响应
     */
    public FooBarResponse request(FooBarMessage msg) {
        return mqTemplate.request(msg, "FOO_BAR_TOPIC_REQUEST1");
    }

    /**
     * 发起RPC调用,使用函数式响应
     * @param msg 请求消息
     * @param onResponse 响应方法
     */
    public void request(FooBarMessage msg, Consumer<FoobarResponse> onResponse) {
        mqTemplate.req(msg, "FOO_BAR_TOPIC_REQUEST2", onResponse,
                // onError非必需
                (msg, exception) -> exception.printStackTrace()
        );
    }
}

监听消息


import com.gallop.mq.annotation.MQMessageListener;
import com.gallop.mq.core.dispatcher.DispatchRpcCallable;
import org.springframework.stereotype.Component;

// 在类上增加注解,启动时扫描此bean
@MQMessageListener
@Component
public class FooBarMessageListener {

    /**
     * 监听消息send
     * @param sent 目前必须放在参数第一位
     */
    @MQMessageListener(topic = "FOO_BAR_TOPIC_SEND")
    public void onMessage(FooBarMessage sent) {
        // 响应mqTemplate#send
    }

    /**
     * 响应RPC请求,并返回结果(MQTemplate会提供默认的响应逻辑)
     * @param request 请求,目前必须放在参数第一位
     * @return 响应结果
     */
    @MQMessageListener(topic = "FOO_BAR_TOPIC_REQUEST1")
    public FooBarResponse response(FooBarMessage request) {
        // 响应mqTemplate#request
        return new FooBarResponse();
    }

    /**
     *  响应RPC请求,自定义RPC响应方式
     * @param request RPC请求信息
     * @param customCallback 自定义响应方式
     * @return 响应结果
     */
    @MQMessageListener(topic = "FOO_BAR_TOPIC_REQUEST_FUNC")
    public FooBarResponse responseFunction(FooBarMessage request, DispatchRpcCallable customCallback) {
        // 响应mqTemplate#request
        return new FooBarResponse();
    }
}

拦截器

作用:在消息到达服务节点的分发前后触发;

在注解MQMessageListener中填写实现了MQListenerInterceptor接口的bean class。按声明顺序调用。

例:

import com.gallop.mq.core.MQListenerInterceptor;
import org.springframework.stereotype.Component;

@Component
public class Test {

    @MQMessageListener(topic = "FOO_BAR_TOPIC_REQUEST_FUNC",
            interceptor = {A.class, B.class, C.class})
    public void onMessage(FooBarMessage sent) {
        // 
    }

    public static class A implements MQListenerInterceptor {
        // ...
    }

    public static class B implements MQListenerInterceptor {
        // ...
    }

    public static class C implements MQListenerInterceptor {
        // ...
    }
}
方法 描述
beforeInvoke(Object message) 消息分发前调用,参数为收到的MQ消息
afterInvoke(Object message,Object result) 消息分发处理后调用,参数message: 收到的消息;result:返回的结果
onError(Object message, Exception exception) 消息分发时出现异常触发

自定义MQ实现

自定义MQTemplate

  1. 配置MQ类型gallop.mq.type选择custom;
  2. 提供实现MQTemplateInitializer接口的spring bean;
  3. MQTemplateInitializer实现类的init方法中提供实例化并返回自定义MQTemplate;

例:


import com.gallop.mq.MQTemplate;
import com.gallop.mq.core.MQTemplateInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CustomConfigExample {

    @Bean
    public MQTemplateInitializer customInitializer() {
        // 参数: MQ配置,消息分发器,消息编解码器,MQ工作线程池(可选)
        return (properties, dispatcher, codec, executorProvider)
                -> new CustomMQTemplate();
    }

    public static class CustomMQTemplate implements MQTemplate {
        //  实现略
    }
}

自定义编解码器

提供实现MQMessageCodec接口的spring bean,以覆盖优先级更低的gallop.mq.codec配置指定的编解码器。

自定义工作线程池

提供实现了MQExecutorProvider接口的spring bean,支持FunctionalInterface。 缺省使用 corePoolSize = 当前核数 * 2 + 1 的线程池

自定义消息分发器

提供实现了MQMessageDispatcher接口的spring bean,以覆盖缺省设置。

自定义监听

组件内除了扫描MQMessageListener注解注册成为消息监听以外, 还可以通过提供实现ExtendListenerScanner接口的spring bean批量注册监听容器; 与注解形式相比,此途径更偏向为运行时动态创建消息监听。

扫描注解获得的监听集合,与ExtendListenerScanner提供的监听集合都会加入MQMessageDispatcher消息分发器中。

可靠投递

...

里程碑

v1.0.0

  • 支持Kafka
  • 支持ActiveMQ
  • 支持RabbitMQ
  • MQ消息监听统一封装处理器 MessageDeliveryHandlerRegistry
  • MQ具体监听器使用动态代理
  • RPC模式统一流程封装(取消)
  • DispatchContext增加correlationId,启用自定义回调
  • ZMQ
MIT License Copyright (c) 2022 屁屁啊哈 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

简介

提供微服务MQ支持。 展开 收起
Java
MIT
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/gallop-project/gallop-mq-spring-boot-starter.git
git@gitee.com:gallop-project/gallop-mq-spring-boot-starter.git
gallop-project
gallop-mq-spring-boot-starter
gallop-mq-spring-boot-starter
master

搜索帮助