本文主要包括:
Checkpoint很慢,一直超时
flink程序的Checkpoint一直超时失败
taskmanager的日志打印以下日志:
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Time from receiving all checkpoint barriers/RPC to executing it exceeded threshold: 135751ms
task 仅在接受到所有的 barrier 之后才会进行 snapshot,如果作业存在反压,或者有数据倾斜,则会导致全部的 channel 或者某些 channel 的 barrier 发送慢,从而整体影响 Checkpoint 的时间
具体可以参考Flink Checkpoint 问题排查实用指南
通过查看Flink WebUI
查看,确实存在反压,并且查看taskmanagers
的各个并行度的消息消费情况,发现只有2哥并行度有数据消费,其他并行度没有数据。所以数据出现严重的数据倾斜
解决数据倾斜的方法是:
SingleOutputStreamOperator<Alert> alertDS = dataDS
.connect(broadcastStream)
.process(new DynamicKeyFunction())
.uid("DynamicKeyFunction")
.name("Dynamic Partitioning Function")
.keyBy(keyData -> DigestUtils.md5Hex(keyData.getKey() + (int)(Math.random() * 10 )))
.connect(broadcastStream)
.process(new DynamicAlertFunction())
.uid("DynamicAlertFunction")
.name("Dynamic Rule Evaluation Function");
把keyby的key打散即可
Flink-1.13提交任务到yarn上没有tm和jm的日志
这个是因为,flink-1.13提交任务到yarn上,并不会读取${FLINK_HOME}/lib下的jar,所以,log4j的依赖没有,这里有2个解决办法:
- 把log4j的依赖打在maven项目里
<dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>2.17.1</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.17.1</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.17.1</version> <scope>compile</scope> </dependency> <dependency> <!-- API bridge between log4j 1 and 2; included for convenience --> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-1.2-api</artifactId> <version>2.17.1</version> <scope>compile</scope> </dependency>
- 把log4j相关的依赖从maven项目里去掉,这时候flink-1.13会默认使用logback来处理日志