1 Star 0 Fork 0

杜少峰 / FlinkProject

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

[TOC]

参考教程

哔哩哔哩视频

项目地址

git clone git@gitee.com:dushaofeng/FlinkProject.git

运行环境搭建

  1. 安装scalaSDK,官网下载直接安装即可,会自动添加环境变量
  2. 安装IDEA的scala插件,否则无法创建scala文件
  3. IDEA为工程添加scala的SDK依赖,在项目的Global Libraries中添加步骤1中设置的scala-sdk目录依赖

如何创建socket环境

该Demo中涉及到用socket发送字串流,可以在ubuntu环境下,执行如下命令即可:

nc -lk 7777

并行度设置

  • 每个算子都可以单独设置并行度

    setParallelism()

  • 并行度不能大于slot个数,否则会启动失败(并行度大于插槽数据,无法放入插槽)
  • 可以设置并行度的地方以及优先级: 代码 > 配置文件(flink-conf.yaml 中配置 parallelism.default: 1) > Flink网页参数

部署到Flink集群中

  • 配置集群的slot数量,要保证slot大于并行度 修改flink-conf.yaml中的taskmanager.numberOfTaskSlots为4

  • 运行集群环境,浏览器打开地址

d:\Flink\flink-1.10.0\bin\start-cluster.bat

  • 编译Job工程,IDEA上面的mvnen点击package即可
  • IDEA编译出来的job文件(jar包)上传到集群
    1. 网页上在submit new job的页面上传
    2. 选择上传的文件后,输入执行参数,Entry class要输入带包名的class,比如com.cmp.wc.WordCount

常用命令

  • 通过命令行部署到Flink集群

    ./bin/flink run -c com.xxx.xxx -p 2 JAR_FILE_PATH

  • 关闭集群

    ./bin/stop-cluster.sh

  • 查看运行的jobs

    ./bin/flink list

  • 取消job

    ./bin/flink cancle JOB_ID

运行架构

slot与Taskmanager

  • 每一个TaskManager都是一个JVM进程,会执行一个或多个subtask
  • slot是资源管理最小单位,一般按照CPU数量划分slot
  • 默认情况下,Flink允许子任务共享slot,即使他们是不同任务的子任务
  • 所有Flink程序由三部分组成:Source/Transformathon/Sink

并行度

One-to-one,比如 map filter flatmap等
Redistributing,比如 keyBy基于hashCode,或者broadcast/rebalance等

任务链

合并任务的前提: one-to-one + 相同并行度 禁止任务合并:

env.disableOperatorChaining()

单独开启任务合并

startNewChain()

流处理API

Environment(环境)

getExecutionEnvironment

Source(数据源)

Kafuka连接器中自动会保存偏移量,与Kafuka协商解决回滚问题

Transform(转换算子)

map

flatMap

把数据打散

filter

过滤器

Keyby

数据会按照hash code做重分区 会把流拆分,但是只分区,不分流

滚动聚合算子

  • sum
  • min
  • max
  • minBy
  • maxBy

reduce

split和select

connect和CoMap

  • connect会把两个流合并,但实际还是两个流
  • connect两个流可以是不同的数据结构
  • CoMap会把两个流合并
  • 每次connect只能合并两条

union

  • 可以合并多条流
  • 每条流的数据结构一样

滚动聚合 必须是 keydStream以后的(keyby以后)的数据进行聚合

支持的数据类型

java和scala所支持的基本类型都可以支持 基础数据类型 Java简单对象 ArrayList HashMap Enum ...

实现UDF函数(User Defined Function,即用户自定义函数)--更细粒度的控制流

  • 函数类
  • 匿名类
  • 富函数(Rich Functions) 不是函数,而是函数类,具备更多更强大的功能,比如获取环境上下文,拥有生命周期,比如: RichMapFunction 就是 MapFunction 的富函数

Sink

Flink没有spark中的foreach方法 Flink所有对外的输出操作都要利用Sink完成 print就是一种特殊的Sink方法 官方提供的一些Sink渠道: Kafka ElasticSearch Hadoop FileSystem

Bahir提供的一些sink Flume Redis

没有官方提供Mysql的Sink,需要自己写

空文件

简介

取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
1
https://gitee.com/dushaofeng/FlinkProject.git
git@gitee.com:dushaofeng/FlinkProject.git
dushaofeng
FlinkProject
FlinkProject
master

搜索帮助