随着 Oracle、SAP、英特尔等科技公司宣布**企业不会继续在俄罗斯开展正常业务**,一定程度上也给了我们一个警示,提高我们整体科技水平至关重要,从芯片上就是一个很好的例子。**国产软件替代至关重要,大国发展要懂得居安思危**!达梦国产数据库经过40年的发展,已经成功应用于中国国内金融、能源、航空、通信等数十个领域。因此要基于Flink计算引擎,面向达梦数据库做实时数据的开发就是一个非常有价值的事情,也是一个迫切的需求。
经过本人近一年不懈的钻研与调测,终于在 **flink-connector-jdbc(3.1.0和3.1.1版本)**中开发出了支持写入DM8数据库的功能,本文将说明扩展后的flink-connector-jdbc如何高效写入国产数据库达梦(V8)。以下是达到的效果:
create database watermark_db;
use watermark_db;
-- 采集MySQL
DROP TABLE IF EXISTS table_process_cdc;
CREATE TABLE IF NOT EXISTS table_process_cdc (
id bigint COMMENT '自增主键id'
, `name` string COMMENT '名称'
, age bigint COMMENT '年龄'
, gender bigint COMMENT '性别,0-男生,1-女生'
, `address` string COMMENT '住址'
, create_time timestamp(3) COMMENT '创建时间'
, update_time timestamp(3) COMMENT '更新时间'
-- 声明 update_time 是事件时间属性,并且用延迟5秒的策略来生成watermark
,WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
,PRIMARY KEY(id) NOT ENFORCED
) COMMENT '配置表' WITH (
'connector' = 'mysql-cdc'
,'hostname' = 'hadoop105'
,'port' = '3306'
,'username' = 'root'
,'password' = 'root'
,'server-time-zone' = 'UTC+8'
,'scan.incremental.snapshot.enabled' = 'true'
,'scan.startup.mode' = 'earliest-offset'
,'database-name' = 'testdb'
,'table-name' = 'table_process_o'
);
-- kafka_dm映射表
DROP TABLE IF EXISTS ods_table_process_dm;
CREATE TABLE IF NOT EXISTS ods_table_process_dm (
id bigint COMMENT '自增主键id'
, `name` string COMMENT '名称'
, age bigint COMMENT '年龄'
, gender bigint COMMENT '性别,0-男生,1-女生'
, `address` string COMMENT '住址'
, create_time timestamp(3) COMMENT '创建时间'
, update_time timestamp(3) COMMENT '更新时间'
, kafka_time timestamp(3) COMMENT '进入Kafka的时间'
-- 声明 update_time 是事件时间属性,并且用延迟5秒的策略来生成watermark
,WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
,PRIMARY KEY(id) NOT ENFORCED
) COMMENT '配置表' WITH (
'connector' = 'kafka'
,'topic' = 'ods_table_process_dm'
,'properties.bootstrap.servers' = 'hadoop105:9092'
,'properties.group.id' = 'my_group_id'
,'value.format' = 'debezium-json'
,'scan.startup.mode' = 'earliest-offset'
,'value.debezium-json.ignore-parse-errors' = 'true'
,'value.debezium-json.timestamp-format.standard' = 'ISO-8601'
);
-- 写入JDBC(DM8数据库)
DROP TABLE IF EXISTS table_process_sink;
CREATE TABLE IF NOT EXISTS table_process_sink (
ID bigint
, NAME string
, AGE bigint
, GENDER bigint
, `ADDRESS` string
, CREATE_TIME timestamp(3)
, UPDATE_TIME timestamp(3)
, KAFKA_TIME timestamp(3) COMMENT '进入kafka的时间'
, ETL_TIME timestamp(3) COMMENT '进入jdbc的时间'
-- 声明 UPDATE_TIME 是事件时间,并且用 延迟5秒 的策略来生成 watermark
,WATERMARK FOR UPDATE_TIME AS UPDATE_TIME - INTERVAL '5' SECOND
,PRIMARY KEY (ID) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:dm://192.168.137.103:5236',
'username' = 'DMHS',
'password' = 'Dmhs_1234',
'table-name' = 'TABLE_PROCESS',
'driver' = 'dm.jdbc.driver.DmDriver'
);
注意:
演示部分为FlinkCDC实时读取mysql数据库的binlog,写入kafka的主题,然后FlinkSQL基于flink-connector-jdbc 组件消费Kafka主题数据,写入达梦V8数据库,并支持实时同步数据库增,删,改操作。
set 'pipeline.name' = 'table_process_sink_达梦';
SET 'table.local-time-zone' = 'Asia/Shanghai';
set 'parallelism.default' = '1';
SET 'pipeline.operator-chaining' = 'false';
set 'execution.runtime-mode' = 'streaming';
SET 'table.exec.source.idle-timeout' = '10s';
SET 'execution.checkpointing.interval' = '5min';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
set 'execution.checkpointing.tolerable-failed-checkpoints' = '2';
set 'restart-strategy' = 'fixed-delay';
set 'restart-strategy.fixed-delay.attempts' = '4';
set 'restart-strategy.fixed-delay.delay' = '30s';
set 'execution.checkpointing.unaligned' = 'true';
SET 'execution.savepoint.ignore-unclaimed-state' = 'true';
SET 'table.exec.source.cdc-events-duplicate' = 'true';
-- 采集MySQL数据写入Kafka
INSERT INTO ods_table_process_dm
SELECT
id,
`name`,
age,
gender,
`address`,
create_time,
update_time,
PROCTIME() as kafka_time
FROM table_process_cdc;
-- 消费kafka数据写入达梦数据库
INSERT INTO table_process_sink
SELECT
id,
`name`,
age,
gender,
`address`,
create_time,
update_time,
kafka_time,
PROCTIME() as etl_time
FROM ods_table_process_dm;
source/sink 是 flink最核心的部分之一,通过对其实现原理的学习,再结合源码分析,有助于加深对框架处理过程的理解,以及架构设计上的提升。
如果我们对自己对接一个数据源,核心的话就是连接器 connector,比如关系型数据库就是 JDBC。
flink-connector-jdbc 是 Apache Flink 框架提供的一个用于与关系型数据库进行连接和交互的连接器。它提供了使用 Flink 进行批处理和流处理的功能,可以方便地将关系型数据库中的数据引入 Flink 进行分析和处理,或者将 Flink 计算结果写入关系型数据库。
flink官方connector的架构如下
MetaData 将 sql create source table 转化为实际的 CatalogTable,对应代码 RelNode;
Planning
创建 RelNode 的过程中使用 SPI 将所有的 source(DynamicTableSourceFactory)\sink(DynamicTableSinkFactory) 工厂动态加载,获取到 connector = kafka,然后从所有 source 工厂中过滤出名称为 kafka 并且 继承自 DynamicTableSourceFactory.class 的工厂类 KafkaDynamicTableFactory,使用 KafkaDynamicTableFactory 创建出 KafkaDynamicSource;
Runtime
KafkaDynamicSource 创建出 FlinkKafkaConsumer,负责flink程序实际运行。
flink-connector-jdbc可以实现以下核心功能:
flink-connector-jdbc为Flink提供了与关系型数据库集成的能力,可以方便地进行数据的导入、导出和处理,为开发人员提供了更强大和灵活的数据处理能力。
以下是 flink-connector-jdbc 源码组成:红色框中的代码就是我开发的哦
首先添加达梦数据库的pom.xml依赖
<properties>
<dm.version>8.1.2.192</dm.version>
</properties>
<dependencies>
<!-- DM8 -->
<dependency>
<groupId>com.dameng</groupId>
<artifactId>DmJdbcDriver18</artifactId>
<version>${dm.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
经官网推荐和Maven仓库克洗
首先根据SPI机制添加支持DM数据库的工厂类
# resource/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
org.apache.flink.connector.jdbc.databases.dm.dialect.DmDialectFactory
创建工程类DmDialectFactory
// 在database下面创建dm,再创建 /dm/dialect
// 路径如下:flink-connector-jdbc-3.1.1/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/dm/dialect/DmDialectFactory.java
package org.apache.flink.connector.jdbc.databases.dm.dialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
/** Factory for {@link DmDialect}. */
public class DmDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:dm:");
}
@Override
public JdbcDialect create() {
return new DmDialect();
}
}
JdbcDialectFactory是一个工厂类,用于创建特定数据库的JdbcDialect实例。它的主要作用是根据用户提供的JDBC连接URL,确定要连接的数据库类型,并创建对应的JdbcDialect实例。通过JdbcDialect实例,flink-connector-jdbc可以为特定类型的数据库提供更高级的功能和最佳性能。例如,JdbcDialect 可以优化生成的SQL查询,使用特定的语法和函数。它还可以检测数据库支持的特性,以避免不支持的操作。
创建方言类DmDialect
package org.apache.flink.connector.jdbc.databases.dm.dialect;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/** JDBC dialect for Dm8. */
@Internal
public class DmDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
private static final int MAX_TIMESTAMP_PRECISION = 9;
private static final int MIN_TIMESTAMP_PRECISION = 1;
private static final int MAX_DECIMAL_PRECISION = 38;
private static final int MIN_DECIMAL_PRECISION = 1;
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new DmRowConverter(rowType);
}
@Override
public String getLimitClause(long limit) {
return "FETCH FIRST " + limit + " ROWS ONLY";
}
@Override
public Optional<String> defaultDriverName() {
return Optional.of("dm.jdbc.driver.DmDriver");
}
@Override
public String dialectName() {
return "Dm";
}
@Override
public String quoteIdentifier(String identifier) {
return identifier;
}
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
String sourceFields =
Arrays.stream(fieldNames)
.map(f -> ":" + f + " " + quoteIdentifier(f))
.collect(Collectors.joining(", "));
String onClause =
Arrays.stream(uniqueKeyFields)
.map(f -> "t." + quoteIdentifier(f) + "=s." + quoteIdentifier(f))
.collect(Collectors.joining(" and "));
final Set<String> uniqueKeyFieldsSet =
Arrays.stream(uniqueKeyFields).collect(Collectors.toSet());
String updateClause =
Arrays.stream(fieldNames)
.filter(f -> !uniqueKeyFieldsSet.contains(f))
.map(f -> "t." + quoteIdentifier(f) + "=s." + quoteIdentifier(f))
.collect(Collectors.joining(", "));
String insertFields =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String valuesClause =
Arrays.stream(fieldNames)
.map(f -> "s." + quoteIdentifier(f))
.collect(Collectors.joining(", "));
// if we can't divide schema and table-name is risky to call quoteIdentifier(tableName)
// for example [tbo].[sometable] is ok but [tbo.sometable] is not
String mergeQuery =
" MERGE INTO "
+ tableName
+ " t "
+ " USING (SELECT "
+ sourceFields
+ " FROM DUAL) s "
+ " ON ("
+ onClause
+ ") "
+ " WHEN MATCHED THEN UPDATE SET "
+ updateClause
+ " WHEN NOT MATCHED THEN INSERT ("
+ insertFields
+ ")"
+ " VALUES ("
+ valuesClause
+ ")";
return Optional.of(mergeQuery);
}
@Override
public Optional<Range> decimalPrecisionRange() {
return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
}
@Override
public Optional<Range> timestampPrecisionRange() {
return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
}
@Override
public Set<LogicalTypeRoot> supportedTypes() {
return EnumSet.of(
LogicalTypeRoot.CHAR,
LogicalTypeRoot.VARCHAR,
LogicalTypeRoot.BOOLEAN,
LogicalTypeRoot.VARBINARY,
LogicalTypeRoot.DECIMAL,
LogicalTypeRoot.TINYINT,
LogicalTypeRoot.SMALLINT,
LogicalTypeRoot.INTEGER,
LogicalTypeRoot.BIGINT,
LogicalTypeRoot.FLOAT,
LogicalTypeRoot.DOUBLE,
LogicalTypeRoot.DATE,
LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.ARRAY);
}
}
JdbcDialect 是一个接口,用于定义与特定数据库相关的SQL语法和行为。每种不同类型的数据库可能有一些特定的SQL方言和行为,JdbcDialect提供了一种方式来处理这些差异,以确保在不同类型的数据库上执行的SQL操作正确执行,并且能够提供最佳的性能。
创建数据转换类DmRowConverter
/**
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
* Dm8.
*/
@Internal
public class DmRowConverter extends AbstractJdbcRowConverter {
private static final long serialVersionUID = 1L;
public DmRowConverter(RowType rowType) {
super(rowType);
}
@Override
public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case NULL:
return val -> null;
case BOOLEAN:
return val -> val instanceof NUMBER ? ((NUMBER) val).booleanValue() : val;
case FLOAT:
return val ->
val instanceof NUMBER
? ((NUMBER) val).floatValue()
: val instanceof BINARY_FLOAT
? ((BINARY_FLOAT) val).floatValue()
: val instanceof BigDecimal
? ((BigDecimal) val).floatValue()
: val;
case DOUBLE:
return val ->
val instanceof NUMBER
? ((NUMBER) val).doubleValue()
: val instanceof BINARY_DOUBLE
? ((BINARY_DOUBLE) val).doubleValue()
: val instanceof BigDecimal
? ((BigDecimal) val).doubleValue()
: val;
case TINYINT:
return val ->
val instanceof NUMBER
? ((NUMBER) val).byteValue()
: val instanceof BigDecimal ? ((BigDecimal) val).byteValue() : val;
case SMALLINT:
return val ->
val instanceof NUMBER
? ((NUMBER) val).shortValue()
: val instanceof BigDecimal ? ((BigDecimal) val).shortValue() : val;
case INTEGER:
return val ->
val instanceof NUMBER
? ((NUMBER) val).intValue()
: val instanceof BigDecimal ? ((BigDecimal) val).intValue() : val;
case BIGINT:
return val ->
val instanceof NUMBER
? ((NUMBER) val).longValue()
: val instanceof BigDecimal ? ((BigDecimal) val).longValue() : val;
case DECIMAL:
final int precision = ((DecimalType) type).getPrecision();
final int scale = ((DecimalType) type).getScale();
return val ->
val instanceof BigInteger
? DecimalData.fromBigDecimal(
new BigDecimal((BigInteger) val, 0), precision, scale)
: DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
case CHAR:
case VARCHAR:
return val ->
(val instanceof CHAR)
? StringData.fromString(((CHAR) val).getString())
: (val instanceof OracleClob)
? StringData.fromString(((OracleClob) val).stringValue())
: StringData.fromString((String) val);
case BINARY:
case VARBINARY:
case RAW:
return val ->
val instanceof RAW
? ((RAW) val).getBytes()
: val instanceof OracleBlob
? ((OracleBlob) val)
.getBytes(1, (int) ((OracleBlob) val).length())
: val.toString().getBytes();
case INTERVAL_YEAR_MONTH:
case INTERVAL_DAY_TIME:
return val -> val instanceof NUMBER ? ((NUMBER) val).intValue() : val;
case DATE:
return val ->
val instanceof DATE
? (int) (((DATE) val).dateValue().toLocalDate().toEpochDay())
: val instanceof Timestamp
? (int)
(((Timestamp) val)
.toLocalDateTime()
.toLocalDate()
.toEpochDay())
: (int) (((Date) val).toLocalDate().toEpochDay());
case TIME_WITHOUT_TIME_ZONE:
return val ->
val instanceof DATE
? (int)
(((DATE) val).timeValue().toLocalTime().toNanoOfDay()
/ 1_000_000L)
: (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
case TIMESTAMP_WITHOUT_TIME_ZONE:
return val ->
val instanceof TIMESTAMP
? TimestampData.fromTimestamp(((TIMESTAMP) val).timestampValue())
: TimestampData.fromTimestamp((Timestamp) val);
case TIMESTAMP_WITH_TIME_ZONE:
return val -> {
if (val instanceof TIMESTAMPTZ) {
final TIMESTAMPTZ ts = (TIMESTAMPTZ) val;
final ZonedDateTime zdt =
ZonedDateTime.ofInstant(
ts.timestampValue().toInstant(),
ts.getTimeZone().toZoneId());
return TimestampData.fromLocalDateTime(zdt.toLocalDateTime());
} else {
return TimestampData.fromTimestamp((Timestamp) val);
}
};
case ARRAY:
case ROW:
case MAP:
case MULTISET:
default:
return super.createInternalConverter(type);
}
}
@Override
public String converterName() {
return "Dm";
}
}
createInternalConverter 是一个方法,用于创建将 JDBC ResultSet中的数据转换为Flink的内部数据结构的转换器。这个方法通常在JDBCInputFormat中被调用。在 Flink中,使用JDBCInputFormat从关系型数据库中读取数据时,它会将JDBC的ResultSet对象作为输入,然后通过 createInternalConverter方法将 ResultSet中的每一行数据转换为Flink的内部数据结构(例如Tuple或Row),以便后续的处理和计算。
使用idea工具的maven打包并在项目文件夹打开。
target/flink-connector-jdbc-3.1.1.jar
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。