1 Star 0 Fork 13

chgoatherd / MyRpc

forked from mjuyangyousong / MyRpc 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
README.md 11.27 KB
一键复制 编辑 原始数据 按行查看 历史
yousongyang 提交于 2022-01-27 15:06 . 修改编译器选项

MyRpc

EMAIL: mju_yangyousong@qq.com

POWERED BY Yousong Yang

编译环境

编译器需支持:C++17

如何开始

1.CLONE项目
2.mkdir build
3.cd build
4.cmake .. -DCMAKE_CXX_COMPILER={支持C++17的编译器}
5.make
6.初期运行 生成配置文件
7.关闭服务 修改配置文件

性能测试

测试平台: VMware虚拟机15 core:4 mem:8G 主机:thinpad E14 r5 4500 内存16gb

zeromq:
  muduo:50字节 单程延迟46us
        50000字节 单程延迟64us
  MyRpc:50字节 单程延迟43us
        50000字节 单程延迟62us
        150000字节 单程延迟112us

压力测试
  SERVER 100CPU 单线程 1500字节数据包 转发回Client QPS 90000+

Log:
  单线程 80字节信息 9341520+qps

Rpc:
  单对单序列RPC 解析RPC 调用RPC
    SERVER: 序列化RPC+发送包 一次调用一个数据包 连续发送 每次65.601ns
    Client: 接受包+解析RPC+调用 每次40.739ns

如何使用

RPC:
RPC消息包默认格式: 单位字节
|-------------------------------------------------------------------------------------------------------------|
| 消息包大小 4 | 消息类型(定义在dataformat) 4 | 调用PRC序号 4 | 消息数据大小 4 | 是否调用回调 1 | 消息数据 -- |
|-------------------------------------------------------------------------------------------------------------|
connect->Read 返回对端一次Write的内容 即:消息类型(定义在dataformat) 调用PRC序号 消息数据大小 是否调用回调 消息数据 or 任何数据
一次Write对应一次Read 需要口头规定消息包最大大小 以免一次Read无法读尽
Rpc调用过程:
对端注册一个RPC
Rpc.Init_fun(type(口头约定的类型),fun_address 可以是lambda 可以是函数地址,size 希望的接收到的参数数据包大小(消息数据大小),回调函数(可选))
发送端发送一个RPC
Message::Make_rpc(connnet,(口头约定的类型),缓冲区,是否调用回调,参数...)
    // Server:
      Main():
        myrpc::Message message = myprc::Message();  // 初始化messgae
        net_tools::Config config("Server");         // 加载配置
        net_tools::Log log;                         // 初始化LOG
        net_tools::Tcpserver tcp;                   // 创建TCP服务
        tcp.Set_connect_over_callback(conn_func);   // 设置回调
        tcp.Initserver(true,2,0);                   // 注册TCP服务
        tcp.Startserver();                          // 开始服务

      conn_func(net_tools::net::Connect* connect):
        connect->Set_read_callback(read_func);      // 设置收到消息包回调(可选)
        connect->Set_close_callback(close_func);    // 设置对端Close回调(可选)
        connect->Set_lose_callback(lose_func);      // 设置心跳包失效回调(可选)
        char buf[512] = {};                         // 用户缓冲区
        myrpc::Message::Make_rpc(connect,3,buf,true,12,13,14);  //调用 3号RPC调用 缓冲区为buf 调用回调函数 参数为 (int)12 13 14
        myrpc::Message::Make_rpc(connect,2,buf,true,15);        //调用 2号RPC调用 缓冲区为buf 调用回调函数 参数为 (int)15

    // Client:

      Main():
          myrpc::Message message = myprc::Message(); // 初始化messgae
          net_tools::Config config("Client");         // 加载配置
          net_tools::Log log;                         // 初始化LOG
          net_tools::Tcpclient tcp;                   // 创建TCP服务
          message.Get_rpc()->Init_fun(3,myrpc::base::Rpc::Test,sizeof_fun(myrpc::base::Rpc::Test),[](net_tools::net::Connect*,void*,unsigned int){NT_LOG_INFO << "rpc_test" << NT_LOG_ENDL;});
          message.Get_rpc()->Init_fun(2,Test,sizeof_fun(Test),[](net_tools::net::Connect*,void*,unsigned int){NT_LOG_INFO << "rpc" << NT_LOG_ENDL;});
          {
            int b = 0;
            auto fun = [=](int a)->void{NT_LOG_INFO << a << b << NT_LOG_ENDL;};
            message->Get_rpc()->Init_fun(1,fun); //接口会保存该lambda 延长生命周期
          }
          tcp.Set_connect_over_callback(conn_func);   // 设置回调
          tcp.Startclient(true);                      // 循环尝试连接
          pause();

          char buf[200] = {};                         // 缓冲区
      read_func(net_tools::net::Connect* connect)
          connect->Read(buf,200);                     // 读取消息 buf缓冲区地址 200 大小
          message->Consume_message(connect,buf,200);  // 消费消息

工程目录 OLD

