1 Star 7 Fork 1

zhen_hong / hangu-rpc

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
Apache-2.0

hangu-rpc

一、介绍

该框架为rpc原理学习者提供一个思路,一个非常简单的轻量级rpc框架, 项目中没有使用非常复杂的设计,主打一个简单易用,快速上手,读懂源码实现,目前 注册中心默认实现了redis哨兵,zookeeper和本hangu系列的hangu-register注册中心 (测试代码默认使用的是hangu-register注册中心)。 如果你是一个对rpc原理好奇的人,你可以阅读本框架源码,快速了解rpc的核心实现。

hangu 是函谷的拼音。 为什么取这个名字呢?因为有一次和朋友聊天,聊到了《将夜》,说大师兄和余帘骑着一头牛进了一个叫函谷的地方, 于是道德经就出现了。所以我希望每个进入函谷的人,都能写出自己的道德经。

二、软件架构

image

从架构图中可以看到,心跳由消费者主动发起,默认每隔2s向服务提供者发送心跳包,心跳的实现很简单,在消费者这边 使用 Netty 提供的 IdleStateHandler 事件处理器,在每隔2s发起读超时事件时向提供者发送心跳,超过3次未收到 提供者的响应即认为需要重连,消费者端的 IdleStateHandler 配置代码如下:

/**
 * 代码位置{@link com.hangu.rpc.consumer.client.NettyClient#open}
 */
// 省略前部分代码
.addLast(new ByteFrameDecoder())
.addLast(new RequestMessageCodec()) // 请求与响应编解码器
.addLast(new HeartBeatEncoder()) // 心跳编码器
.addLast("logging", loggingHandler)
// 每隔 2s 发送一次心跳,超过三次没有收到响应,也就是三倍的心跳时间,重连
.addLast(new IdleStateHandler(2, 0, 0, TimeUnit.SECONDS))
.addLast(new HeartBeatPongHandler(NettyClient.this)) // 心跳编码器
.addLast(new ResponseMessageHandler(executor));
// 省略后部分代码

可以看到有三个与心跳相关的处理器,分别为 HeartBeatEncoder,IdleStateHandler,HeartBeatPongHandler, 其中 IdleStateHandler 配置了读取超时为2s,超过2s没有收到读事件,那么就会发出读超时事件, 发出读超时事件之后, 由 HeartBeatPongHandler 处理该事件,处理的逻辑如下:

@Slf4j
public class HeartBeatPongHandler extends ChannelInboundHandlerAdapter {

  private NettyClient nettyClient;

  private int retryBeat = 0;

  public HeartBeatPongHandler(NettyClient nettyClient) {
    this.nettyClient = nettyClient;
  }

  @Override
  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    // 尝试重连
    this.reconnect(ctx);
    super.channelUnregistered(ctx);
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 这里主要是为了解决网络抖动,误判机器下线,等网络正常时,注册中心再次通知
    // 那么需要重新标记为true
    this.nettyClient.getClientConnect().setRelease(false);
    this.nettyClient.getClientConnect().resetConnCount();
    super.channelActive(ctx);
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 收到消息(无论是心跳消息还是任何其他rpc消息),重置重试发送心跳次数
    this.retryBeat = 0;
    // 这里主要是为了解决网络抖动,误判机器下线,等网络正常时,注册中心再次通知
    // 那么需要重新标记为true
    this.nettyClient.getClientConnect().setRelease(false);
    this.nettyClient.getClientConnect().resetConnCount();
    super.channelRead(ctx, msg);
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
      IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
      IdleState idleState = idleStateEvent.state();
      // 读超时,发送心跳
      if (IdleState.READER_IDLE == idleState) {
        if (retryBeat > 3) {
          // 关闭重连,通过监听 channelUnregistered 发起重连
          ctx.channel().close();
        } else {
          PingPong pingPong = new PingPong();
          pingPong.setId(CommonUtils.snowFlakeNextId());
          pingPong.setSerializationType(SerializationTypeEnum.HESSIAN.getType());
          // 发送心跳(从当前 context 往前)
          ctx.writeAndFlush(pingPong).addListener(future -> {
            if (!future.isSuccess()) {
              log.error("发送心跳失败!", future.cause());
            }
          });
          ++retryBeat;
        }
      } else {
        super.userEventTriggered(ctx, evt);
      }
    } else {
      super.userEventTriggered(ctx, evt);
    }
  }

  private void reconnect(ChannelHandlerContext ctx) {

    ClientConnect clientConnect = this.nettyClient.getClientConnect();
    int retryConnectCount = clientConnect.incrConnCount();
    // N次之后还是不能连接上,放弃连接
    if (clientConnect.isRelease()) {
      log.info("该链接{}已经被标记为释放,不再重连", clientConnect.getHostInfo());
      return;
    }
    // 如果连接还活着,不需要重连
    if (clientConnect.isActive()) {
      return;
    }
    int delay = 2 * (retryConnectCount - 1);
    // 最大延迟20秒再执行
    if (delay > 20) {
      delay = 20;
    }

    ctx.channel().eventLoop().schedule(() -> {
      ConnectManager.doCacheReconnect(this.nettyClient);
    }, delay, TimeUnit.SECONDS);
  }
}

