12 Star 29 Fork 9

AvenirTech 未来科技 / AvenirMQ

加入 Gitee
与超过 600 万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README.md

AvenirMQ

一个轻量化的消息中间件

安装

需要较高版本Node.js,推荐V14,支持跨平台

git clone https://gitee.com/onlyyyy/AvenirMQ.git

cd AvenirMQ

pm2 start AvenirMQ

或先安装pm2:npm i pm2 -g

并提供Nodejs版操作库:https://www.npmjs.com/package/avenirmq

代码结构

  • AvenirMQ.js 启动的主进程
  • dtest.js 测试用函数
  • run.ini 配置文件
  • user.json 用户文件
  • curl.js cli程序
  • /core AvenirMQ核心模块
  • /AvenirMQ 提供的Nodejs版操作库
  • clien1-4.js 测试用的接收方

技术特点

  • 1.使用签名进行消息分发,支持用户管理。
  • 2.通过routingkey来实现用户的消息绑定。
  • 3.支持连接池,并支持智能的连接淘汰策略。
  • 4.使用生产者/消费者模型。
  • 5.支持消息重发,死信回收
  • 6.灵活的策略配置
  • 7.提供cli程序进行管理

技术实现

  1. 用户管理

用户信息保存在user.json中

async login(data) {
    if (!data.name || !data.password) {
        throw ('INVALID_LOGIN');
    }
    let password = delQuotation(libcu.cipher.AesDecode(data.password));
    toLog("password = ", password);
    if (!this.userList[data.name] || this.userList[data.name].password != password) {
        throw ('INVALID_LOGIN');
    }
    //其他的就成功了
    let sign = getSign(data.name, data.password);
    toLog("生成签名", sign);
    return sign;
}

解析routingkey

//解析绑定时的from.to.key
parseKey(keys) {
    let arr = keys.split('.');
    toLog('arr = ', arr);
    if (arr.length != 3) {
        throw ("BAD_KEYS");
    }
    return {
        send: arr[0],
        to: arr[1],
        type: arr[2],
    }
}

加解密: 使用libcu.cipher.AesEncode/AesDecode函数进行加解密。

密钥参见libcu库。

用户增删改查暂略。

  1. 连接池
async add2ConnectPool(data, sign, client) {
    this.signPool[sign] = {
        conn: client,
        name: data.name,
        createTime: moment().valueOf(),
    };
    throw ({ code: SUCCESS, data: sign });
}

将连接保存到对象中,下次发送消息的时候会优先选用连接发送,失败的话就会更新连接池

  1. 消息发送-死信处理
async AvenirMQSend(msg, type) {
    //20210110先写个简单版的 不用promise.all发送消息
    for (let i = 0; i < msg.length; i++) {
        try {
            ······
            if(type != 'gc') {
                for (let j = 0; j < bind.length; j++) {
                    //20210116增加对类型的判断
                    if ((bind[j].type === sub.type || bind[j].type === AvenirMQ_ALL)
                        && (bind[j].receive === sub.to)) {
                        //说明这是要发送的消息
                        let info = {
                            ip: bind[j].ip,
                            port: bind[j].port,
                        };
                        let conn = null;
                        if (this.connPoll[bind[j].ip] && this.connPoll[bind[j].ip][bind[j].port]) {
                            conn = this.connPoll[bind[j].ip][bind[j].port].conn;
                        }
                        let newSub = JSON.parse(JSON.stringify(sub));
                        await this.send(text, conn, info, newSub, type, i);
                    } else {
                        toLog("存在无人接收的信息", msg[i]);
                    }
                }
            } else {
                let conn = null;
                if (this.connPoll[gcInfo.ip] && this.connPoll[gcInfo.ip][gcInfo.port]) {
                    conn = this.connPoll[gcInfo.ip][gcInfo.port].conn;
                }
                await this.send(text, conn, gcInfo, sub, type, i);
            }
                
            ····
        } catch (error) {
            toLog("AvenirMQSend error->",error);
        }
        
    }
}

将gc与普通的消息放在一个函数中处理。

接口协议格式

  1. 登录获得签名
{
    type:'login',
    name:'test',
    password:'AES',
}

返回值

{
    code:0,
    message:'success',
    data:'sign'
}
  1. 新建用户

//key为send.to.type的结构 表示自己的键值与接收的键值 以及接收的消息类型
//告诉AvenirMQ自己的连接信息
{
    "type":"addUser",
    "name":"test",  
    "password":"123456"
    "key":"a.b.rpc"
    "ip":"127.0.0.1"
    "port":13000,
}

返回值

{
    "code":0,
    "message":"success",
}
  1. 删除用户
{
    "type":"deleteUser",
    "name":"test",  
}

返回值

{
    "code":0,
    "message":"success",
}
  1. 修改用户信息

//key为send.to.type的结构 send : 接收名称为send的发送方消息,to:发送给谁 type:消息类型
//告诉AvenirMQ自己的连接信息
{
    "type":"updateUser",
    "name":"test",  
    "password":"123456"
    "key":"a.b.rpc"
    "ip":"127.0.0.1"
    "port":13000,
}

返回值

{
    "code":0,
    "message":"success",
}
  1. 发送消息
//生产者->消费者的概念
{
    sign:"test",
    type:"send",
    data:"hello world"
}

返回值

{
    "code":0,
    "message":"success",
}
  1. 修改绑定的键值
{
    sign:'test',
    type:'setKey',
    data: {
        name:"test",
        key:"a.b.rpc",
    }
}

返回值

{
    code:0,
    message:'success',
}
  1. 收到消息(无需请求,需要用户方起一个tcp服务器)
{
    code:0,
    message:'success',
    sender:'发送方的名字',
    data:'消息',
}
  1. 获取用户列表
{
    type:'userList',
}

返回值

{
    code:0,
    message:'success',
    data:[a,b],
}

配置文件示例

[main]
ip=127.0.0.1
port=52013

[mq]
#用户文件路径
userFileName=./user.json

#是否输出日志
ifConsoleLog=true

#连接超时时间
timeOut=10
#是否为长连接
keepAlive=true

#AvenirMQ发消息的超时时间(秒)
MQTimeOut=3

#AvenirMQ重发消息的周期(秒) 范围 2-50
MQResend=2
#重试次数
retryTime=5

结语

本项目照着RabbitMQ的思想简单地实现了一个消息中间件,不过没有使用AMQP协议,而只是简单的tcp处理,只能后期再优化了。

不过当这个项目在我脑海中浮现,我就认为我应该通过努力将它编写出来。目前AvenirMQ也达到了能用的程度了,这一路上学到的知识也是久久难忘的。

技术没有高低贵贱之分,脑海中如果有想法的话,我们要做的就是去把它实现。

百舸争流,奋楫者先。

编程之路漫漫修远兮,吾将上下而求索。

谢谢。

仓库评论 ( 5 )

你可以在登录后,发表评论

简介

用Node.js实现一个消息中间件 展开 收起
GPL-2.0
取消

发行版

暂无发行版

AvenirMQ

贡献者

全部

近期动态

加载更多
不能加载更多了
NodeJS
1
https://gitee.com/onlyyyy/AvenirMQ.git
git@gitee.com:onlyyyy/AvenirMQ.git
onlyyyy
AvenirMQ
AvenirMQ
master

搜索帮助

182229 41614e54 1850385 182230 7885ed45 1850385