1 Star 0 Fork 46

XAUTLOVE / zkclient

forked from 大皮卡丘 / zkclient 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
Apache-2.0

#ZKClient

这是一个ZooKeeper客户端,实现了断线重连,会话过期重连,永久监听,子节点数据变化的监听。并且加入了常用功能,例如分布式锁,Leader选举,分布式队列等。


#使用说明

##一、创建ZKClient对象 有两种方式可以方便的创建ZKClient对象。

  1. 使用构造函数创建

     String address = "localhost:2181";
     ZKClient zkClient1 = new ZKClient(address);
     ZKClient zkClient2 = new ZKClient(address,500);
     ZKClient zkClient3 = new ZKClient(address,500,1000*60);
     ZKClient zkClient4 = new ZKClient(address,500,1000*60,new BytesSerializer());
     ZKClient zkClient5 = new ZKClient(address,500,1000*60,new BytesSerializer(),Integer.MAX_VALUE);
     ZKClient zkClient6 = new ZKClient(address,500,1000*60,new BytesSerializer(),Integer.MAX_VALUE,2);
  2. 使用辅助类创建

     String address = "localhost:2181";
     ZKClient zkClient = ZKClientBuilder.newZKClient(address)
                         .sessionTimeout(1000)//可选
                         .serializer(new SerializableSerializer())//可选
                         .eventThreadPoolSize(1)//可选
                         .retryTimeout(1000*60)//可选
                         .connectionTimeout(Integer.MAX_VALUE)//可选
                         .build();创建实例

