Flink FAQ

This article covers:

  • Flink FAQ

Flink FAQ

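  • Cause: after Flink submits the job to YARN, YARN does not keep monitoring the state of the Flink job.
  • Solution: add the -d flag on the command line when submitting the job, for example: run -m yarn-cluster -d -p 2 -yn 2 -yjm 1024m -ytm 2048m -ynm xxxx -c xxxx

At 浔兴 this problem became 100% reproducible. However, the -d flag cannot be used in DolphinScheduler, so here is a record of how to work around it.
The JobManager log was flooded with the following entries: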

2024-07-27 17:10:25,702 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
2024-07-27 17:10:55,701 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
2024-07-27 17:11:25,681 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.

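My guess was that these errors are related: the client heartbeat request fails, so although the job has failed, the client never learns about the failure.
Debugging eventually traced this to a serialization problem in the JobClientHeartbeatRequestBody class:
the constructor parameter of JobClientHeartbeatRequestBody must be annotated with @JsonProperty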

public class JobClientHeartbeatRequestBody implements RequestBody {
    private static final String EXPIRED_TIMESTAMP = "expiredTimestamp";

    @JsonProperty(EXPIRED_TIMESTAMP)
    private final long expiredTimestamp;

    @JsonCreator
    public JobClientHeartbeatRequestBody(@JsonProperty(EXPIRED_TIMESTAMP) long expiredTimestamp) {
        this.expiredTimestamp = expiredTimestamp;
    }

    @JsonIgnore
    public long getExpiredTimestamp() {
        return expiredTimestamp;
    }
}

After rebuilding Flink, checkpoints kept failing during real-time capture from Oracle. The error log was:

2024-08-02 14:53:41,454 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Filter -> Flat Map (1/1) (ed1ebdb51de67d14b5a760cf0977632a_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on container_1714272405081_0303_01_000002 @ V-NJ-2-204 (dataPort=13071).
java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer;
	at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.serializeCheckpointBarrier(EventSerializer.java:260) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toSerializedEvent(EventSerializer.java:100) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBufferConsumer(EventSerializer.java:373) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.broadcastEvent(BufferWritingResultPartition.java:199) ~[flink-dist-1.17.2.jar:1.17.2]
...
2024-08-02 14:53:41,491 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - 2 tasks will be restarted to recover the failed task ed1ebdb51de67d14b5a760cf0977632a_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2024-08-02 14:53:41,491 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job realtime-collect-oracle-kafka Snapshot And Stream (e3b12d95ef0e9746813b713106908662) switched from state RUNNING to RESTARTING.
2024-08-02 14:53:41,494 WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job e3b12d95ef0e9746813b713106908662. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1977) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1608) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1174) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1146) ~[flink-dist-1.17.2.jar:1.17.2]
...

The root cause turned out to be that Flink had been compiled with JDK 11, while our CDH cluster runs Java 1.8, so checkpoints kept failing.
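
The NoSuchMethodError above matches a known difference between Java 8 and Java 9+: since Java 9, ByteBuffer declares its own flip() that returns ByteBuffer, so code compiled against the JDK 11 class library (without --release 8) links to a method that a Java 8 runtime does not have. Rebuilding with JDK 8 is the fix described here. The sketch below only illustrates the mismatch and a commonly used source-level workaround, casting to Buffer; the class name BufferFlipCompat is hypothetical and the code is not taken from the Flink code base.

```java
import java.nio.Buffer;
import java.nio.ByteBuffer;

final class BufferFlipCompat {
    /** Writes one int into a fresh buffer and flips it so it is ready for reading. */
    static ByteBuffer writeAndFlip(int value) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES);
        buf.putInt(value);
        // Casting to Buffer makes the compiler link against Buffer.flip(),
        // which exists on Java 8 as well as on Java 9 and later.
        ((Buffer) buf).flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer buf = writeAndFlip(42);
        System.out.println("remaining bytes: " + buf.remaining());
        System.out.println("value read back: " + buf.getInt());
    }
}
```

When the source cannot be changed, compiling with JDK 8, or with a newer JDK and --release 8, avoids the problem without any cast.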

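Chinese text in Flink logs is garbled

When running Flink on YARN, Chinese text in the TaskManager logs and stdout shown in the web UI is garbled (displayed entirely as question marks).
Solution: add env.java.opts: "-Dfile.encoding=UTF-8" to flink-conf.yaml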
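
Question marks are what Java produces when text is encoded with a charset that cannot represent it. The sketch below assumes that the container JVM fell back to an ASCII-like default charset, which these notes did not verify. It prints the JVM defaults and shows the same text surviving UTF-8 but not US-ASCII. The class name EncodingCheck is hypothetical.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class EncodingCheck {
    /** Encodes the text with the given charset and decodes it again. */
    static String roundTrip(String text, Charset charset) {
        // getBytes(Charset) replaces unmappable characters with the
        // charset's replacement byte, which is '?' for US-ASCII.
        return new String(text.getBytes(charset), charset);
    }

    public static void main(String[] args) {
        // Four Chinese characters written as Unicode escapes so the source stays ASCII.
        String sample = "\u4e2d\u6587\u65e5\u5fd7";
        System.out.println("file.encoding  = " + System.getProperty("file.encoding"));
        System.out.println("defaultCharset = " + Charset.defaultCharset());
        System.out.println("intact with default charset: "
                + sample.equals(roundTrip(sample, Charset.defaultCharset())));
        System.out.println("intact with UTF-8:           "
                + sample.equals(roundTrip(sample, StandardCharsets.UTF_8)));
    }
}
```

Running this with and without -Dfile.encoding=UTF-8 on a cluster node is one way to see whether the default charset is the culprit.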

java.lang.Exception: Container released on a lost node

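  • Cause: when the YARN queue is under heavy load or a disk fills up, the node allocated to Flink may be marked as failed, which kills the TaskManager.
  • Solution: if a restart strategy is configured, the TaskManager is restarted. If no restart strategy is configured but checkpointing is enabled, the default restart strategy is to restart indefinitely. Note, however, that a TaskManager can only restart successfully while the JobManager is still alive; if the JobManager has died as well, restarting the TaskManager achieves nothing.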

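Production jobs crash frequently

At first I assumed this was caused by the previous problem (Container released on a lost node). In practice, though, jobs hit by that problem restart on their own once checkpointing or a restart strategy is configured, so it cannot be the main cause. The real cause is that the JobManager can also die when YARN queue resources are tight. Our production cluster had no high availability configured for Flink on YARN, so a dead JobManager was never restarted. High availability therefore needs to be configured as follows (this configuration applies to the yarn-cluster mode of Flink on YARN):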

  1. First, configure yarn-site.xml to set the maximum number of ApplicationMaster attempts
    <property>
      <name>yarn.resourcemanager.am.max-attempts</name>
      <value>4</value>
      <description>
        The maximum number of application master execution attempts.
      </description>
    </property>
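  2. Configure flink-conf.yaml. The ZooKeeper settings must be added here. The official documentation for yarn-cluster mode only requires the restart parameter, but without the ZooKeeper settings the TaskManagers die together with the JobManager and only the JobManager is restarted
    # Number of Flink JobManager attempts; must be less than yarn.resourcemanager.am.max-attempts
    yarn.application-attempts: 3
    # ZooKeeper configuration
    high-availability: zookeeper
    high-availability.zookeeper.quorum: xx.x.x.xxx:2181
    # Where JobManager metadata is stored on the file system
    high-availability.storageDir: hdfs:///flink/recovery

    This configuration only reduces how often Flink fails. For Flink to run stably, a dedicated YARN queue should still be set aside for real-time jobs, so that unrelated workloads do not cause them to fail.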

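Submitting a job to YARN keeps reporting Retrying connect to server: 0.0.0.0/0.0.0.0:8032

During a company service migration, DolphinScheduler (ds) was moved to a new node, after which submitting Flink jobs kept producing the following log: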

[TID: N/A] [INFO] 2024-10-24 09:08:35.435  - [taskAppId=TASK-32790-106785-178565]:[763] -  -> 2024-10-24 09:08:34,995 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:36.436  - [taskAppId=TASK-32790-106785-178565]:[763] -  -> 2024-10-24 09:08:35,995 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:37.437  - [taskAppId=TASK-32790-106785-178565]:[763] -  -> 2024-10-24 09:08:36,996 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:38.438  - [taskAppId=TASK-32790-106785-178565]:[763] -  -> 2024-10-24 09:08:37,996 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:39.439  - [taskAppId=TASK-32790-106785-178565]:[763] -  -> 2024-10-24 09:08:38,997 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 4 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:40.440  - [taskAppId=TASK-32790-106785-178565]:[763] -  -> 2024-10-24 09:08:39,997 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 5 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)

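First, the new node is not part of the Hadoop cluster; it is a standalone client that submits jobs to the Hadoop cluster.
In this setup a Hadoop client must be installed on the node, with configuration files identical to those of the source cluster.
On inspection, the new node did have a Hadoop client installed and hadoop classpath was configured, but its yarn-site.xml differed from the source cluster's. The address 0.0.0.0:8032 is the YARN default for the ResourceManager address, which indicates that the client was not picking up the cluster's actual ResourceManager.
Solution: replace all the configuration files with those from the source cluster.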
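
A quick way to confirm that a client-side yarn-site.xml actually names the ResourceManager is to read the relevant properties out of the file. The sketch below is a hypothetical helper (YarnSiteCheck is not part of Hadoop) that uses only the JDK's XML parser. It assumes a non-HA setup where yarn.resourcemanager.hostname or yarn.resourcemanager.address is set directly; HA clusters use per-ResourceManager variants of these keys, which this sketch does not cover.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

final class YarnSiteCheck {
    /** Returns the value of the named property, or null if the XML does not define it. */
    static String propertyValue(String xml, String name) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList props = doc.getElementsByTagName("property");
            for (int i = 0; i < props.getLength(); i++) {
                Element prop = (Element) props.item(i);
                NodeList names = prop.getElementsByTagName("name");
                NodeList values = prop.getElementsByTagName("value");
                if (names.getLength() > 0 && values.getLength() > 0
                        && name.equals(names.item(0).getTextContent().trim())) {
                    return values.item(0).getTextContent().trim();
                }
            }
            return null;
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse the configuration XML", e);
        }
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to the yarn-site.xml used by the submitting client
        String xml = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        for (String key : new String[] {"yarn.resourcemanager.hostname", "yarn.resourcemanager.address"}) {
            System.out.println(key + " = " + propertyValue(xml, key));
        }
    }
}
```

If both keys print null on the client but not on a cluster node, the client is probably falling back to the default address.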

After a job is submitted with Flink on YARN, its status stays at CREATED and, after running for a few minutes, it fails with:

Could not allocate the required slot within slot request timeout.

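Cause
First check whether the cluster is short of resources, then check whether the problem lies in the packaging.

In my case the error was caused by a mistake in my POM file.

Several provided scopes were missing from the POM, so some Flink-related packages were bundled into the job jar.

Solution
Add the missing provided scopes, then rebuild and rerun.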
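
To check whether a built job jar still bundles Flink classes that the cluster already provides, the jar's entries can be listed and filtered by package path. The sketch below is a hypothetical helper (BundledFlinkClasses is an invented name) using only the JDK. The prefix org/apache/flink/runtime/ is an illustrative choice; connector classes under org/apache/flink/ are often bundled on purpose, so not every hit is a mistake.

```java
import java.io.IOException;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class BundledFlinkClasses {
    /** Keeps only the .class entries whose path starts with the given prefix. */
    static List<String> entriesUnder(Stream<String> entryNames, String prefix) {
        return entryNames
                .filter(name -> name.startsWith(prefix) && name.endsWith(".class"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to the job jar; args[1] (optional): package path prefix to look for
        String prefix = args.length > 1 ? args[1] : "org/apache/flink/runtime/";
        try (JarFile jar = new JarFile(args[0])) {
            List<String> hits = entriesUnder(jar.stream().map(JarEntry::getName), prefix);
            System.out.println(hits.size() + " bundled classes under " + prefix);
            hits.stream().limit(20).forEach(System.out::println);
        }
    }
}
```

A non-zero count for a core package suggests that the corresponding dependency is missing its provided scope.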

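Info-level logs written in the main function print normally in IDEA, but not when the job runs as Flink on YARN.
Testing shows that in Flink standalone mode the logs are written to client.log. In Flink on YARN mode, however, once the job is submitted to YARN, the client no longer runs on the local machine, so the logs do not appear on the submitting command line. I have not found where they actually end up.
Below is an attempt to integrate logging by configuring log4j2:

  1. POM file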
    <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.25</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>2.4.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>2.4.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>2.4.1</version>
            </dependency>
    
            <!-- The slf4j-log4j12 dependency must be excluded because it conflicts -->
            <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.0.0</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
            </dependency>
  2. log4j2 configuration file
    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="INFO">
        <Appenders>
            <Console name="Console" target="SYSTEM_OUT">
                <!-- The console only outputs messages at this level and above (onMatch); everything else is rejected (onMismatch) -->
                <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
                <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n" />
            </Console>
    
        </Appenders>
        <Loggers>
            <Root level="info">
                <AppenderRef ref="Console" />
            </Root>
        </Loggers>
    </Configuration>
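    For reference, see the article "记一次log4j不打印日志的踩坑记" (notes on a pitfall where log4j printed no logs)

In actual testing, the log4j jars under Flink's lib directory do not need to be touched. With this approach, logs are visible in client.log in standalone mode. After submitting to YARN, however, all logs disappear, including the TM and JM logs.
I suspect log4j2 is the problem, because the original project used log4j.properties.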