cloud-coder / flink-streaming-platform-web

demo_2.md 2.85 KB
cloud-coder committed on 2021-09-16 18:05: upgrade to 13.2 and verify sql_demo

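demo2: writing two Kafka streams to MySQL (reference)

Configuration example

Trigger mode

Note: every record arriving on either of the two streams triggers an update.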

Source Kafka JSON data format

topic flink_test_1: {"day_time": "20201011", "id": 7, "amount": 211}
topic flink_test_2: {"id": 7, "coupon_amount": 100}
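
To make the trigger behaviour concrete, here is a minimal Python sketch that replays the two sample records and recomputes the left join and the grouped sum after every record. It is a plain in-memory simulation, not Flink code: the function name `replay` and the recompute-from-scratch strategy are assumptions made for illustration, and Flink maintains this state incrementally.

```python
import json
from collections import defaultdict


def replay(events):
    """Replay (topic, json_string) pairs; return the aggregate after each record.

    Hypothetical helper: it mimics a LEFT JOIN on id followed by
    SUM(amount - coupon_amount) GROUP BY day_time, using SQL NULL rules.
    """
    orders = []                   # rows seen on flink_test_1
    coupons = defaultdict(list)   # id -> coupon_amount values seen on flink_test_2
    snapshots = []
    for topic, payload in events:
        row = json.loads(payload)
        if topic == "flink_test_1":
            orders.append(row)
        else:
            coupons[row["id"]].append(row["coupon_amount"])

        # Every incoming record triggers a fresh result.
        totals = {}
        for order in orders:
            # An order without a matching coupon joins against NULL (None).
            for coupon in coupons.get(order["id"]) or [None]:
                day = order["day_time"]
                totals.setdefault(day, None)
                if coupon is not None:  # SUM skips NULL operands
                    totals[day] = (totals[day] or 0) + order["amount"] - coupon
        snapshots.append(dict(totals))
    return snapshots


if __name__ == "__main__":
    sample = [
        ("flink_test_1", '{"day_time": "20201011", "id": 7, "amount": 211}'),
        ("flink_test_2", '{"id": 7, "coupon_amount": 100}'),
    ]
    for snapshot in replay(sample):
        print(snapshot)
```

With the two sample records, the first snapshot holds a NULL total for 20201011 because no coupon has matched yet, and the second holds 111.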

Prerequisites

MySQL sink table DDL

CREATE TABLE `sync_test_2` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `day_time` varchar(64) DEFAULT NULL,
  `total_gmv` bigint(11) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uidx` (`day_time`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
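
The unique key on `day_time` is what allows repeated updates for the same day to overwrite a single row. When the Flink table declares a primary key, the JDBC connector writes in upsert mode, which the Flink documentation describes for MySQL as `INSERT .. ON DUPLICATE KEY UPDATE ..`. The sketch below shows the same idea with SQLite's `ON CONFLICT` clause as a stand-in, so it runs without a MySQL server. The function name `upsert_all` is hypothetical, and the upsert syntax assumes SQLite 3.24 or newer.

```python
import sqlite3


def upsert_all(updates):
    """Apply (day_time, total_gmv) updates as upserts and return the final rows."""
    conn = sqlite3.connect(":memory:")
    # SQLite approximation of the MySQL table: surrogate id plus a unique day_time.
    conn.execute(
        """
        CREATE TABLE sync_test_2 (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            day_time TEXT UNIQUE,
            total_gmv INTEGER
        )
        """
    )
    for day_time, total_gmv in updates:
        # Insert a new day, or overwrite the total if the day already exists.
        conn.execute(
            "INSERT INTO sync_test_2 (day_time, total_gmv) VALUES (?, ?) "
            "ON CONFLICT(day_time) DO UPDATE SET total_gmv = excluded.total_gmv",
            (day_time, total_gmv),
        )
    rows = conn.execute(
        "SELECT day_time, total_gmv FROM sync_test_2 ORDER BY day_time"
    ).fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    print(upsert_all([("20201011", 211), ("20201011", 111), ("20201012", 50)]))
```

Without the unique key, each update would be appended as a new row instead of replacing the previous total for that day.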

Detailed configuration

double_stream_write_mysql_demo

YARN_PER

-yjm 1024 -ytm 1024 -p 3 -yqu default -yD yarn.provided.lib.dirs="hdfs://pci01:8020/jars/flink/"

-checkpointDir hdfs://172.25.21.170:8020/flink/checkpoint/double_stream_write_mysql_demo

http://172.25.21.170:50070/webhdfs/v1/jars/common/flink-connector-jdbc_2.11-1.13.2.jar?op=OPEN
http://172.25.21.170:50070/webhdfs/v1/jars/common/flink-sql-connector-kafka_2.11-1.13.2.jar?op=OPEN
http://172.25.21.170:50070/webhdfs/v1/jars/common/mysql-connector-java-5.1.49.jar?op=OPEN
http://172.25.21.170:50070/webhdfs/v1/jars/common/flink-sql-connector-elasticsearch7_2.12-1.13.2.jar?op=OPEN

create table flink_test_2_1 ( 
  id BIGINT,
  day_time VARCHAR,
  amount BIGINT,
  proctime AS PROCTIME ()
)
 with ( 
   'connector' = 'kafka',
   'topic' = 'flink_test_1',
   'properties.bootstrap.servers' = '172.25.20.76:9092', 
   'properties.group.id' = 'flink_gp_test2-1',
   'scan.startup.mode' = 'earliest-offset',
   'format' = 'json',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true',
   'properties.zookeeper.connect' = '172.25.20.76:2181/kafka'
 );


create table flink_test_2_2 ( 
  id BIGINT,
  coupon_amount BIGINT,
  proctime AS PROCTIME ()
)
 with ( 
   'connector' = 'kafka',
   'topic' = 'flink_test_2',
   'properties.bootstrap.servers' = '172.25.20.76:9092', 
   'properties.group.id' = 'flink_gp_test2-2',
   'scan.startup.mode' = 'earliest-offset',
   'format' = 'json',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true',
   'properties.zookeeper.connect' = '172.25.20.76:2181/kafka'
 );


CREATE TABLE sync_test_2 (
  day_time STRING,
  total_gmv BIGINT,
  PRIMARY KEY (day_time) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://172.25.21.10:3306/flink_web?characterEncoding=UTF-8',
   'table-name' = 'sync_test_2',
   'username' = 'videoweb',
   'password' = 'suntek'
 );

INSERT INTO sync_test_2 
SELECT 
  day_time, 
  SUM(amount - coupon_amount) AS total_gmv 
FROM 
  (
    SELECT
      a.day_time as day_time, 
      a.amount as amount, 
      b.coupon_amount as coupon_amount 
    FROM 
      flink_test_2_1 as a 
      LEFT JOIN flink_test_2_2 b on b.id = a.id
  ) 
GROUP BY  day_time;
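
One detail of this query is worth checking against your data: with a LEFT JOIN, an order that has no matching coupon yields a NULL `coupon_amount`, so `amount - coupon_amount` is NULL and SUM skips that order entirely. The sketch below runs the same join as a batch query in SQLite, which follows the same NULL rules, and compares it with a variant that wraps the coupon in `COALESCE(..., 0)`. The function name `compare_totals`, the extra sample order with id 8, and the `COALESCE` variant are assumptions for illustration and are not part of the original job.

```python
import sqlite3

# Same join and grouping as the job, plus a hypothetical COALESCE variant.
QUERY = """
SELECT a.day_time,
       SUM(a.amount - b.coupon_amount)              AS total_gmv,
       SUM(a.amount - COALESCE(b.coupon_amount, 0)) AS total_gmv_coalesced
FROM flink_test_2_1 AS a
LEFT JOIN flink_test_2_2 AS b ON b.id = a.id
GROUP BY a.day_time
ORDER BY a.day_time
"""


def compare_totals(orders, coupons):
    """Return (day_time, total_gmv, total_gmv_coalesced) rows for the given data.

    orders:  iterable of (day_time, id, amount)
    coupons: iterable of (id, coupon_amount)
    """
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE flink_test_2_1 (day_time TEXT, id INTEGER, amount INTEGER)")
    conn.execute("CREATE TABLE flink_test_2_2 (id INTEGER, coupon_amount INTEGER)")
    conn.executemany("INSERT INTO flink_test_2_1 VALUES (?, ?, ?)", orders)
    conn.executemany("INSERT INTO flink_test_2_2 VALUES (?, ?)", coupons)
    rows = conn.execute(QUERY).fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    # Order 7 has a coupon; order 8 (made up for this example) does not.
    orders = [("20201011", 7, 211), ("20201011", 8, 50)]
    coupons = [(7, 100)]
    print(compare_totals(orders, coupons))
```

For this data the original expression gives 111, because order 8 is dropped from the sum, while the `COALESCE` variant gives 161. Which behaviour is wanted depends on whether orders without coupons should count towards GMV.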