[TOC]
哔哩哔哩视频
git clone git@gitee.com:dushaofeng/FlinkProject.git
scala
SDK,官网下载直接安装即可,会自动添加环境变量scala
插件,否则无法创建scala
文件scala
的SDK依赖,在项目的Global Libraries
中添加步骤1中设置的scala-sdk
目录依赖socket
环境该Demo中涉及到用socket发送字串流,可以在ubuntu环境下,执行如下命令即可:
nc -lk 7777
setParallelism()
slot
个数,否则会启动失败(并行度大于插槽数据,无法放入插槽)flink-conf.yaml
中配置 parallelism.default: 1
) > Flink网页参数配置集群的slot数量,要保证slot大于并行度
修改flink-conf.yaml
中的taskmanager.numberOfTaskSlots
为4
运行集群环境,浏览器打开地址
d:\Flink\flink-1.10.0\bin\start-cluster.bat
mvnen
点击package
即可IDEA
编译出来的job
文件(jar
包)上传到集群
submit new job
的页面上传Entry class
要输入带包名的class,比如com.cmp.wc.WordCount
通过命令行部署到Flink集群
./bin/flink run -c com.xxx.xxx -p 2 JAR_FILE_PATH
关闭集群
./bin/stop-cluster.sh
查看运行的jobs
./bin/flink list
取消job
./bin/flink cancle JOB_ID
One-to-one,比如 map filter flatmap等
Redistributing,比如 keyBy基于hashCode,或者broadcast/rebalance等
合并任务的前提: one-to-one + 相同并行度 禁止任务合并:
env.disableOperatorChaining()
单独开启任务合并
startNewChain()
getExecutionEnvironment
Kafuka连接器中自动会保存偏移量,与Kafuka协商解决回滚问题
把数据打散
过滤器
数据会按照hash code做重分区 会把流拆分,但是只分区,不分流
滚动聚合 必须是 keydStream以后的(keyby以后)的数据进行聚合
java和scala所支持的基本类型都可以支持 基础数据类型 Java简单对象 ArrayList HashMap Enum ...
Flink没有spark中的foreach方法 Flink所有对外的输出操作都要利用Sink完成 print就是一种特殊的Sink方法 官方提供的一些Sink渠道: Kafka ElasticSearch Hadoop FileSystem
Bahir提供的一些sink Flume Redis
没有官方提供Mysql的Sink,需要自己写
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。