##二、节点的新增、更新、删除和获取 ###新增节点

  1. 常规新增节点

    父节点不存在会抛出异常

     zkClient.create("/test1", "123", CreateMode.EPHEMERAL);
     zkClient.create("/test1-1",123,CreateMode.EPHEMERAL_SEQUENTIAL);
     zkClient.create("/test1-2",123,CreateMode.PERSISTENT);
     zkClient.create("/test1-3",123,CreateMode.PERSISTENT_SEQUENTIAL);
  2. 递归新增节点(新增节点及其父节点)

    如果父节点不存在会被一并创建。

    对于PERSISTENT类型的节点,递归创建,父节点和子节点都创建为PERSISTENT。

    对于EPHEMERAL类型的节点,递归创建,父节点都是PERSISTENT类型,而最后一级节点才是EPHEMERAL类型。(因为EPHEMERAL不能拥有子节点)

    注意:第二个参数为节点的值,指的的最后一级节点的值。

     String path = "/test8/1/2/3";
     //递归创建节点及父节点
     zkClient.createRecursive(path, "abc", CreateMode.PERSISTENT);
     zkClient.createRecursive(path, "123", ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
  3. 特殊的EPHEMERAL类型节点

    特殊类型的EPHEMERAL节点,该节点在会话失效被删除后,重新连接会被自动创建。

     String path = "/test8/1/2/3";
     //EPHEMERAL类型节点
     zkClient.createEphemerale(path, "123", false);
     zkClient.createEphemerale(path, "123",ZooDefs.Ids.CREATOR_ALL_ACL, false);
     //EPHEMERAL_SEQUENTIAL类型
     String retPath = zkClient.createEphemerale(path, "456", true);

###更新节点数据

String path = "/test";
zkClient.setData(path, "456");
//带期望版本号的更新,如果真实的版本号与期望版本号不一致会更新失败,抛出异常
zkClient.setData(path, "123", 2);

###删除节点

  1. 常规删除

     boolean flag =  zkClient.delete("/test");//删除任意版本
     boolean flag =  zkClient.delete("/test",1);//删除指定版本
  2. 递归删除(删除节点及子节点)

     String path = "/test";
     zkClient.deleteRecursive(path);//如果/test下有多个子节点,会被一并删除

###获取节点数据

String path = "/test";
zkClient.getData(path); //如果节点不存在抛出异常
zkClient.getData(path, true); //如果节点不存在返回null
Stat stat = new Stat();
zkClient.getData(path, stat); //获得数据以及stat信息

###等待节点创建

String path = "/test";
//等待直到超时或者节点创建成功。
zkClient.waitUntilExists(path, TimeUnit.MILLISECONDS, 1000*5);

##三、监听相关

注意:对于断开连接时间过长造成的会话过期,由于服务器端在会话过期后会删除客户端设置的监听。

即便客户端在会话过期后自动连接成功,但是在会话过期到会话重建这段时间客户端监听的节点仍可能发生了改变,

而具体哪些变了或是没变,客户端是无法感知到的。

为了避免丢掉任何数据改变的事件,所有的监听器的都有一个回调方法(handleSessionExpired),用来处理会话过期这种特殊情况。

对于handleSessionExpired方法可以这样处理,以节点监听为例:

public void handleSessionExpired(String path) throws Exception {
    //在会话过期后,getData方法会阻塞直到会话重建,并且连接成功,获取数据后才会返回。
   Object data = zkClient.getData(path);
   //这里把返回的data与上次改变后的数据做对比如果改变了,则执行数据改变后的业务逻辑
   //do someting
}

###节点监听

    String path = "/test";
    ZKClient zkClient = ZKClientBuilder.newZKClient()
                            .servers("localhost:2181")
                            .build();
    //注册监听
    zkClient.listenNodeChanges(path, new ZKNodeListener() {
        @Override
        public void handleSessionExpired(String path) throws Exception {
            System.out.println("session  expired ["+path+"]");
        }
        
        @Override
        public void handleDataDeleted(String path) throws Exception {
            System.out.println("node is deleted ["+path+"]");
        }
        
        @Override
        public void handleDataCreated(String path, Object data) throws Exception {
            System.out.println("node is created ["+path+"]");
        }
        
        @Override
        public void handleDataChanged(String path, Object data) throws Exception {
            System.out.println("node is changed ["+path+"]");
        }
    });

###子节点数量监听

    String path = "/parent";
    ZKClient zkClient = ZKClientBuilder.newZKClient("localhost:2181").build();
    //注册监听
    zkClient.listenChildCountChanges(path, new ZKChildCountListener() {
        
        @Override
        public void handleSessionExpired(String path, List<String> children) throws Exception {//会话过期
            System.out.println("children:"+children);
        }
        
        @Override
        public void handleChildCountChanged(String path, List<String> children) throws Exception {//节点数量发生改变
             System.out.println("children:"+children);
        }
    });

###子节点数量和子节点数据变化监听

    String path = "/test";
    ZKClient zkClient = ZKClientBuilder.newZKClient()
                            .servers("localhost:2181")
                            .build();
    
    //注册监听
    zkClient.listenChildDataChanges(path, new ZKChildDataListener() {
        @Override
        public void handleSessionExpired(String path, Object data) throws Exception {//会话过期
           System.out.println("children:"+children);
        }
        
        @Override
        public void handleChildDataChanged(String path, Object data) throws Exception {//子节点数据发生改变
            System.out.println("the child data is changed:[path:"+path+",data:"+data+"]");
        }
        
        @Override
        public void handleChildCountChanged(String path, List<String> children) throws Exception {//子节点数量发生改变
           System.out.println("children:"+children);
        }
    });

###客户端状态监听

ZKClient zkClient = ZKClientBuilder.newZKClient()
                            .servers("localhost:2181")
                            .build();
//注册监听
zkClient.listenStateChanges(new ZKStateListener() {
        
        @Override
        public void handleStateChanged(KeeperState state) throws Exception {//客户端状态发生改变
            System.out.println("state is "+state);
        }
        
        @Override
        public void handleSessionError(Throwable error) throws Exception {//创建session出错
            //ignore
        }
        
        @Override
        public void handleNewSession() throws Exception {//会话创建
            System.out.println("new session");
        }
    });

##四、扩展功能

###分布式锁

ZKClient zkClient = ZKClientBuilder.newZKClient()
                            .servers("localhost:2181")
                            .build();
final String lockPath = "/zk/lock";
zkClient.createRecursive(lockPath, null, CreateMode.PERSISTENT);
//创建分布式锁, 非线程安全类,每个线程请创建单独实例。
ZKDistributedLock lock = ZKDistributedLock.newInstance(zkClient,lockPath);

lock.lock(); //获得锁

//do someting

lock.unlock();//释放锁

###延迟获取分布式锁 网络闪断会引起短暂的网络断开,这个时间很短,但是却给分布式锁带来很大的麻烦。

例如线程1获得了分布式锁,但是却发生网络的短暂断开,如果这期间ZooKeeper服务器删除临时节点,分布式锁就会释放,其实线程1的工作一直在进行,并没有完成也没有宕机。

显然,由于网络短暂的断开引起的锁释放一般情况下不是我们想要的。所以提供了,具有延迟功能的分布式锁。

如果线程1获得了锁,并发生网络闪断,在ZK服务器删除临时节点后,那么其他线程并不会立即尝试获取锁,而是会等待一段时间,如果再这段时间内线程1成功连接上,那么线程1将继续持有锁。

String lockPath = "/zk/delaylock";
ZKClient zkClient1 = ZKClientBuilder.newZKClient()
                        .servers("localhost:"+zkServer.getPort())
                        .sessionTimeout(1000)
                        .build();
ZKDistributedDelayLock lock = ZKDistributedDelayLock.newInstance(zkClient1, lockPach);
lock.lock(); //获得锁

//do someting

lock.unlock();//释放锁

###Leader选举 Leader选举是异步的,只需要调用selector.start()就会启动并参与Leader选举,如果成为了主服务,则会执行监听器ZKLeaderSelectorListener。

ZKClient zkClient = ZKClientBuilder.newZKClient()
                            .servers("localhost:2181")
                            .build();
final String lockPath = "/zk/leader";
final ZKLeaderSelector selector = new ZKLeaderSelector("service1", true, zkClient1, leaderPath, 
    new ZKLeaderSelectorListener() {
        
        //成为Leader后的回调函数        
        @Override
        public void takeLeadership(ZKClient client, ZKLeaderSelector selector) {
            //在这里可以编写,成为主服务后需要做的事情。
            System.out.println("I am the leader-"+selector.getLeader());
        }
    });
//启动并参与Leader选举
selector.start();

//获得当前主服务的ID
selector.getLeader();

//如果要退出Leader选举
selector.close();

###延迟Leader选举 例如线程1被选举为Leader,但是却发生网络的短暂断开,如果zooKeeper服务器删除临时节点,其他线程会认为Leader宕机,会重新选举Leader,其实线程1的工作一直在继续并没有宕机。

显然,由于网络短暂的断开引起的这种情况不是我们需要的。

延迟Leader选举类是这样解决的,如果线程1成为了Leader,并发生网络闪断,在ZK服务器删除临时节点后,那么其他线程并不会立即竞争Leader,而是会等待一段时间。

如果再这段时间内线程1成功连接上,那么线程1保持Leader的角色。

ZKClient zkClient = ZKClientBuilder.newZKClient()
                            .servers("localhost:2181")
                            .build();
String lockPath = "/zk/delayleader";
//延迟3秒选举
LeaderSelector selector = new ZKLeaderDelySelector("server1", true,3000, zkClient, leaderPath, new ZKLeaderSelectorListener() {
        
        @Override
        public void takeLeadership(ZKClient client, LeaderSelector selector) {
            msgList.add("server1 I am the leader");
           
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("server1: I am the leader-"+selector.getLeader());
            zkClient.reconnect();
        }
    });
//启动并参与Leader选举
selector.start();

//获得当前主服务的ID
selector.getLeader();

//如果要退出Leader选举
selector.close();

###分布式队列

ZKClient zkClient = ZKClientBuilder.newZKClient()
                            .servers("localhost:2181")
                            .build();
final String rootPath = "/zk/queue";
zkClient.createRecursive(rootPath, null, CreateMode.PERSISTENT);

//创建分布式队列对象
ZKDistributedQueue<String> queue = new ZKDistributedQueue(zkClient, rootPath);

queue.offer("123");//放入元素

String value = queue.poll();//删除并获取顶部元素

String value =  queue.peek(); //获取顶部元素,不会删除

###主从服务锁

ZKClient zkClient = ZKClientBuilder.newZKClient()
                            .servers("localhost:2181")
                            .build();
final String lockPath = "/zk/halock";
zkClient.createRecursive(rootPath, null, CreateMode.PERSISTENT);

//创建锁, 非线程安全类,每个线程请创建单独实例。
ZKHALock lock = ZKHALock.newInstance(zkClient, lockPach);

lock.lock();//尝试获取锁

//获取锁成功,当前线程变为主服务。
//直到主服务宕机或与zk服务端断开连接,才会释放锁。
//此时从服务尝试获得锁,选取一个从服务变为主服务
Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "{}" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright {yyyy} {name of copyright owner} Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

简介

这是一个ZooKeeper客户端,实现了断线重连,会话过期重连,永久监听,子节点数据变化的监听。并且加入了常用功能,例如分布式锁,Leader选举,主从服务锁,分布式队列等。 展开 收起
Java
Apache-2.0
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/null_781_3827/zkclient.git
git@gitee.com:null_781_3827/zkclient.git
null_781_3827
zkclient
zkclient
master

搜索帮助

14c37bed 8189591 565d56ea 8189591