This article covers:
- Common Flink issues
Common Flink issues
After a Flink-on-YARN job dies, the YARN web UI still shows it as running even though it is already dead
- Cause: after Flink submits the job to YARN, YARN does not keep monitoring the actual state of the Flink job
- Solution: add the -d flag to the submit command, for example: run -m yarn-cluster -d -p 2 -yn 2 -yjm 1024m -ytm 2048m -ynm xxxx -c xxxx
At 浔兴 this problem became 100% reproducible. However, the -d flag cannot be used from DolphinScheduler, so here is how to work around it:
The JobManager log was being flooded with the following messages:
2024-07-27 17:10:25,702 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
2024-07-27 17:10:55,701 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
2024-07-27 17:11:25,681 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
A guess: this is related. The client heartbeat request is failing with an exception, so even though the job has failed, the client never learns that it has failed.
Debugging finally narrowed it down to a serialization problem in the JobClientHeartbeatRequestBody class:
The constructor parameter of JobClientHeartbeatRequestBody needs to be annotated with @JsonProperty, otherwise Jackson cannot map the JSON field onto the constructor argument and deserializing the heartbeat request fails:
public class JobClientHeartbeatRequestBody implements RequestBody {

    private static final String EXPIRED_TIMESTAMP = "expiredTimestamp";

    @JsonProperty(EXPIRED_TIMESTAMP)
    private final long expiredTimestamp;

    @JsonCreator
    public JobClientHeartbeatRequestBody(
            // the fix: without @JsonProperty here, Jackson cannot deserialize the request body
            @JsonProperty(EXPIRED_TIMESTAMP) long expiredTimestamp) {
        this.expiredTimestamp = expiredTimestamp;
    }

    @JsonIgnore
    public long getExpiredTimestamp() {
        return expiredTimestamp;
    }
}
After recompiling Flink with this fix, real-time capture from Oracle started failing all of its checkpoints, with the following error log:
2024-08-02 14:53:41,454 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Filter -> Flat Map (1/1) (ed1ebdb51de67d14b5a760cf0977632a_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on container_1714272405081_0303_01_000002 @ V-NJ-2-204 (dataPort=13071).
java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer;
at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.serializeCheckpointBarrier(EventSerializer.java:260) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toSerializedEvent(EventSerializer.java:100) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBufferConsumer(EventSerializer.java:373) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.broadcastEvent(BufferWritingResultPartition.java:199) ~[flink-dist-1.17.2.jar:1.17.2]
...
2024-08-02 14:53:41,491 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - 2 tasks will be restarted to recover the failed task ed1ebdb51de67d14b5a760cf0977632a_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2024-08-02 14:53:41,491 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job realtime-collect-oracle-kafka Snapshot And Stream (e3b12d95ef0e9746813b713106908662) switched from state RUNNING to RESTARTING.
2024-08-02 14:53:41,494 WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job e3b12d95ef0e9746813b713106908662. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1977) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1608) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1174) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1146) ~[flink-dist-1.17.2.jar:1.17.2]
...
The root cause: Flink had been compiled with JDK 11, while the Java in our CDH cluster is 1.8. In JDK 9+, ByteBuffer overrides flip() with a covariant return type (ByteBuffer instead of Buffer), so bytecode built on JDK 11 references ByteBuffer.flip()Ljava/nio/ByteBuffer;, which does not exist on a Java 8 runtime; hence the NoSuchMethodError and the continuously failing checkpoints.
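The cleanest fix is to rebuild Flink with JDK 8. If building on JDK 11 is unavoidable, here is a minimal sketch of pinning the build to the Java 8 API, as a fragment for the <build><plugins> section of the POM (assumes maven-compiler-plugin 3.6+; plain source/target 1.8 is not enough, because the build would still link against the JDK 11 class library):
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
        <!-- compile AND link against the Java 8 API, so JDK-9+-only signatures
             such as ByteBuffer.flip() returning ByteBuffer never end up in the bytecode -->
        <release>8</release>
    </configuration>
</plugin>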
Flink prints garbled Chinese in its logs
When running Flink on YARN, Chinese characters on the web UI's TaskManager logs and stdout pages are all garbled (shown as question marks). Fix:
Add the following to flink-conf.yaml:
env.java.opts: "-Dfile.encoding=UTF-8"
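If only one of the processes is affected, Flink also provides per-process variants of this setting; a small flink-conf.yaml sketch with the same UTF-8 override as above:
# JobManager only
env.java.opts.jobmanager: "-Dfile.encoding=UTF-8"
# TaskManagers only
env.java.opts.taskmanager: "-Dfile.encoding=UTF-8"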
java.lang.Exception: Container released on a lost node
- Cause: when the YARN queue is under heavy pressure, or a disk fills up, the node that Flink requested can be marked as failed and the TaskManager goes down
- Solution: if a restart strategy is configured, the TaskManager will be restarted; if no restart strategy is configured but checkpointing is enabled, the default restart strategy is an unlimited number of restarts (see the sketch below). Note that a TaskManager can only restart successfully if the JobManager is still alive; if the JobManager has also died, a restarted TaskManager is useless.
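As referenced in the solution above, a minimal flink-conf.yaml sketch of making the restart strategy explicit (the attempt count and delay are illustrative values, not a recommendation):
# with checkpointing enabled and nothing configured, Flink falls back to
# fixed-delay with an effectively unlimited number of attempts
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 30 s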
Production jobs keep dying
At first this looked like the previous issue, but with checkpointing or a restart strategy configured that case recovers on its own, so it was not the main cause. The real cause is that when the YARN queue is short on resources, the JobManager itself can also be killed, and our production cluster had no Flink-on-YARN high availability configured, meaning a dead JobManager is never restarted. So Flink-on-YARN HA needs to be configured as follows (this configuration applies to the yarn-cluster mode of Flink on YARN):
- First, in yarn-site.xml, configure the maximum number of ApplicationMaster attempts:
<property>
    <name>yarn.resourcemanager.am.max-attempts</name>
    <value>4</value>
    <description>The maximum number of application master execution attempts.</description>
</property>
- Then configure flink-conf.yaml. The ZooKeeper settings must be added here: the official docs only require the attempts parameter for yarn-cluster mode, but without the ZooKeeper configuration the TaskManagers die together with the JobManager and only the JobManager itself gets restarted.
# maximum number of JobManager (ApplicationMaster) attempts; must be smaller than yarn.resourcemanager.am.max-attempts
yarn.application-attempts: 3
# ZooKeeper-based high availability
high-availability: zookeeper
high-availability.zookeeper.quorum: xx.x.x.xxx:2181
# where JobManager metadata is stored on the file system
high-availability.storageDir: hdfs:///flink/recovery
This configuration only reduces how often Flink fails. For Flink to run stably, real-time jobs really should get a dedicated YARN queue of their own, so that other workloads cannot cause the real-time jobs to fail (see the sketch below).
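A minimal sketch of what such a dedicated queue could look like with the CapacityScheduler; the queue name realtime and the 20/80 split are purely illustrative, and a cluster running the Fair Scheduler (common on CDH) would need the equivalent settings there instead:
<!-- capacity-scheduler.xml -->
<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default,realtime</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>80</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.realtime.capacity</name>
    <value>20</value>
</property>
Jobs can then be pinned to that queue at submit time, for example with the -yqu realtime option of flink run in yarn-cluster mode.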
Submitting jobs to YARN keeps logging Retrying connect to server: 0.0.0.0/0.0.0.0:8032
During a company service migration, DolphinScheduler was moved to a new node, after which every Flink job submission produced the following log:
[TID: N/A] [INFO] 2024-10-24 09:08:35.435 - [taskAppId=TASK-32790-106785-178565]:[763] - -> 2024-10-24 09:08:34,995 INFO org.apache.hadoop.ipc.Client [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:36.436 - [taskAppId=TASK-32790-106785-178565]:[763] - -> 2024-10-24 09:08:35,995 INFO org.apache.hadoop.ipc.Client [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:37.437 - [taskAppId=TASK-32790-106785-178565]:[763] - -> 2024-10-24 09:08:36,996 INFO org.apache.hadoop.ipc.Client [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:38.438 - [taskAppId=TASK-32790-106785-178565]:[763] - -> 2024-10-24 09:08:37,996 INFO org.apache.hadoop.ipc.Client [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:39.439 - [taskAppId=TASK-32790-106785-178565]:[763] - -> 2024-10-24 09:08:38,997 INFO org.apache.hadoop.ipc.Client [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 4 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:40.440 - [taskAppId=TASK-32790-106785-178565]:[763] - -> 2024-10-24 09:08:39,997 INFO org.apache.hadoop.ipc.Client [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 5 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
First of all, the new node is not one of the Hadoop cluster's own instances; it is a standalone client submitting jobs to the Hadoop cluster.
Such a node needs a Hadoop client installed, with configuration files identical to the source cluster's.
On inspection, the new node did have a Hadoop client installed and hadoop classpath was configured, but its yarn-site.xml differed from the source cluster's.
Solution: replace all of the configuration files with the source cluster's (see the sketch below for the entries that matter here).
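For reference, the yarn-site.xml entries whose absence typically produces exactly this symptom; the hostname below is a placeholder, and the real values have to come from the source cluster:
<property>
    <name>yarn.resourcemanager.hostname</name>
    <!-- placeholder: the actual ResourceManager host of the source cluster -->
    <value>rm-host.example.com</value>
</property>
<property>
    <name>yarn.resourcemanager.address</name>
    <value>rm-host.example.com:8032</value>
</property>
When these are missing, the client falls back to the default 0.0.0.0:8032, which is the retry loop shown above.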
After submitting with Flink on YARN, the job stays in the CREATED state
After submitting with Flink on YARN, the job stays in the CREATED state, and after running for a few minutes it fails with:
Could not allocate the required slot within slot request timeout.
Cause
First check whether the cluster simply lacks resources, then check whether the packaging is the problem.
In my case the error was caused by a misconfigured POM file:
the POM was missing a few entries.
Solution
Add the missing entries back to the POM.
INFO logs written in the main function are not printed when running Flink on YARN
INFO logs written in the main function print fine in IDEA, but not when the job runs as Flink on YARN.
Testing shows that in standalone mode these logs end up in client.log. In Flink-on-YARN mode, once the job has been handed to YARN the client no longer runs on the local machine, so the logs do not show up in the submitting shell, and where exactly they end up was never found.
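For context, a minimal sketch of the kind of main()-side logging being discussed; the class name and message are illustrative, and it uses the SLF4J API pulled in by the POM below:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MainLogDemo {

    private static final Logger LOG = LoggerFactory.getLogger(MainLogDemo.class);

    public static void main(String[] args) throws Exception {
        // runs in whichever JVM executes main(): in standalone mode this line
        // shows up in client.log, on YARN it does not appear in the TM/JM logs
        LOG.info("building the job graph in main()");
        // ... build and execute the StreamExecutionEnvironment here ...
    }
}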
Below is an attempt to wire the logging up through a log4j2 configuration:
- POM file
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.4.1</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.4.1</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.4.1</version>
</dependency>
<!-- slf4j-log4j12 has to be excluded, it conflicts with log4j-slf4j-impl -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.0.0</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
- log4j2 configuration file
See also the post 《记一次log4j不打印日志的踩坑记》.
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <!-- the console only outputs messages at this level and above (onMatch); everything else is rejected (onMismatch) -->
            <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n" />
        </Console>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console" />
        </Root>
    </Loggers>
</Configuration>
In actual testing, the log4j jars under flink/lib do not need to be touched. With this setup the logs do show up in client.log in standalone mode, but after submitting to YARN all logs disappear, even the TaskManager and JobManager logs.
The suspicion is that this is a log4j2 problem, because the original project used log4j.properties.