1 Star 5 Fork 1

ken / xconsumer

加入 Gitee
与超过 600 万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README.md

xconsumer

基于zookeeper与go实现的分布式系统

基础功能

  • 基于zk的集群
  • leader选举
  • 任务分发
  • 事件监听
  • http管理

开发版本

  • zookeeper 3.7.0
  • go 1.16

如何使用

基于zookeeper进行leader选举,需要安装zookeeper,推荐使用docker进行安装。

docker run --name zookeeper-name -p 2181:2181 -it -d zookeeper

需要其他特性请查看docker hub官方文档

生成运行文件
go build -o bin main.go

执行
cd bin
./main.go

如果你想测试多个客户端的选举情况和worker任务分配情况,只需要执行多个bin下面的编译文件即可。

通过docker使用

dockerfile方式

自行安装好zookeeper,修改app.toml中的zookeeper的host

构建镜像
docker build -t xconsumer:0.1 .

运行
docker run -it --name xconsumer --rm xconsumer:0.1

docker-compose

修改app.toml中的zookeeper的host为"xconsumer-zookeeper"

构建容器
docker-compose up

功能开发

默认系统添加了两个测试任务,test01,test02,代码位于work目录下

我们以添加一个test01任务来举例

  1. 修改app.toml配置文件,新增一个[[task]]
name = "test01" # 任务名称,我们就叫做test01
workName = "test01" # 任务执行的work,程序会回调此方法
workerNum = 2 # 任务数量,这里需要启动2个任务处理
  1. 在work目录下新建文件夹test01,新建main
type Test01 struct {
}

// Run 非阻塞调用
// 程序执行完以后,又会重新调起
// 程序每隔10秒钟打印一句话
func (t *Test01) Run() {
	log.Info("i am test 01,10s一次")
	time.Sleep(time.Second * 10)
}

这一步不是必须的,只是为了让各个work更加分离开,建议这样做

  1. 新建一个test01.go文件

方法名命名规范为Run+work名字,work名字为配置中的workName

方法必须是Work中的一个方法集,因为程序实现使用了反射

// 必须是Work的方法
func (w *Work) RunTest01() {
	t01 := test01.Test01{}
	t01.Run()
}

到此为止,我们的一个任务就编写完了,可以重启项目查看运行效果了

功能架构

图片放在github,加载可能会较慢,请耐心等待

整体xconsumer项目代码量非常少,大致流程分为以下几步:

  1. 集群初始化(检查)
  2. leader选举
  3. leader集群任务分配
  4. 分发任务
  5. 开始工作

功能架构

功能流程

功能流程

注:只有leader节点操作,并且leader节点和其他follower节点一样,需要进行任务工作处理。

节点通信

节点连接上zk后,会根据集群名字直接加入指定集群,节点和节点直接通过zk事件监听通信,不直接进行通信操作

节点选举

zookeeper的节点是原子增加的,所以不会存在重复,相当于有一个原子锁。

xconsumer的选举算法就特别简单,选取最小值为leader,即先进先出原则。

如果leader掉线,会触发zk监听,其他node节点服务收到通知后重新判断自己是不是leader。

节点选举算法

worker计算

worker计算算法

假设

  • task1任务,需要3个worker
  • task2任务,需要2个worker
  • task3任务,需要1个worker

worker的任务分配,采用的是节点轮询任务分配算法,这样比较好的保证每台服务器都能正常的分发到任务,不会造成任务拥挤或者任务缺失。

存在问题

但还是存在一个问题是"头部问题",因为leader节点是按照最小原子选取的,而轮询也是按照最小开始,这样leader节点服务器的工作任务就更重。

有两个可优化点

  1. leader不参与worker工作
  2. 任务计算从follower开始

动态work分配

任务分配是动态计算的,当加入一个节点,或者关闭一个节点,任务由leader动态分配

动态work分配

注意事项

1. node节点变动任务变化问题

因为实现采用的是go协程,内部采用通道来进行的work开启关闭,如果出现阻塞程序,此通知需要等待阻塞完成才能进行通知操作。

最新的版本V2.1.0已经将关闭控制交由work来实现,每个work需要根据阻塞情况实现work关闭。

V2.1.0还增加了对一次性任务的实现支持

2. 延时反应问题

程序采用的是leader任务分发,所以当有节点加入的时候,不会瞬时反应,需要等待leader收到节点变动通知,重新分配任务,然后通知所有follower节点。

3. leader服务问题

选举算法采用的最小节点法,所以最好保证每台服务器配置都一致,不然容易造成work任务分配不均匀问题。

4. 脑裂问题

网络波动

leader的zk连接有默认20s掉线检查时间,在20s内是不会认为离线的,如果在20s内重新连接上服务器,依然还是leader。(此请求仅仅只针对网络波动)

如果超过20s重新链接的情况未做测试

服务器宕机

如果是直接leader服务器宕机,然后重启,那么自然就会成为follower节点。

5. 生产使用

此项目编写时间短暂,功能不够完善,测试覆盖面不足,请不要直接用于生产环境使用。

todo

  • worker稳定性测试
  • 增加web管理界面
  • 日志代码优化

其他

开发过程中,为了方便本地zookeeper管理,推荐使用 PrettyZoo 客户端工具

贡献代码

欢迎大家fork和提pull requests。

意见&建议

如果你有任何的想法或是建议那就给我留个言吧!

仓库评论 ( 0 )

你可以在登录后,发表评论

简介

基于zookeeper与go实现的分布式消费系统 展开 收起
Go 等 2 种语言
Apache-2.0
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Go
1
https://gitee.com/ken.yang/xconsumer.git
git@gitee.com:ken.yang/xconsumer.git
ken.yang
xconsumer
xconsumer
master

搜索帮助