userEventTriggered 方法接收到读超时事件后,会判断当前连接是否已经有连续三次未接收到来自提供者的数据了,如果超过了,那么尝试重连, 如果没有超过三次,主动向提供者发出心跳,发出的心跳由 HeartBeatEncoder 编码器进行编码向提供者发送消息,在接收到提供者的响应之后,将重试次数归零。

这里我们使用 IdleStateHandler 的读超时实现了每隔2s向服务提供者发送心跳,超过三倍的心跳时间未接收到响应就认为该连接已断开,需要 重连。

服务提供者端不会主动向消费者发送心跳,它只会被动接收心跳,但超过8s未接收到任何来自消费者的读写数据时,主动关闭连接

/**
 * 代码位置{@link com.hangu.rpc.provider.server.NettyServer.start}
 */
// 读写时间超过8s,表示该链接已失效
.addLast(new IdleStateHandler(0, 0, 8, TimeUnit.SECONDS))

三、快速启动

3.1 测试

hangu-demo里有两个子模块,分别是提供者和消费者,启动这两个模块,调用UserController的测试代码即可

3.2 引入项目使用
3.2.1 springboot项目
  • 配置

如果你使用的时springboot,那么很好,直接引入以下依赖:

<dependency>
      <groupId>org.hangu.rpc</groupId>
      <artifactId>hangu-rpc-spring-boot-starter</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>

配置hangu-rpc(注册中心为redis哨兵模式的):

server:
  port: 8080
hangu:
  rpc:
    provider:
      port: 8089 # 提供者将要暴露的端口
    registry:
      protocol: redis # 选择zk作为注册中心
      redis:
        nodes: ip:port,ip2:port2 # redis哨兵模式哨兵集群地址
        master: xxx # master的名字
        password: xxx # redis的密码

配置hangu-rpc(注册中心为zk):

server:
  port: 8080
hangu:
  rpc:
    provider:
      port: 8089 # 提供者将要暴露的端口
    registry:
      protocol: zookeeper # 选择zk作为注册中心
      zookeeper:
        hosts: ip:port,ip2:port2 # zk集群地址
        user-name: yyy
        password: yyy

如果你有自己的注册中心,可以选择实现 com.hangu.rpc.common.registry.RegistryService 接口,然后将 hangu.rpc.registry.protocol=你自己的注册中心名字(其实这里本人使用的时springboot的@ConditionalOnProperty去动态加载, 也就是说你只要 保证spring容器中只存在一个实现了RegistryService接口的注册服务即可)

  • 提供者

假设你有以下服务

@HanguService
public class UserServiceImpl implements UserService {
    @Override
    public UserInfo getUserInfo() {
        UserInfo userInfo = new UserInfo();
        userInfo.setName("小风");
        userInfo.setAge(27);
        Address address = new Address();
        address.setProvince("江西省");
        address.setCity("赣州市");
        address.setArea("于都县");
        userInfo.setAddress(address);
        return userInfo;
    }

    @Override
    public String getUserInfo(String name) {
        return name;
    }
}

接口为 UserService,实现类为 UserServiceImpl,如果你要暴露该接口,那么只需要在这个实现类上标注 @HanguService 注解即可, 这个注解目前有三个属性,分别为groupName,interfaceName,version,组通常是一系列相关业务的分组,通常会设置为服务名,因为我们的服务通常 会以业务进行垂直划分,interfaceName表示接口名,默认不填的情况下,会自动赋值为接口全路径类名,version表示版本,有时候我们给多个第三方提供服务的 时候可能业务上有差别,对于老的业务代码我们肯定不会去做改动,此时我们可以选择通过版本进行区分。

  • 消费者

