0%

Flink调优

本文主要包括:

  • Checkpoint很慢,一直超时
  • Flink-1.13提交任务到yarn上没有tm和jm的日志

Checkpoint很慢,一直超时

flink程序的Checkpoint一直超时失败
checkpoint失败
taskmanager的日志打印以下日志:

org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Time from receiving all checkpoint barriers/RPC to executing it exceeded threshold: 135751ms

task 仅在接受到所有的 barrier 之后才会进行 snapshot,如果作业存在反压,或者有数据倾斜,则会导致全部的 channel 或者某些 channel 的 barrier 发送慢,从而整体影响 Checkpoint 的时间
具体可以参考Flink Checkpoint 问题排查实用指南

通过查看Flink WebUI查看,确实存在反压,并且查看taskmanagers的各个并行度的消息消费情况,发现只有2哥并行度有数据消费,其他并行度没有数据。所以数据出现严重的数据倾斜
解决数据倾斜的方法是:

SingleOutputStreamOperator<Alert> alertDS = dataDS
                .connect(broadcastStream)
                .process(new DynamicKeyFunction())
                .uid("DynamicKeyFunction")
                .name("Dynamic Partitioning Function")
                .keyBy(keyData -> DigestUtils.md5Hex(keyData.getKey() + (int)(Math.random() * 10 )))
                .connect(broadcastStream)
                .process(new DynamicAlertFunction())
                .uid("DynamicAlertFunction")
                .name("Dynamic Rule Evaluation Function");

把keyby的key打散即可

这个是因为,flink-1.13提交任务到yarn上,并不会读取${FLINK_HOME}/lib下的jar,所以,log4j的依赖没有,这里有2个解决办法:

  1. 把log4j的依赖打在maven项目里
           <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.17.1</version>
    <scope>compile</scope>
    </dependency>
    
    <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.17.1</version>
    <scope>compile</scope>
    </dependency>
    
    <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.17.1</version>
    <scope>compile</scope>
    </dependency>
    
    <dependency>
    <!-- API bridge between log4j 1 and 2; included for convenience -->
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-1.2-api</artifactId>
    <version>2.17.1</version>
    <scope>compile</scope>
    </dependency>
  2. 把log4j相关的依赖从maven项目里去掉,这时候flink-1.13会默认使用logback来处理日志