Flink作为有状态的流式计算引擎,周期性的checkpoint至关重要。checkpoint的周期不宜设置过长或过短,针对不同的任务要区别对待。甚至针对同一个任务,在不同场景下checkpoint过程也会因为超时或反压等原因导致失败。下面先来看一下传统checkpoint调优所面临的问题:
Flink checkpoint速率、频率、超时时间参数等直接影响了任务的健康度。当flink任务重启时,会因消息积压导致任务反压,任务反压反过来会促使checkpoint变慢甚至是超时。如此一来,仿佛进入了一个恶性循环。
Fire框架为Flink checkpoint提供了增强,可以做到运行时动态调整checkpoint的相关参数,达到不重启任务即可实现动态调优的目的。Flink开发者只需集成[集成Fire框架](ZTO-Express/fire (github.com)) ,就可以在运行时通过调用Fire框架提供的restful接口,从而实现动态调整checkpoint参数的目的了。
假设线上有这样一个任务,每秒钟处理的消息量非常大,状态非常大,每次checkpoint耗时在5分钟以上。这个任务如果停止10分钟以上,会导致大量的消息积压,而消息积压导致的反压叠加checkpoint,会进一步影响任务的性能。这个时候,可以临时先将checkpoint周期调大,等反压结束后再调整回之前的checkpoint周期,降低了checkpoint耗时较长带来性能下降的影响。
不愿停止任务,只是临时性的调整checkpoint周期、超时参数等。
@Checkpoint(interval = 100, unaligned = true) // 100s做一次checkpoint,开启非对齐checkpoint
@Kafka(brokers = "localhost:9092", topics = "fire", groupId = "fire")
object Demo extends BaseFlinkStreaming {
override def process: Unit = {
val dstream = this.fire.createKafkaDirectStream() // 使用api的方式消费kafka
this.fire.sql("""create table statement ...""")
this.fire.sql("""insert into statement ...""")
this.fire.start
}
}
集成了Fire框架的flink任务在运行起来以后,可以在flink的webui的Job Manager -> Configuration中查看到restful接口地址:
找到接口地址以后,通过curl命令调用该接口即可实现动态调优:
curl -H "Content-Type:application/json" -X POST --data '{"interval":60000,"minPauseBetween": 60000, "timeout": 60000}' http://ip:5753/system/checkpoint
效果如下图所示:
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。