上面提供者提供了 UserService 服务,如果我们要引入这个服务怎么办?

@HanguReference
public interface UserService {

    @HanguMethod(timeout = 20, callback = SimpleRpcResponseCallback.class)
    UserInfo getUserInfo(RpcResponseCallback callback);

    String getUserInfo(String name);
}

可以看到这个接口上标注了 @HanguReference 注解,这个注解与服务提供者的 @HanguService 注解一样拥有 groupName,interfaceName,version 三个属性,只要这三个属性与提供者的一一对应就能正常提供服务,这意味只要指定了 interfaceName这个名字与服务提供者一致,即使 UserService 这个接口类型的名字你瞎写,放在任何包下都可以正常服务,但是不建议 这么做,最好对接口进行抽离成单独的模块,打成jar包去引入

在类上指定了 @HanguReference 之后,我可以在方法上标注 @HanguMethod 注解,这个注解目前有 timeout与callback两个属性, timeout用来指定该方法调用超时的时间,callback用来指定回调类实现,需要实现 RpcResponseCallback 接口,并且提供无参数构造器, 如果你不想在 @HanguMethod 注解上指定回调,那么可以在方法参数上指定实现了 RpcResponseCallback 接口的回调,这样方法由同步调用 变成了异步调用。

3.2.2 普通spring项目

如果你是用的普通spring项目,想要引入hangu-rpc,那么你可以通过以下方式启动


@EnableHanguRpc
public class HanguRpcBootstrapConfig {
    
}

注意:要确保加入了以下依赖(该依赖用于处理ConfigurationProperties注解)

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
</dependency>

其他配置与消费者提供者的操作与springboot的一样

3.2.2 普通项目
  • 配置 如果你没有使用spring框架,那么你只需要在pom.xml中添加以下依赖
<dependency>
  <groupId>org.hangu.rpc</groupId>
  <artifactId>hangu-starter</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
  • 提供者

依然假设你有一个叫 UserServiceImpl的接口实现


public class Provider {
    public static void main(String[] args) {

        // 主要用于设置启动的端口
        HanguProperties hanguProperties = new HanguProperties();
        
        // 启动服务
        HanguRpcManager.openServer(hanguProperties);
        
        ServerInfo serverInfo = new ServerInfo();
        serverInfo.setGroupName("用户服务系统");
        serverInfo.setInterfaceName("userService");
        serverInfo.setVersion("1.0");
        Class<UserSerice> interfaceClass = UserService.class;
        UserService service = new UserServiceImpl();
        ZookeeperConfigProperties properties = new ZookeeperConfigProperties();
        properties.setHosts("ip:port,ip:port");
        RegistryService registryService = new ZookeeperRegistryService(properties);
        ServiceBean<T> serviceBean =
                new ServiceBean<>(serverInfo, interfaceClass, service, registryService);
        ServiceExporter.export(serviceBean);
    }
}

  • 消费者
public class Consumer {
    public static void main(String[] args) {

        ServerInfo serverInfo = new ServerInfo();
        serverInfo.setGroupName("用户服务系统");
        serverInfo.setInterfaceName("userService");
        serverInfo.setVersion("1.0");

        Class<UserSerice> interfaceClass = UserService.class;
        ZookeeperConfigProperties properties = new ZookeeperConfigProperties();
        properties.setHosts("ip:port,ip:port");
        RegistryService registryService = new ZookeeperRegistryService(properties);
        
        // 主要用于设置消费者线程数量
        HanguProperties hanguProperties = new HanguProperties();

        ReferenceBean<T> referenceBean = new ReferenceBean<>(serverInfo, interfaceClass, registryService,
                hanguProperties);
        UserService service = ServiceReference.reference(referenceBean, CommonUtils.getClassLoader(this.getClass()));
        // 调用服务
        UserInfo userInfo = service.getUserInfo();
    }
}
Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

简介

该框架为rpc原理学习者提供一个思路,一个非常简单的轻量级rpc框架,支持http请求rpc数据绑定。 展开 收起
取消

发行版 (6)

全部

贡献者

全部

近期动态

加载更多
不能加载更多了
1
https://gitee.com/yomea/hanggu-rpc.git
git@gitee.com:yomea/hanggu-rpc.git
yomea
hanggu-rpc
hangu-rpc
main

搜索帮助