├── base                         // 网络库 base
|   ├── base_buffer             // 能移动的buffer 零拷贝发送
│   ├── bplustree               // 模板B+树 Timebplustree调用
│   ├── channel                 // Channel 包装每一个event事件
│   ├── channelpool             // 管理一个线程内所有Channel
│   ├── condition               // 封装列posix的条件变量
│   ├── config                  // 解析配置 JSON
│   ├── count_down_cond         // 带计数器的条件变量
│   ├── cpuinfo                 // 分析平台CPU
│   ├── epoll                   // 封装列linux epoll
│   ├── eventloop               // 事件循环
│   ├── eventloopthread         // 包装了事件循环的线程
│   ├── eventloopthreadpool     // IO线程池
│   ├── function                // 引入std::function
│   ├── hash                    // 闭散列HASH
│   ├── json                    // 包装列JSONCPP
│   ├── logbuffer               // LOG的缓冲区
│   ├── log                     // LOG调用
│   ├── logfile                 // LOG写入的文件
│   ├── logstream               // LOG iostream风格调用
│   ├── logthread               // LOG线程
│   ├── mempool                 // 分配固定大小的内存池
│   ├── mutex                   // 封装posix 互斥量
│   ├── noncopyable             // 基类 不允许左值拷贝构造
│   ├── thread                  // 包装了posix线程
│   ├── threadpool              // 简易逻辑线程池
│   ├── timebplustree           // 管理单个线程单次定时任务
│   ├── timeevent               // 单词定时任务
│   ├── timeeventset            // 管理单个对象的单次定时任务
│   ├── timeevery               // 循环定时任务
│   ├── timeeverymap            // 管理单个对象打循环定时任务
│   ├── timequeue               // 管理单个线程打循环任务时间轮
│   ├── timer                   // 包装了timefd定时器
├── rpc                          // Rpc框架
│   ├── dataformat              // 信息包格式定义
│   ├── dmq                     // 分布式消息队列
│   ├── message                 // 信息包注册消费中心
│   ├── nodeformat              // 节点格式定义
│   ├── rpc                     // rpc框架
│   ├── raft                    // raft实现分布式(未实现)
│   ├── rpccall                 // 元编程Rpc序列 反序列
│   ├── serverfinder            // 服务发现
├── net                          // 网络库 net
│   ├── accept                  // 封装posix accpet
│   ├── buffer                  // TCP连接缓冲区
|   ├── buffer_reader           // base_buffer迭代器
│   ├── connect                 // 单个TCP连接
│   ├── connectpool             // 管理单个线程的CONNECT对象
│   ├── heartbeat               // 心跳系统 FOR SERVER
│   ├── socket                  // 封装posix socket调用
│   ├── tcpclient               // TCP client主体or单次连接
│   ├── tcpserver               // TCP server
└── user                         // 一些示例

Future:

eventloop已经过时 该网络库为过时的框架
网络库使用eventloop使得代码需要非阻塞 而很多的业务逻辑需要并行的rpc调用 过分的回调函数实现异步 所以说要使用应用层的线程来改造网络库
至少lock_free 追求wait_free 应用层的线程也容易实现负载均衡 避免一个调用卡住后面的调用导致超时 task_steal等
epoll的模型也能转换为io_uring 减少进入内核的次数 从而提高吞吐量 
现阶段的rpc是在eventloop上进行的 只能实现丑陋的回调异步 需要应用层线程来实现join 从而让业务逻辑能够简化并行处理多个rpc返回值的问题
而且可以方便的终止rpc 通过一个rpc的返回决定另外的rpc的处理方式
rpc也需要实现可扩展的格式 这在现基础上 可以使用数据包附带偏移量的指示 按照原来预定的设想 通过编译期的参数推导实现入参 框架集成pb 简化依赖项
为了可扩展性 protobuf可选支持
服务发现需要raft的支持 保证服务发现的高可用
集成管理的接口 不停机配置 维护

目标:

底层网络库io_uring+应用层线程+集成协议的rpc+raft支持的服务发现+raft接口拓展为集群功能+可选的解耦中间件(类似kafka的消息队列 减少整个大集群连接的拓扑复杂度)

下一步:

重构网络库
应用层线程支持
rpc协议更新
高可用的服务发现
raft接口拓展机器为集群功能
往上搭建DMQ 消息队列
实现最终的满足业务开发绝大多数需求的rpc架构 MyRpc

如何学习

推荐书籍:

C++

服务器

操作系统

分布式

数据库

杂项

工程简介

MyRpc

Rpc:

自研Rpc框架 两步实现Rpc调用

RPC: 使用元编程实现RPC序列反序列以及调用

自制function 实现类型擦除 相对于多态的类型擦除 调用速度提升7倍

faster_function 相对于多态实现的rpccall 速度提升15倍 qps达到 22m

不使用JSON等序列化工具 支持不定长 支持lambda(带捕获参数) 函数指针 自定义类型 回调函数

网络库:

参考muduo的事件驱动型网络库

相对于muduo添加了定时器树,心跳树功能

两种定时任务策略 定时器树,TIMERFD

适合作为接入,解析数据包,转发服务器,IO密集

LOG系统 c++ iostream风格

缓冲区系统 内部采用内存池 隔离用户与系统调用 简化用户业务逻辑 支持base_buffer零拷贝

内存池 仿ptmalloc 为缓冲区提供大块内存

逻辑线程池 任务队列 用以执行用户注册的业务函数

IO线程池 EPOLL监听注册IO描述符

定时器树 用B+树作为底层 维护收到连接注册的定时期任务或在用户调用的定时期任务

定时器循环任务map 以任务名为key 注册循环定时器任务 每个任务一个timefd 当定时任务多时 可用前缀树优化

心跳树 使用类B+树 通过划分心跳时间片 高效管理连接心跳

Config 使用json解析配置

C++
1
https://gitee.com/chgoatherd/MyRpc.git
git@gitee.com:chgoatherd/MyRpc.git
chgoatherd
MyRpc
MyRpc
master

搜索帮助