4 Star 0 Fork 0

梦残 / 学习笔记

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
zk.txt 16.21 KB
一键复制 编辑 原始数据 按行查看 历史
Zookeeper
1. Zookeeper是一个高效的分布式协调服务,它暴露了一些公用的服务,比如命名/配置/管理/同步控制/群组服务等.我们可以使用zk来实现比如达成共识/集群管理/leader选举
Zookeeper是一个高效的分布式管理与协调框架,基于ZAB算法(原子消息广播协议)的实现,该框架能够很好地保证分布式环境中数据一致性.也是基于这样的特性,使得Zookeeper成为了解决分布式一致性问题的利器.
顺序一致性:从一个客户端发起的事务请求,最终将会严格地按照其发起的顺序被应用到zk中去
原子性:所有事务请求的处理结果在整个集群中所有的机器上的应用情况是一致的,也就是说,要么整个集群所有的机器都成功应用了某一个事务,要么都没有应用,一定不会出现部分机器应用了该事务,而另一部分没有应用的情况
单一试图:无论客户端连接的是哪一个zk服务器,其看到的数据模型都是一致的
可靠性:一旦服务器成功地应用了一个事务,并完成了对客户端的响应,那么该事务所引起的服务端状态将会被一致保留下俩,除非有另外一个事务对其更改
实时性:通常所说的实时性就是指一旦事务被成功应用,那么客户端就能立刻从服务器上获取变更以后的新数据,zk仅仅能保证在一段时间内,客户端最终一定能从服务器上读取最新的数据状态
2.zookeeper 设计目标
目标1:简单的数据结构,zk就是以简单的树形结构来进行相互协调的(也叫树形名字空间)
目标2:可以构建集群.一般zk集群通常由一组机器构成,一般3-5台机器就可以组成zk集群了,只要集群中超过半数以上的机器能够正常工作,那么整个集群就能够正常对外提供服务
目标3:顺序访问,对于来自每一个客户端的每个请求,zk都会分配一个全局唯一的递增编号,这个编号反应了所有事务操作的先后顺序,应用程序可以使用zk的这个特性来实现更高层次的同步
目标4:高性能.由于zk将全量数据存储在内存中,并直接服务于所有的非事务请求,因此尤其是在读操作为主的场景下性能非常突出.在JMater测试下(100%读请求场景下),其结果大约在12-13w的QPS.
3.zk的结构
zk会维护一个具有层次关系的数据结构,它非常类似于一个标准的文件系统.
4.zk的数据模型
1.每个子目录项如NameService都被称为znode,这个znode是被他所在的路径唯一标识,如Server1这个znode的标识为/NameService/Server1
2.znode可以有子节点目录,并且每个znode可以存储数据,注意EPHEMERAL类型的目录节点不能有子节点
3.znode是有版本的,每个znode中存储的数据可以有多个版本,也就是一个访问路径中可以储存多分数据
4.znode可以是临时节点,一旦创建这个znode的客户端与服务器失去联系,这个znode也将自动删除,zk的客户端和服务器通信采用的是长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为session,如果znode是临时节点,这个session失效,znode也就删除
5.znode目录名可以自动编号,如果APP1存在,在创建的话就会自动变成APP2
6.znode可以被监控,包括这个目录节点中存储的数据的修改妹子节点目录的变化,一旦变化可以被通知设置监控的客户端,这个zk的核心特性,zk的很多功能都是基于这个特性实现的
5.zk的组成
leader:负责客户度的write类型的请求
follower:负责客户端的reader类型的请求,参与leader选举
observer:特殊的observer,其可以接受客户端reader请求,但不参与选举(扩容系统支撑能力,提高了读取速度.因为它不接受任何同步的写入请求,只负责与leader同步数据)
6.引用场景
zk从设计模式角度来看是一个基于观察者模式设计的分布式服务管理框架,它复制存储和管理大家关心的数据,然后接受观察者的注册,一旦这些数据的状态发生了改变,zk就将负责 通知已经在zk上注册的那些观察者做出相应的反应,从而实现集群中类似M/S管理模式
7.应用场景说明
1. 配置管理:机器的配置列表,运行时的开关配置,数据库配置信息等.这些全局配置信息通常具备以下三个特性:
1.数据量小
2.数据内容在运行时经常发生变化
3.集群中各个集群共享信息,配置一致
2. 集群管理
1.希望知道当前集群中究竟有多少机器工作
2.对集群中每天集群的运行状态进行数据收集
3.对集群中每台集群进行上下线操作
3. 发布订阅
4. 数据库切换
5. 分布式日志的收集
6. 分布式锁,队列管理等
原生api很难用,那么可以采用第三方客户端完美实现,比如Curator框架
8.配置文件zoo.cfg详解:
tickTime: 基本事件单元,以毫秒为单位。这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔, 也就是每隔 tickTime时间就会发送一个心跳。
dataDir:存储内存中数据库快照的位置,顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
clientPort: 这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
initLimit: 这个配置项是用来配置 Zookeeper 接受客户端初始化连接时最长能忍受多少个心跳时间间隔数, 当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息, 那么表明这个客户端连接失败。总的时间长度就是 10*2000=20 秒。
syncLimit: 这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度, 最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 5*2000=10 秒
server.A = B:C:D : A表示这个是第几号服务器, B 是这个服务器的 ip 地址; C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口; D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader
9.java操作zk
1.创建zk
Zookeeper zk = new Zookeeper(....);
参数:connectionString: 服务器连接列表,用","分割
sessionTimeout: 心跳检测时间周期(mm)
wather:事件处理通知器
canBeReadOnly:标记当前会话是否标记为只读
sessionId和sessionPasswd:提供连接zk的id和密码,通过这两确定唯一一台客户端,目的是提供重复会话.
ps:zk客户端和服务端的连接是一个异步的过程,使用countDownLatch来完成创建
2.创建节点.zk.create(....)方法
两种方法:同步和异步
同步:
zk.create("/testRoot", "testRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
参数1:节点路径(名称):/nodeName(不允许递归创建节点,也就是说父节点不存在的情况下不允许创建子节点)
参数2:节点内容:要求类型是字节数组(也就是说,不支持序列化方式,如果需要实现序列化,可使用java相关序列化框架,如HEssian,kryo)
参数3:节点权限,使用Ids.OPEND_ACL_UNSAFA开放权限即可(这个参数一般在权限没有太高要求的场景下没必要关注)
参数4:节点类型:创建节点的类型:CreateMode.*
四种:
PERSISTENT 持久化节点
PERSISTENT_SEQUENTIAL 持久顺序节点
EPHEMERAL 临时节点 (分布式锁底层的实现)
EPHEMERAL_SEQUENTIAL 临时顺序节点
异步方式:
参数5:注册一个异步回调函数,要实现AsynCallBack.StringCallBack接口,重写processResult(int rc,String path,Object ctx,String name)方法,节点创建完毕后执行此方法
rc:服务器响应码
0表示成功
-4表示端口连接
-110表示指定节点已存在
-112表示会话过期
path:接口调用时传入API的数据节点的路径参数
ctx:为调用接口传入API的ctx值
name:实际在服务器端创建节点的名称
参数6:传递给回调函数的参数,一般为上下文(Context)信息
3.删除节点
zk.delete("/testRoot/children", -1);
版本号为-1时候表示跳版本检查,其他时候表示删除对应版本
异步方式同上
4.获得节点
byte[] data = zk.getData("/testRoot", false, null);
子节点
List<String> list = zk.getChildren("/testRoot", false);
得到的路径是相对/testRoot的路径
5.判断节点是否存在
zk.exists("/testRoot/children", false);
有数据返回就是存在,不存在就是返回null
6.修改
zk.setData("/testRoot", "modify data root".getBytes(), -1);
-1表示版本
10.Watcher,zk状态,事件类型
zk有watch事件,是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher.
Watcher是监听数据发生某些变化,那么就一定会有对应的事件类型,和状态类型
事件类型(znode节点相关的)
EventType.NodeCreated
EventType.NodeDataChanged
EventType.NodeChildrenChanged
EventType.NodeDeleted
事件类型:
KeeperState.Disconnected
KeeperState.SyncConnected
KeeperState.AuthFailed
KeeperState.Expired
继承watcher 重写process方法
// 连接状态
KeeperState keeperState = event.getState();
// 事件类型
EventType eventType = event.getType();
// 受影响的path
String path = event.getPath();
11.ACL(AUTH)
12.zkClient(dubbo)
是由Datameer的工程师开发.在源生api接口的基础上进行了封装,简化了zk的复杂性.
1.创建客户端方法: //增删改查方法和watch没有半毛线关系
zkClient(...);
参数1:zkServer zookeeper服务器的地址,用","隔开.
参数2:sessionTimeout
参数3:connectionTimeout
参数4:IZkConnection接口的实现类
参数5:zkSerializer自动以序列化接口
解决了递归创建
增:
zkc.create(key,value,boolean);
zkc.createEphemeral("/temp"); //创建临时节点
zkc.createPersistent("/super/c1", true); //递归节点
递归创建的时候只能创建节点不能创建value
kc.createPersistent("/super/c1", "nihao");//会抛没有super节点的异常
zkc.delete("/temp");
zkc.deleteRecursive("/super"); //递归删除
查:
String data = zkc.readData(rp);
List<String> list = zkc.getChildren("/super");
改:
zkc.writeData("/super/c1", "新内容");
是否存在:
Boolean false = zkc.exists("/super/c1");
2.watcher
zk提供了一套监听方式,我们可以使用监听节点的方式进行操作,剔除了繁琐的反复wather操作,简化了代码的复杂程度.
watcher的特性:
一次性:
对于zk的watcher只需要记住,zk有watch事件,是一次性触发,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher,由于zk的监控都是一次性的所以必须每次设置监控.
客户端串行执行:
客户端的watcher回调的过程是一个串行同步的过程,这是为了保证顺序,同时需要注意,千万不要因为一个watcher的处理逻辑影响了整个客户端的watcher回调
轻量:
watcherEvent是zk整个watcher通知机制的最小通知单元,整个结构只包含三个部分:通知状态,事件类型,节点路径,也就是说只会通知客户端发生了事件而不告诉其内容,需要客户端自己去获取.
1.zkc.subscribeChildChanges方法
参数1:path路径
参数2:实现了IZkChildListener接口.只需要重写handleChildChanges(String parantePath,List<string> currentChilds)方法.
parantePath为所监听节点全路径
currentChilds为最新节点列表(相对路径)
针对下面三个事件触发:
新增子节点
减少子节点
删除节点
2.zkc.subscribeDataChanges("/super", new IZkDataListener() ).
//对父节点添加监听子节点变化。
13.Curator框架(dubbox)
提供了丰富的api:
session超时重连,主从选举,分布式计数器,分布式锁等等适用于各种复杂zk场景的api封装
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
1.使用:
Curator框架中使用链式编程风格,易读性更强,使用工程方法创建连接对象.
1.使用CuratorFrameworkFacatory的两个静态工厂方法(参数不同)来实现
参数1:connectString,连接串
参数2:retryPolicy.重试连接策略.四种
ExponentialBackoffRetry,RetryNtimes,RetryOneTimes,RetryUntilElapsed
参数3:sessionTimeoutMs.默认60 000ms
参数4:connectionTimeoutMs,默认15000ms
retryPolicy策略通过一个接口让用户自定义实现
//1 重试策略:初试时间为1s 重试10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
//2 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
// .namespace("super")
.build();
//3 开启连接
cf.start();
增删改查:
1.create,可选链式项
creatingParentIfNeeded,withNode,forPath,wichACL等
2.delete
deletingChildrenIfNeeded,guranteed,withVersion,forPath
3.getData().forPaht(),setData.forPath()
4.异步绑定回调方法,比如创建节点的时候绑定一个回调函数,该函数可以输出服务器的状态码以及服务器事件类型.还可以加入一个线程池优化操作.
5.读取该节点方法getChildren().forPath();
6.checkExists().forPath();
2.Curator监听器
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
有了个这个依赖包,我们使用NodeCache的方式去客户端示例中注册一个监听缓存,然后实现对应的监听方法即可.这里我们有两种监听方式:
NodeCacheListener:监听节点的新增,修改操作
PathChildrenCacheListener:监听子节点的新增修改删除操作
// 建立一个cache缓存
final NodeCache cache = new NodeCache(cf, "/super", false); //false表示不进行压缩
cache.start(true);
cache.getListenable().addListener(new NodeCacheListener() {};
// 建立一个PathChildrenCache缓存,第三个参数为是否接受节点数据内容 如果为false则不接受
PathChildrenCache cache = new PathChildrenCache(cf, "/super", true);
// 在初始化的时候就进行缓存监听
cache.start(StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {}
3.分布式锁
final InterProcessMutex lock = new InterProcessMutex(cf, "/super");
lock.acquire();
4.分布式计数器
DistributedAtomicInteger atomicIntger = new DistributedAtomicInteger(cf, "/super", new RetryNTimes(3, 1000)); //重试间隔
AtomicValue<Integer> value = atomicIntger.add(1);
System.out.println(value.succeeded());
System.out.println(value.postValue()); //最新值
System.out.println(value.preValue()); //原始值
5.DistributedDoubleBarrier
DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/super", 5); //5个终端
可以模拟5个客户端都准备好以后同时去执行一件事情,然后同时结束
barrier.enter();
barrier.leave();
6.DistributedBarrier
//同时开始
barrier = new DistributedBarrier(cf, "/super");
barrier.setBarrier(); //设置
barrier.waitOnBarrier(); //等待
....要执行的内容
barrier.removeBarrier(); //释放
Java
1
https://gitee.com/mengcan/study-note.git
git@gitee.com:mengcan/study-note.git
mengcan
study-note
学习笔记
master

搜索帮助