1 Star 0 Fork 0

lugela / flinkx

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
sqlservercdc.md 3.91 KB
一键复制 编辑 原始数据 按行查看 历史
dujie 提交于 2021-03-02 14:29 . add sqlserverCDC Modules and docs

SqlServer CDC Reader

一、插件名称

名称:sqlservercdcreader

二、数据源版本

SqlServer 2012及以上

三、数据源配置

SqlServer配置CDC

四、基本原理

FlinkX Sqlserver CDC实时采集基本原理

五、参数说明

  • url
    • 描述:SqlServer数据库的jdbc连接字符串,参考文档:SqlServer官方文档
    • 必选:是
    • 字段类型:string
    • 默认值:无

  • username
    • 描述:数据源的用户名
    • 必选:是
    • 字段类型:string
    • 默认值:无

  • password
    • 描述:数据源指定用户名的密码
    • 必选:是
    • 字段类型:string
    • 默认值:无

  • databaseName
    • 描述:监听的数据库
    • 必选:是
    • 字段类型:string
    • 默认值:无

  • tableList
    • 描述:需要解析的数据表,表必须已启用CDC,格式为schema.table
    • 必选:否
    • 字段类型:list
    • 默认值:无

  • cat
    • 描述:需要解析的数据更新类型,包括insert、update、delete三种
    • 注意:以英文逗号分割的格式填写。
    • 必选:是
    • 字段类型:string
    • 默认值:无

  • pollInterval
    • 描述:监听拉取SqlServer CDC数据库间隔时间
    • 注意:该值越小,采集延迟时间越小,给数据库的访问压力越大
    • 必选:否
    • 字段类型:long
    • 默认值:1000

  • lsn
    • 描述:要读取SqlServer CDC日志序列号的开始位置
    • 必选:否
    • 字段类型:string
    • 默认值:无

  • pavingData
    • 描述:是否将解析出的json数据拍平
    • 示例:假设解析的表为tb1,schema为dbo,对tb1中的id字段做update操作,id原来的值为1,更新后为2,则pavingData为true时数据格式为:
{
    "type":"update",
    "schema":"dbo",
    "table":"tb1",
    "lsn":"00000032:00002038:0005",
    "ts": 6760525407742726144,
    "before_id":1,
    "after_id":2
}

pavingData为false时:

{
    "type":"update",
    "schema":"dbo",
    "table":"tb1",
    "lsn":"00000032:00004a38:0007",
    "ts": 6760525407742726144,
    "before":{
        "id":1
    },
    "after":{
        "id":2
    }
}
  • type:变更类型,INSERT,UPDATE、DELETE
  • lsn:Sqlserver数据库变更记录对应的lsn号
  • ts:自增ID,不重复,可用于排序,解码后为FlinkX的事件时间,解码规则如下:
long id = Long.parseLong("6760525407742726144");
        long res = id >> 22;
        DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(sdf.format(res));		//2021-01-28 19:54:21
  • 必选:否
  • 字段类型:boolean
  • 默认值:false

六、配置示例

{
  "job" : {
    "content" : [ {
      "reader" : {
        "parameter" : {
          "username" : "uname",
          "password" : "passwd",
          "url": "jdbc:sqlserver://host:1433;database=databaseName",
          "databaseName":"databaseName",
          "tableList": ["dbo.cdc"],
          "lsn": "00000025:00000bc0:0003",
          "cat": "insert,update,delete"
        },

        "name" : "sqlservercdcreader"
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }
    ],
    "setting" : {
      "restore": {
        "isStream": true
      },
      "speed" : {
        "channel" : 1
      }
    }
  }
}
1
https://gitee.com/lugela/flinkx.git
git@gitee.com:lugela/flinkx.git
lugela
flinkx
flinkx
1.10_release

搜索帮助

53164aa7 5694891 3bd8fe86 5694891