1. Redis-replicator

1.1. Brief introduction

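Redis Replicator is a tool for parsing RDB and AOF files. It fully implements the Redis replication protocol and supports the three synchronization commands SYNC, PSYNC and PSYNC2. It also supports remote RDB backup and data synchronization. In this document, "command" refers only to Redis write commands (for example set, hmset) and does not include read commands (for example get, hmget). The supported Redis versions range from 2.6 to 6.0.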
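
To make the handshake behind these sync commands more concrete, the following is a minimal sketch of how a client might classify the first reply line a master sends after PSYNC. The class PsyncReply and its members are hypothetical names used for illustration and are not part of redis-replicator; the reply shapes "+FULLRESYNC <replid> <offset>" and "+CONTINUE" come from the Redis replication protocol.

```java
import java.util.Objects;

/** Hypothetical helper, not part of redis-replicator: classifies a PSYNC reply line. */
final class PsyncReply {
    enum Kind { FULL_RESYNC, CONTINUE, UNSUPPORTED }

    final Kind kind;
    final String replId; // null unless the master supplied a replication id
    final long offset;   // -1 unless kind is FULL_RESYNC

    private PsyncReply(Kind kind, String replId, long offset) {
        this.kind = kind;
        this.replId = replId;
        this.offset = offset;
    }

    static PsyncReply parse(String line) {
        Objects.requireNonNull(line, "line");
        String s = line.trim();                    // drop the trailing \r\n
        if (s.startsWith("+")) s = s.substring(1); // drop the simple-string marker
        String[] parts = s.split(" ");
        switch (parts[0]) {
            case "FULLRESYNC":
                if (parts.length != 3) {
                    throw new IllegalArgumentException("malformed FULLRESYNC reply: " + line);
                }
                return new PsyncReply(Kind.FULL_RESYNC, parts[1], Long.parseLong(parts[2]));
            case "CONTINUE":
                // A PSYNC2 master may append a new replication id after CONTINUE.
                return new PsyncReply(Kind.CONTINUE, parts.length > 1 ? parts[1] : null, -1L);
            default:
                // For example an error reply from a server that only understands SYNC.
                return new PsyncReply(Kind.UNSUPPORTED, null, -1L);
        }
    }
}
```

A FULL_RESYNC reply is followed by an RDB snapshot and then the live command stream, whereas CONTINUE means the master only sends the commands the client has missed.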

1.2. QQ discussion group

479688557

1.3. Contact the author

chen.bao.yi@qq.com

1.4. Compatibility statement

The classes in the com.moilioncircle.redis.replicator.cmd.impl package may change incompatibly between versions, because they have to track changes in Redis itself.

2. Install

2.1. Requirements

jdk 1.8+
maven-3.3.1+ (with toolchains support)
redis 2.6 - 6.0

2.2. Maven dependency

    <dependency>
        <groupId>com.moilioncircle</groupId>
        <artifactId>redis-replicator</artifactId>
        <version>3.4.1</version>
    </dependency>

2.3. Install from source code into the local maven repository

    step 1: install jdk-1.8.x
    step 2: install jdk-9.0.x (or jdk-11.0.x)
    step 3: git clone https://github.com/leonchen83/redis-replicator.git
    step 4: cd ./redis-replicator
            update the jdk paths in toolchains.xml and save the file
    step 5: $mvn clean install package -Dmaven.test.skip=true --global-toolchains ./toolchains.xml

2.4. Select a version

redis version       redis-replicator version
[2.6, 6.0.x]        [3.4.0, ]
[2.6, 5.0.x]        [2.6.1, 3.3.3]
[2.6, 4.0.x]        [2.3.0, 2.5.0]
[2.6, 4.0-RC3]      [2.1.0, 2.2.0]
[2.6, 3.2.x]        [1.0.18] (no longer supported)

3. Simple usage

3.1. Usage

        Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
        replicator.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                if (event instanceof KeyStringValueString) {
                    KeyStringValueString kv = (KeyStringValueString) event;
                    System.out.println(new String(kv.getKey()));
                    System.out.println(new String(kv.getValue()));
                } else {
                    // handle other event types here
                }
            }
        });
        replicator.open();

3.2. Backup remote RDB snapshot

See RdbBackupExample.java

3.3. Backup remote commands

See CommandBackupExample.java

3.4. Convert RDB to dump format

We can use DumpRdbVisitor to convert an RDB file to the Redis DUMP format.


        Replicator r = new RedisReplicator("redis:///path/to/dump.rdb");
        r.setRdbVisitor(new DumpRdbVisitor(r));
        r.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                if (!(event instanceof DumpKeyValuePair)) return;
                DumpKeyValuePair dkv = (DumpKeyValuePair) event;
                byte[] serialized = dkv.getValue();
                // we can use redis RESTORE command to migrate this serialized value to another redis.
            }
        });
        r.open();
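
As a follow-up to the RESTORE comment in the listener above, here is a minimal sketch of how a serialized value could be framed as a RESTORE command in the Redis wire format (RESP). RestoreCommandEncoder is a hypothetical helper written only with the JDK; it is not part of redis-replicator, and in practice a Redis client library would normally do this framing for you.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of redis-replicator: frames RESTORE as a RESP array. */
final class RestoreCommandEncoder {

    /** Builds "RESTORE key ttlMillis serialized [REPLACE]" as an array of bulk strings. */
    static byte[] encode(byte[] key, long ttlMillis, byte[] serialized, boolean replace) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int argc = replace ? 5 : 4;
        writeAscii(out, "*" + argc + "\r\n");
        writeBulk(out, ascii("RESTORE"));
        writeBulk(out, key);
        writeBulk(out, ascii(Long.toString(ttlMillis))); // 0 means no expiry
        writeBulk(out, serialized);                      // binary-safe, copied as-is
        if (replace) {
            writeBulk(out, ascii("REPLACE"));
        }
        return out.toByteArray();
    }

    // A bulk string is "$<length>\r\n<bytes>\r\n".
    private static void writeBulk(ByteArrayOutputStream out, byte[] arg) {
        writeAscii(out, "$" + arg.length + "\r\n");
        out.write(arg, 0, arg.length);
        writeAscii(out, "\r\n");
    }

    private static void writeAscii(ByteArrayOutputStream out, String s) {
        byte[] b = ascii(s);
        out.write(b, 0, b.length);
    }

    private static byte[] ascii(String s) {
        return s.getBytes(StandardCharsets.US_ASCII);
    }
}
```

The key and the payload are kept as byte arrays throughout, since a DUMP payload is binary and would be corrupted by a round trip through String.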

3.5. RDB check

We can use SkipRdbVisitor to check the correctness of an RDB file.


        Replicator r = new RedisReplicator("redis:///path/to/dump.rdb");
        r.setRdbVisitor(new SkipRdbVisitor(r));
        r.open();
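
For a much cheaper sanity check before running a full pass, the file header alone can be inspected. The sketch below relies on the fact that an RDB file starts with the ASCII magic "REDIS" followed by a four-digit version. RdbHeader is a hypothetical helper, it is not how SkipRdbVisitor works, and a valid header says nothing about the records that follow.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of redis-replicator: reads the 9-byte RDB header. */
final class RdbHeader {

    /** Returns the RDB version encoded in the header, or -1 if the header is not valid. */
    static int version(byte[] file) {
        if (file == null || file.length < 9) {
            return -1;
        }
        String magic = new String(file, 0, 5, StandardCharsets.US_ASCII);
        if (!magic.equals("REDIS")) {
            return -1;
        }
        int version = 0;
        for (int i = 5; i < 9; i++) { // four ASCII digits, for example "0009"
            int digit = file[i] - '0';
            if (digit < 0 || digit > 9) {
                return -1;
            }
            version = version * 10 + digit;
        }
        return version;
    }
}
```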

3.6. Other examples

See examples

4. Advanced topics

4.1. Command extension

4.1.1. Write a command class

    public static class YourAppendCommand extends AbstractCommand {
        private final String key;
        private final String value;
    
        public YourAppendCommand(String key, String value) {
            this.key = key;
            this.value = value;
        }
                
        public String getKey() {
            return key;
        }
        
        public String getValue() {
            return value;
        }
    }

4.1.2. Write a command parser


    public class YourAppendParser implements CommandParser<YourAppendCommand> {

        @Override
        public YourAppendCommand parse(Object[] command) {
            return new YourAppendCommand(new String((byte[]) command[1], UTF_8), new String((byte[]) command[2], UTF_8));
        }
    }
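
The parser above trusts that the command array holds exactly three byte[] elements. A more defensive variant might validate the shape first. The sketch below uses only the JDK so it can run on its own; AppendArgs is a hypothetical class, and the layout it assumes (command[0] is the command name, the remaining elements are byte[] arguments) is inferred from the casts in the parser above.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-alone sketch: validates and decodes the arguments of APPEND. */
final class AppendArgs {
    final String key;
    final String value;

    private AppendArgs(String key, String value) {
        this.key = key;
        this.value = value;
    }

    /** Expects {name, key, value}; throws IllegalArgumentException on any other shape. */
    static AppendArgs from(Object[] command) {
        if (command == null || command.length != 3) {
            throw new IllegalArgumentException("APPEND expects exactly 2 arguments");
        }
        return new AppendArgs(text(command[1]), text(command[2]));
    }

    private static String text(Object arg) {
        if (!(arg instanceof byte[])) {
            throw new IllegalArgumentException("expected a byte[] argument");
        }
        return new String((byte[]) arg, StandardCharsets.UTF_8);
    }
}
```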

4.1.3. Register this command parser with the replicator

    Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
    replicator.addCommandParser(CommandName.name("APPEND"),new YourAppendParser());

4.1.4. Handle the registered command event

    replicator.addEventListener(new EventListener() {
        @Override
        public void onEvent(Replicator replicator, Event event) {
            if(event instanceof YourAppendCommand){
                YourAppendCommand appendCommand = (YourAppendCommand)event;
                // your code goes here
            }
        }
    });

4.1.5. Put them together

See CommandExtensionExample.java

4.2. Module extension (redis-4.0 and above)

4.2.1. Compile the test modules in the redis source tree

    $cd /path/to/redis-4.0-rc2/src/modules
    $make

4.2.2. Uncomment the following line in redis.conf

    loadmodule /path/to/redis-4.0-rc2/src/modules/hellotype.so

4.2.3. Write a module parser

    public class HelloTypeModuleParser implements ModuleParser<HelloTypeModule> {

        @Override
        public HelloTypeModule parse(RedisInputStream in, int version) throws IOException {
            DefaultRdbModuleParser parser = new DefaultRdbModuleParser(in);
            int elements = parser.loadUnsigned(version).intValue();
            long[] ary = new long[elements];
            int i = 0;
            while (elements-- > 0) {
                ary[i++] = parser.loadSigned(version);
            }
            return new HelloTypeModule(ary);
        }
    }

    public class HelloTypeModule implements Module {
        private final long[] value;

        public HelloTypeModule(long[] value) {
            this.value = value;
        }

        public long[] getValue() {
            return value;
        }
    }
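
A module parser is looked up by module name and version. As far as the Redis module API is concerned, a data type's nine-character name and its encoding version are packed into one 64-bit id: nine 6-bit character indexes followed by a 10-bit version. The sketch below shows that packing under this assumption; ModuleTypeId is a hypothetical helper and not a redis-replicator class.

```java
/** Hypothetical helper: packs and unpacks a Redis module type id (assumed layout). */
final class ModuleTypeId {
    // Assumed character set for module type names: A-Z, a-z, 0-9, '-' and '_'.
    private static final String CHARSET =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";

    static long encode(String name, int version) {
        if (name.length() != 9) {
            throw new IllegalArgumentException("module type name must have 9 characters");
        }
        if (version < 0 || version > 1023) {
            throw new IllegalArgumentException("version must fit in 10 bits");
        }
        long id = 0;
        for (int i = 0; i < 9; i++) {
            int pos = CHARSET.indexOf(name.charAt(i));
            if (pos < 0) {
                throw new IllegalArgumentException("illegal character in name: " + name);
            }
            id = (id << 6) | pos; // 6 bits per character
        }
        return (id << 10) | version; // low 10 bits hold the version
    }

    static String name(long id) {
        char[] name = new char[9];
        id >>>= 10; // discard the version bits
        for (int i = 8; i >= 0; i--) {
            name[i] = CHARSET.charAt((int) (id & 63));
            id >>>= 6;
        }
        return new String(name);
    }

    static int version(long id) {
        return (int) (id & 1023);
    }
}
```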

4.2.4. Write a command parser

    public class HelloTypeParser implements CommandParser<HelloTypeCommand> {
        @Override
        public HelloTypeCommand parse(Object[] command) {
            String key = new String((byte[])command[1],Constants.UTF_8);
            long value = Long.parseLong(new String((byte[])command[2],Constants.UTF_8));
            return new HelloTypeCommand(key, value);
        }
    }

    public class HelloTypeCommand extends AbstractCommand {
        private final String key;
        private final long value;

        public long getValue() {
            return value;
        }

        public String getKey() {
            return key;
        }

        public HelloTypeCommand(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

4.2.5. Register the module parser and the command parser, then handle the events

    public static void main(String[] args) throws IOException {
        Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
        replicator.addCommandParser(CommandName.name("hellotype.insert"), new HelloTypeParser());
        replicator.addModuleParser("hellotype", 0, new HelloTypeModuleParser());
        replicator.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                if (event instanceof KeyStringValueModule) {
                    System.out.println(event);
                }
                
                if (event instanceof HelloTypeCommand) {
                    System.out.println(event);
                }
            }
        });
        replicator.open();
    }

4.2.6. Put them together

See ModuleExtensionExample.java

4.3. Stream

Redis 5.0 added a new data structure, STREAM. Redis-replicator parses a STREAM as follows:


        Replicator r = new RedisReplicator("redis://127.0.0.1:6379");
        r.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                if (event instanceof KeyStringValueStream) {
                    KeyStringValueStream kv = (KeyStringValueStream)event;
                    // key
                    String key = kv.getKey();
                    
                    // stream
                    Stream stream = kv.getValueAsStream();
                    // last stream id
                    stream.getLastId();
                    
                    // entries
                    NavigableMap<Stream.ID, Stream.Entry> entries = stream.getEntries();
                    
                    // optional : group
                    for (Stream.Group group : stream.getGroups()) {
                        // group PEL(pending entries list)
                        NavigableMap<Stream.ID, Stream.Nack> gpel = group.getPendingEntries();
                        
                        // consumer
                        for (Stream.Consumer consumer : group.getConsumers()) {
                            // consumer PEL(pending entries list)
                            NavigableMap<Stream.ID, Stream.Nack> cpel = consumer.getPendingEntries();
                        }
                    }
                }
            }
        });
        r.open();
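
The entries and the pending entries lists above are NavigableMaps keyed by stream id, so they depend on ids having a total order. The sketch below illustrates the Redis ordering rule for ids of the form <ms>-<seq>: compare the millisecond part first, then the sequence part, both as unsigned 64-bit numbers. StreamId is a hypothetical stand-in written for this example and is not the library's Stream.ID class.

```java
/** Hypothetical stand-in for a stream id, ordered the way Redis orders stream entries. */
final class StreamId implements Comparable<StreamId> {
    final long ms;  // milliseconds part, treated as unsigned
    final long seq; // sequence part, treated as unsigned

    StreamId(long ms, long seq) {
        this.ms = ms;
        this.seq = seq;
    }

    /** Parses text such as "1526919030474-55". */
    static StreamId parse(String text) {
        int dash = text.indexOf('-');
        if (dash <= 0 || dash == text.length() - 1) {
            throw new IllegalArgumentException("expected <ms>-<seq> but got: " + text);
        }
        return new StreamId(Long.parseUnsignedLong(text.substring(0, dash)),
                Long.parseUnsignedLong(text.substring(dash + 1)));
    }

    @Override
    public int compareTo(StreamId other) {
        int byMs = Long.compareUnsigned(ms, other.ms);
        return byMs != 0 ? byMs : Long.compareUnsigned(seq, other.seq);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof StreamId && ((StreamId) o).ms == ms && ((StreamId) o).seq == seq;
    }

    @Override
    public int hashCode() {
        return 31 * Long.hashCode(ms) + Long.hashCode(seq);
    }

    @Override
    public String toString() {
        return Long.toUnsignedString(ms) + "-" + Long.toUnsignedString(seq);
    }
}
```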

4.4. Write your own RDB parser

  • Write a class that extends the RdbVisitor abstract class.
  • Register your RdbVisitor with the Replicator.setRdbVisitor method.

4.5. Redis URI

Before redis-replicator-2.4.0, a RedisReplicator was constructed as follows:

    Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
    Replicator replicator = new RedisReplicator(new File("/path/to/dump.rdb"), FileType.RDB, Configuration.defaultSetting());
    Replicator replicator = new RedisReplicator(new File("/path/to/appendonly.aof"), FileType.AOF, Configuration.defaultSetting());
    Replicator replicator = new RedisReplicator(new File("/path/to/appendonly.aof"), FileType.MIXED, Configuration.defaultSetting());

Since redis-replicator-2.4.0, a new concept, the Redis URI, simplifies the construction of a RedisReplicator and provides a consistent API.

    Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
    Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb");
    Replicator replicator = new RedisReplicator("redis:///path/to/appendonly.aof");

    // configuration examples
    Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379?authPassword=foobared&readTimeout=10000&ssl=yes");
    Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb?rateLimit=1000000");
    Replicator replicator = new RedisReplicator("rediss://user:pass@127.0.0.1:6379?rateLimit=1000000");
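
To show what such a URI carries, the sketch below splits the examples above into their parts with java.net.URI. RedisUriSketch is a hypothetical class and not the library's own URI handling. Treating a URI without a host as a file target, defaulting the port to 6379, and reading ssl=yes as a TLS switch are assumptions drawn from the examples in this document.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch, not the library's URI class: splits a Redis URI into its parts. */
final class RedisUriSketch {
    final Map<String, String> params; // query parameters in declaration order
    final boolean ssl;                // "rediss" scheme or ssl=yes (assumption)
    final String host;                // null for file targets
    final boolean file;               // no host means a local RDB/AOF path (assumption)
    final int port;                   // 6379 when absent (assumption)
    final String path;
    final String userInfo;            // "user:pass" when present, otherwise null

    private RedisUriSketch(URI uri) {
        String scheme = uri.getScheme();
        if (!"redis".equals(scheme) && !"rediss".equals(scheme)) {
            throw new IllegalArgumentException("unsupported scheme: " + scheme);
        }
        this.params = parseQuery(uri.getQuery());
        this.ssl = "rediss".equals(scheme) || "yes".equals(params.get("ssl"));
        this.host = uri.getHost();
        this.file = host == null;
        this.port = uri.getPort() == -1 ? 6379 : uri.getPort();
        this.path = uri.getPath();
        this.userInfo = uri.getUserInfo();
    }

    static RedisUriSketch parse(String text) {
        return new RedisUriSketch(URI.create(text));
    }

    private static Map<String, String> parseQuery(String query) {
        Map<String, String> result = new LinkedHashMap<>();
        if (query == null || query.isEmpty()) {
            return result;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                result.put(pair, ""); // a key without a value
            } else {
                result.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return result;
    }
}
```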

5. Other topics

5.1. Built-in command parsers

command  command   command      command   command    command
PING     APPEND    SET          SETEX     MSET       DEL
SADD     HMSET     HSET         LSET      EXPIRE     EXPIREAT
GETSET   HSETNX    MSETNX       PSETEX    SETNX      SETRANGE
HDEL     UNLINK    SREM         LPOP      LPUSH      LPUSHX
LREM     RPOP      RPUSH        RPUSHX    ZREM       ZINTERSTORE
INCR     DECR      INCRBY       PERSIST   SELECT     FLUSHALL
FLUSHDB  HINCRBY   ZINCRBY      MOVE      SMOVE      BRPOPLPUSH
PFCOUNT  PFMERGE   SDIFFSTORE   RENAMENX  PEXPIREAT  SINTERSTORE
ZADD     BITFIELD  SUNIONSTORE  RESTORE   LINSERT    ZREMRANGEBYLEX
GEOADD   PEXPIRE   ZUNIONSTORE  EVAL      SCRIPT     ZREMRANGEBYRANK
PUBLISH  BITOP     SETBIT       SWAPDB    PFADD      ZREMRANGEBYSCORE
RENAME   MULTI     EXEC         LTRIM     RPOPLPUSH  SORT
EVALSHA  ZPOPMAX   ZPOPMIN      XACK      XADD       XCLAIM
XDEL     XGROUP    XTRIM        XSETID

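5.2. When an EOFException occurs

  • Adjust the following Redis server setting. For details, see redis.conf
    client-output-buffer-limit slave 0 0 0

WARNING: this setting may cause the Redis server to run out of memory.

5.3. Trace event log

  • Set the log level to debug
  • If your project uses log4j2, add a logger like the following to the configuration file: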
    <Logger name="com.moilioncircle" level="debug">
        <AppenderRef ref="YourAppender"/>
    </Logger>
    Configuration.defaultSetting().setVerbose(true);
    // redis uri
    "redis://127.0.0.1?verbose=yes"

5.4. SSL connection

    System.setProperty("javax.net.ssl.keyStore", "/path/to/keystore");
    System.setProperty("javax.net.ssl.keyStorePassword", "password");
    System.setProperty("javax.net.ssl.keyStoreType", "your_type");

    System.setProperty("javax.net.ssl.trustStore", "/path/to/truststore");
    System.setProperty("javax.net.ssl.trustStorePassword", "password");
    System.setProperty("javax.net.ssl.trustStoreType", "your_type");

    Configuration.defaultSetting().setSsl(true);

    // optional settings
    Configuration.defaultSetting().setSslSocketFactory(sslSocketFactory);
    Configuration.defaultSetting().setSslParameters(sslParameters);
    Configuration.defaultSetting().setHostnameVerifier(hostnameVerifier);
    // redis uri
    "redis://127.0.0.1:6379?ssl=yes"
    "rediss://127.0.0.1:6379"

5.5. Redis authentication

    Configuration.defaultSetting().setAuthUser("default");
    Configuration.defaultSetting().setAuthPassword("foobared");
    // redis uri
    "redis://127.0.0.1:6379?authPassword=foobared&authUser=default"
    "redis://default:foobared@127.0.0.1:6379"
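
The user and password settings above correspond to the two forms of the Redis AUTH command: AUTH <password>, and AUTH <username> <password> for Redis 6 ACL users. The sketch below only builds the argument list to make that mapping explicit. AuthCommand is a hypothetical helper, and how redis-replicator issues AUTH internally is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: chooses between the one-argument and two-argument AUTH forms. */
final class AuthCommand {

    /** Returns the AUTH arguments; user may be null or empty for password-only auth. */
    static List<String> args(String user, String password) {
        if (password == null) {
            throw new IllegalArgumentException("password is required");
        }
        List<String> args = new ArrayList<>();
        args.add("AUTH");
        if (user != null && !user.isEmpty()) {
            args.add(user); // Redis 6 ACL form: AUTH <username> <password>
        }
        args.add(password);
        return args;
    }
}
```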

5.6. Avoid full sync

  • Adjust the following Redis server settings
    repl-backlog-size
    repl-backlog-ttl
    repl-ping-slave-period

repl-ping-slave-period MUST be less than Configuration.getReadTimeout(). The default Configuration.getReadTimeout() is 30 seconds.
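
The rule above is a plain comparison once both values use the same unit. The sketch below assumes that the read timeout is expressed in milliseconds (as the readTimeout=10000 URI parameter suggests) and that repl-ping-slave-period is given in seconds, as in redis.conf. ReplTimeoutCheck is a hypothetical helper, not a library class.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: checks that master pings arrive before the client read timeout. */
final class ReplTimeoutCheck {

    /**
     * @param replPingPeriodSeconds value of repl-ping-slave-period on the server
     * @param readTimeoutMillis     client read timeout, assumed to be in milliseconds
     * @return true if a ping is expected before the read timeout expires
     */
    static boolean pingArrivesBeforeTimeout(int replPingPeriodSeconds, int readTimeoutMillis) {
        return TimeUnit.SECONDS.toMillis(replPingPeriodSeconds) < readTimeoutMillis;
    }
}
```

With a ping period of 10 seconds and the 30 second default timeout the check passes; with a ping period of 30 seconds or more it fails, and an idle connection could time out and reconnect.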

5.7. Lifecycle events

        Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
        final long start = System.currentTimeMillis();
        final AtomicInteger acc = new AtomicInteger(0);
        replicator.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                if(event instanceof PreRdbSyncEvent) {
                    System.out.println("pre rdb sync");
                } else if(event instanceof PostRdbSyncEvent) {
                    long end = System.currentTimeMillis();
                    System.out.println("time elapsed:" + (end - start));
                    System.out.println("rdb event count:" + acc.get());
                } else {
                    acc.incrementAndGet();
                }
            }
        });
        replicator.open();

5.8. Handle huge key-value pairs

Building on section 4.4, this tool ships with a built-in iterable RDB parser for handling huge key-value pairs.
For detailed examples, see:
[1] HugeKVFileExample.java
[2] HugeKVSocketExample.java
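
The general idea behind iterating over a huge value is to never hold all of its elements in memory at once. The sketch below shows one common way to do that with the JDK alone: wrapping an Iterator so that it yields fixed-size batches. Batches is a hypothetical helper and is not the library's implementation; the linked examples show the actual API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical helper: groups the elements of an iterator into bounded batches. */
final class Batches {

    /** Returns an iterator whose lists hold at most batchSize elements each. */
    static <T> Iterator<List<T>> of(final Iterator<T> source, final int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public List<T> next() {
                if (!source.hasNext()) {
                    throw new NoSuchElementException();
                }
                // Only one batch is materialized at a time.
                List<T> batch = new ArrayList<>(batchSize);
                while (batch.size() < batchSize && source.hasNext()) {
                    batch.add(source.next());
                }
                return batch;
            }
        };
    }
}
```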

5.9. Redis 6 support

5.9.1. SSL support

    $cd /path/to/redis
    $./utils/gen-test-certs.sh
    $cd tests/tls
    $openssl pkcs12 -export -CAfile ca.crt -in redis.crt -inkey redis.key -out redis.p12
    $cd /path/to/redis
    $./src/redis-server --tls-port 6379 --port 0 --tls-cert-file ./tests/tls/redis.crt \
         --tls-key-file ./tests/tls/redis.key --tls-ca-cert-file ./tests/tls/ca.crt \
         --tls-replication yes --bind 0.0.0.0 --protected-mode no

    System.setProperty("javax.net.ssl.keyStore", "/path/to/redis/tests/tls/redis.p12");
    System.setProperty("javax.net.ssl.keyStorePassword", "password");
    System.setProperty("javax.net.ssl.keyStoreType", "pkcs12");

    System.setProperty("javax.net.ssl.trustStore", "/path/to/redis/tests/tls/redis.p12");
    System.setProperty("javax.net.ssl.trustStorePassword", "password");
    System.setProperty("javax.net.ssl.trustStoreType", "pkcs12");

    Replicator replicator = new RedisReplicator("rediss://127.0.0.1:6379");

If you do not want to use System.setProperty, you can configure SSL programmatically as follows:


    RedisSslContextFactory factory = new RedisSslContextFactory();
    factory.setKeyStorePath("/path/to/redis/tests/tls/redis.p12");
    factory.setKeyStoreType("pkcs12");
    factory.setKeyStorePassword("password");

    factory.setTrustStorePath("/path/to/redis/tests/tls/redis.p12");
    factory.setTrustStoreType("pkcs12");
    factory.setTrustStorePassword("password");

    SslConfiguration ssl = SslConfiguration.defaultSetting().setSslContextFactory(factory);
    Replicator replicator = new RedisReplicator("rediss://127.0.0.1:6379", ssl);

5.9.2. ACL support


    Replicator replicator = new RedisReplicator("redis://user:pass@127.0.0.1:6379");

6. Contributors

7. References

8. Acknowledgements

8.1. YourKit

YourKit is kindly supporting this open source project with its full-featured Java Profiler.
YourKit, LLC is the creator of innovative and intelligent tools for profiling
Java and .NET applications. Take a look at YourKit's leading software products:
YourKit Java Profiler and YourKit .NET Profiler.

8.2. IntelliJ IDEA

IntelliJ IDEA is a Java integrated development environment (IDE) for developing computer software.
It is developed by JetBrains (formerly known as IntelliJ), and is available as an Apache 2 Licensed community edition,
and in a proprietary commercial edition. Both can be used for commercial development.

8.3. Redisson

Redisson is a Redis-based In-Memory Data Grid for Java that offers distributed objects and services (BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) backed by a Redis server. Redisson provides a more convenient and easier way to work with Redis. Redisson objects provide a separation of concerns, which allows you to keep the focus on data modeling and application logic.

Copyright (c) 2016-2019 Leon Chen

"Anti 996" License Version 1.0 (Draft)

Permission is hereby granted to any individual or legal entity obtaining a copy of this licensed work (including the source code, documentation and/or related items, hereinafter collectively referred to as the "licensed work"), free of charge, to deal with the licensed work for any purpose, including without limitation, the rights to use, reproduce, modify, prepare derivative works of, distribute, publish and sublicense the licensed work, subject to the following conditions:

  1. The individual or the legal entity must conspicuously display, without modification, this License and the notice on each redistributed or derivative copy of the Licensed Work.

  2. The individual or the legal entity must strictly comply with all applicable laws, regulations, rules and standards of the jurisdiction relating to labor and employment where the individual is physically located or where the individual was born or naturalized; or where the legal entity is registered or is operating (whichever is stricter). In case that the jurisdiction has no such laws, regulations, rules and standards or its laws, regulations, rules and standards are unenforceable, the individual or the legal entity are required to comply with Core International Labor Standards.

  3. The individual or the legal entity shall not induce, suggest or force its employee(s), whether full-time or part-time, or its independent contractor(s), in any methods, to agree in oral or written form, to directly or indirectly restrict, weaken or relinquish his or her rights or remedies under such laws, regulations, rules and standards relating to labor and employment as mentioned above, no matter whether such written or oral agreements are enforceable under the laws of the said jurisdiction, nor shall such individual or the legal entity limit, in any methods, the rights of its employee(s) or independent contractor(s) from reporting or complaining to the copyright holder or relevant authorities monitoring the compliance of the license about its violation(s) of the said license.

THE LICENSED WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN ANY WAY CONNECTION WITH THE LICENSED WORK OR THE USE OR OTHER DEALINGS IN THE LICENSED WORK.
