同步操作将从 朱慧培/flink-streaming-platform-web 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
注意:针对双流中的每条记录都发触发
source kafka json 数据格式
topic flink_test_1 {"day_time": "20201011","id": 7,"amount":211} topic flink_test_2 {"id": 7,"coupon_amount":100}
sink mysql 创建语句
CREATE TABLE `sync_test_2` (
`id` bigint(11) NOT NULL AUTO_INCREMENT,
`day_time` varchar(64) DEFAULT NULL,
`total_gmv` bigint(11) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uidx` (`day_time`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
double_stream_write_mysql_demo
YARN_PER
-yjm 1024 -ytm 1024 -p 3 -yqu default -yD yarn.provided.lib.dirs="hdfs://pci01:8020/jars/flink/"
-checkpointDir hdfs://172.25.21.170:8020/flink/checkpoint/double_stream_write_mysql_demo
http://172.25.21.170:50070/webhdfs/v1/jars/common/flink-connector-jdbc_2.11-1.13.2.jar?op=OPEN http://172.25.21.170:50070/webhdfs/v1/jars/common/flink-sql-connector-kafka_2.11-1.13.2.jar?op=OPEN http://172.25.21.170:50070/webhdfs/v1/jars/common/mysql-connector-java-5.1.49.jar?op=OPEN http://172.25.21.170:50070/webhdfs/v1/jars/common/flink-sql-connector-elasticsearch7_2.12-1.13.2.jar?op=OPEN
create table flink_test_2_1 (
id BIGINT,
day_time VARCHAR,
amount BIGINT,
proctime AS PROCTIME ()
)
with (
'connector' = 'kafka',
'topic' = 'flink_test_1',
'properties.bootstrap.servers' = '172.25.20.76:9092',
'properties.group.id' = 'flink_gp_test2-1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'properties.zookeeper.connect' = '172.25.20.76:2181/kafka'
);
create table flink_test_2_2 (
id BIGINT,
coupon_amount BIGINT,
proctime AS PROCTIME ()
)
with (
'connector' = 'kafka',
'topic' = 'flink_test_2',
'properties.bootstrap.servers' = '172.25.20.76:9092',
'properties.group.id' = 'flink_gp_test2-2',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'properties.zookeeper.connect' = '172.25.20.76:2181/kafka'
);
CREATE TABLE sync_test_2 (
day_time string,
total_gmv bigint,
PRIMARY KEY (day_time) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.25.21.10:3306/flink_web?characterEncoding=UTF-8',
'table-name' = 'sync_test_2',
'username' = 'videoweb',
'password' = 'suntek'
);
INSERT INTO sync_test_2
SELECT
day_time,
SUM(amount - coupon_amount) AS total_gmv
FROM
(
SELECT
a.day_time as day_time,
a.amount as amount,
b.coupon_amount as coupon_amount
FROM
flink_test_2_1 as a
LEFT JOIN flink_test_2_2 b on b.id = a.id
)
GROUP BY day_time;
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。