78 Star 679 Fork 348

朱慧培 / flink-streaming-platform-web

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
demo_1.md 1.58 KB
一键复制 编辑 原始数据 按行查看 历史
cloud-coder 提交于 2021-01-11 15:31 . 调整SQL并在1.12.0中测试通过

demo1 单流kafka写入mysqld 参考

配置参考: jdbc kafka

触发方式: 针对每条触发一次

source kafka json 数据格式
topic: flink_test msg: {"day_time": "20201009","id": 7,"amnount":20}

sink mysql 创建语句

CREATE TABLE sync_test_1 (
  `day_time` varchar(64) NOT NULL,
  `total_gmv` bigint(11) DEFAULT NULL,
  PRIMARY KEY (`day_time`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

配置语句


create table flink_test_1 ( 
  id BIGINT,
  day_time VARCHAR,
  amnount BIGINT,
  proctime AS PROCTIME ()
)
 with ( 
  'connector' = 'kafka',
  'topic' = 'flink_test',
  'properties.bootstrap.servers' = '172.25.20.76:9092', 
  'properties.group.id' = 'flink_gp_test1',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'properties.zookeeper.connect' = '172.25.20.76:2181/kafka'
 );

CREATE TABLE sync_test_1 (
                   day_time string,
                   total_gmv bigint,
                   PRIMARY KEY (day_time) NOT ENFORCED
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://172.25.21.10:3306/flink_web?characterEncoding=UTF-8',
   'table-name' = 'sync_test_1',
   'username' = 'videoweb',
   'password' = 'suntek'
 );

INSERT INTO sync_test_1 
SELECT day_time,SUM(amnount) AS total_gmv
FROM flink_test_1
GROUP BY day_time;
Java
1
https://gitee.com/zhuhuipei/flink-streaming-platform-web.git
git@gitee.com:zhuhuipei/flink-streaming-platform-web.git
zhuhuipei
flink-streaming-platform-web
flink-streaming-platform-web
master

搜索帮助