This article covers:
- Flink jobs being cancelled automatically after submission
Problem
Flink jobs are cancelled automatically after submission
After upgrading Flink from 1.13.6 to 1.17.2, we found that every job submitted to YARN through DolphinScheduler ran for only about four minutes before being cancelled automatically.
After repeated comparisons we noticed that DolphinScheduler adds the -sae flag when submitting Flink jobs. Manually submitting a Flink job with -sae from the command line (e.g. flink run -sae ...) reproduces the same problem.
Cause
Recent Flink versions added two experimental options, client.heartbeat.interval and client.heartbeat.timeout. Per their description: if the dispatcher does not receive a heartbeat from the client within the timeout, the job is cancelled when both execution.attached and execution.shutdown-on-attached-exit are true.
Reading the Flink source shows that execution.shutdown-on-attached-exit is exactly the -sae flag:
public static final Option SHUTDOWN_IF_ATTACHED_OPTION =
        new Option(
                "sae",
                "shutdownOnAttachedExit",
                false,
                "If the job is submitted in attached mode, perform a best-effort cluster shutdown "
                        + "when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");
In other words, the client sends a heartbeat to the JobManager every 30 s (the default client.heartbeat.interval); if the JobManager goes longer than client.heartbeat.timeout without receiving one, the job is cancelled automatically.
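Both thresholds are ordinary configuration, so they can also be raised, either as client.heartbeat.interval / client.heartbeat.timeout in flink-conf.yaml or programmatically. A minimal sketch of the latter; the helper name and values are ours, and it assumes the options are Long values in milliseconds, as the getLong call quoted later in this article suggests:

import org.apache.flink.client.ClientOptions;
import org.apache.flink.configuration.Configuration;

// Hypothetical helper: relax the client heartbeat thresholds before submitting.
static Configuration withRelaxedClientHeartbeat(Configuration conf) {
    conf.setLong(ClientOptions.CLIENT_HEARTBEAT_INTERVAL, 30_000L);   // default: 30 s
    conf.setLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT, 600_000L);   // e.g. 10 min
    return conf;
}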
The client-side code that sends the heartbeat:
public static ScheduledExecutorService reportHeartbeatPeriodically(
        JobClient jobClient, long interval, long timeout) {
    checkArgument(
            interval < timeout,
            "The client's heartbeat interval "
                    + "should be less than the heartbeat timeout. Please adjust the param '"
                    + ClientOptions.CLIENT_HEARTBEAT_INTERVAL
                    + "' or '"
                    + ClientOptions.CLIENT_HEARTBEAT_TIMEOUT
                    + "'");

    JobID jobID = jobClient.getJobID();
    LOG.info("Begin to report client's heartbeat for the job {}.", jobID);

    ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    scheduledExecutor.scheduleAtFixedRate(
            () -> {
                LOG.info("Report client's heartbeat for the job {}.", jobID);
                jobClient.reportHeartbeat(System.currentTimeMillis() + timeout);
            },
            interval,
            interval,
            TimeUnit.MILLISECONDS);
    return scheduledExecutor;
}
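A sketch of how this helper can be driven after an attached-mode submission. The call site is ours, not Flink's actual wiring; it assumes the method lives in org.apache.flink.client.ClientUtils and that 30 s / 180 s are the interval and timeout in milliseconds:

// Sketch: start periodic heartbeating for a submitted job, stop it on exit.
ScheduledExecutorService heartbeat =
        ClientUtils.reportHeartbeatPeriodically(jobClient, 30_000L, 180_000L);
try {
    jobClient.getJobExecutionResult().join(); // block until the job finishes
} finally {
    heartbeat.shutdown(); // stop heartbeating once the client is done
}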
In standalone mode, you can see these heartbeat entries in log/flink-root-client-V-SH-101-227.log.
In other words, the client side looks fine. Either the JobManager side is not receiving the heartbeats, or it receives them and still kills the job.
The JobManager-side code that kills the job (in org.apache.flink.runtime.dispatcher.Dispatcher):
private void checkJobClientAliveness() {
    setClientHeartbeatTimeoutForInitializedJob();

    long currentTimestamp = System.currentTimeMillis();

    Iterator<Map.Entry<JobID, Long>> iterator = jobClientExpiredTimestamp.entrySet().iterator();
    while (iterator.hasNext()) {
        Map.Entry<JobID, Long> entry = iterator.next();
        JobID jobID = entry.getKey();
        long expiredTimestamp = entry.getValue();

        if (!getJobManagerRunner(jobID).isPresent()) {
            iterator.remove();
        } else if (expiredTimestamp <= currentTimestamp) {
            log.warn(
                    "The heartbeat from the job client is timeout and cancel the job {}. "
                            + "You can adjust the heartbeat interval "
                            + "by 'client.heartbeat.interval' and the timeout "
                            + "by 'client.heartbeat.timeout'",
                    jobID);
            cancelJob(jobID, webTimeout);
        }
    }
}

private void setClientHeartbeatTimeoutForInitializedJob() {
    Iterator<Map.Entry<JobID, Long>> iterator =
            uninitializedJobClientHeartbeatTimeout.entrySet().iterator();
    while (iterator.hasNext()) {
        Map.Entry<JobID, Long> entry = iterator.next();
        JobID jobID = entry.getKey();
        Optional<JobManagerRunner> jobManagerRunnerOptional = getJobManagerRunner(jobID);
        if (!jobManagerRunnerOptional.isPresent()) {
            iterator.remove();
        } else if (jobManagerRunnerOptional.get().isInitialized()) {
            jobClientExpiredTimestamp.put(jobID, System.currentTimeMillis() + entry.getValue());
            iterator.remove();
        }
    }
}
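For completeness: the jobClientExpiredTimestamp map consulted above is supposed to be pushed forward every time a client heartbeat RPC arrives. Paraphrased from memory of the 1.17 Dispatcher (the exact signature and logging may differ):

// Paraphrased sketch of the Dispatcher's heartbeat RPC handler: each client
// heartbeat overwrites the job's expiration timestamp, which is what keeps
// checkJobClientAliveness() from cancelling the job.
public CompletableFuture<Void> reportJobClientHeartbeat(
        JobID jobId, long expiredTimestamp, Time timeout) {
    if (getJobManagerRunner(jobId).isPresent()) {
        jobClientExpiredTimestamp.put(jobId, expiredTimestamp);
    }
    return FutureUtils.completedVoidFuture();
}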
Debug logging showed that expiredTimestamp was identical on every check; in other words, jobClientExpiredTimestamp.put(jobID, System.currentTimeMillis() + entry.getValue()) executed exactly once and never ran again.
Inspecting initialClientHeartbeatTimeout showed -9223372036854775808, i.e. Long.MIN_VALUE, which looks like a never-set default:
private void initJobClientExpiredTime(JobGraph jobGraph) {
    JobID jobID = jobGraph.getJobID();
    // TODO: check whether initialClientHeartbeatTimeout is greater than 0;
    // I suspect this branch is never taken.
    long initialClientHeartbeatTimeout = jobGraph.getInitialClientHeartbeatTimeout();
    if (initialClientHeartbeatTimeout > 0) {
        log.info(
                "Begin to detect the client's aliveness for job {}. The heartbeat timeout is {}",
                jobID,
                initialClientHeartbeatTimeout);
        uninitializedJobClientHeartbeatTimeout.put(jobID, initialClientHeartbeatTimeout);

        if (jobClientAlivenessCheck == null) {
            // Use the client heartbeat timeout as the check interval.
            jobClientAlivenessCheck =
                    this.getRpcService()
                            .getScheduledExecutor()
                            .scheduleWithFixedDelay(
                                    () ->
                                            getMainThreadExecutor()
                                                    .execute(this::checkJobClientAliveness),
                                    0L,
                                    jobClientAlivenessCheckInterval,
                                    TimeUnit.MILLISECONDS);
        }
    }
}
Digging further into the source, JobGraph.getInitialClientHeartbeatTimeout is only assigned in PipelineExecutorUtils.getJobGraph:
if (configuration.getBoolean(DeploymentOptions.ATTACHED)
        && configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
    jobGraph.setInitialClientHeartbeatTimeout(
            configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
}
As you can see, CLIENT_HEARTBEAT_TIMEOUT is propagated to the JobGraph only when both execution.attached and -sae (SHUTDOWN_IF_ATTACHED) are set.
Solution
Modify the setClientHeartbeatTimeoutForInitializedJob method of org.apache.flink.runtime.dispatcher.Dispatcher, removing the iterator.remove() call for initialized jobs:
private void setClientHeartbeatTimeoutForInitializedJob() {
    Iterator<Map.Entry<JobID, Long>> iterator =
            uninitializedJobClientHeartbeatTimeout.entrySet().iterator();
    while (iterator.hasNext()) {
        Map.Entry<JobID, Long> entry = iterator.next();
        JobID jobID = entry.getKey();
        Optional<JobManagerRunner> jobManagerRunnerOptional = getJobManagerRunner(jobID);
        if (!jobManagerRunnerOptional.isPresent()) {
            iterator.remove();
        } else if (jobManagerRunnerOptional.get().isInitialized()) {
            // Refresh the expiration timestamp on every check instead of only once.
            jobClientExpiredTimestamp.put(jobID, System.currentTimeMillis() + entry.getValue());
            // iterator.remove();
        }
    }
}
This change does not turn the while loop into an infinite loop; the entry simply stays in the map, so the expiration timestamp is pushed forward on every aliveness check, effectively disabling heartbeat-based cancellation for these jobs.
Verified: the fix works.
Building Flink
Building inside mainland China runs into problem after problem; building on Microsoft Azure instead was completely smooth: done in about ten minutes without a single error.
- Download the code:
  git clone git@github.com:gujincheng/flink.git -b digiwin-1.17.2
- Build the code:
  mvn clean install -DskipTests -Dfast -Drat.skip=true -Dhadoop.version=3.0.0-cdh6.2.0 -Dinclude-hadoop -Dscala-2.12 -T20C
Checkpoints keep failing
After building Flink, we found during real-time capture from Oracle that checkpoints kept failing, with the following error log:

2024-08-02 14:53:41,454 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Filter -> Flat Map (1/1) (ed1ebdb51de67d14b5a760cf0977632a_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on container_1714272405081_0303_01_000002 @ V-NJ-2-204 (dataPort=13071).
java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer;
    at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.serializeCheckpointBarrier(EventSerializer.java:260) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toSerializedEvent(EventSerializer.java:100) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBufferConsumer(EventSerializer.java:373) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.broadcastEvent(BufferWritingResultPartition.java:199) ~[flink-dist-1.17.2.jar:1.17.2]
    ...
2024-08-02 14:53:41,491 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - 2 tasks will be restarted to recover the failed task ed1ebdb51de67d14b5a760cf0977632a_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2024-08-02 14:53:41,491 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job realtime-collect-oracle-kafka Snapshot And Stream (e3b12d95ef0e9746813b713106908662) switched from state RUNNING to RESTARTING.
2024-08-02 14:53:41,494 WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job e3b12d95ef0e9746813b713106908662. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1977) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1608) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1174) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1146) ~[flink-dist-1.17.2.jar:1.17.2]
    ...

We eventually traced this to the build JDK: Flink had been compiled with JDK 11, while our CDH cluster runs Java 1.8. JDK 9 changed the return type of java.nio.ByteBuffer.flip() from Buffer to ByteBuffer, so bytecode compiled against the JDK 11 class library calls ByteBuffer.flip()Ljava/nio/ByteBuffer;, which does not exist on a Java 8 JVM, hence the NoSuchMethodError and the failing checkpoints.
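A minimal standalone demo of the mismatch (our own example, not Flink code). Compile it on JDK 11 with -source 8 -target 8 but without --release 8, then run it on a Java 8 JVM:

import java.nio.ByteBuffer;

public class FlipDemo {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put((byte) 42);
        // javac on JDK 9+ resolves this call to the covariant override
        // ByteBuffer.flip()Ljava/nio/ByteBuffer; introduced in Java 9.
        // A Java 8 JVM only has Buffer.flip()Ljava/nio/Buffer;, so running
        // this bytecode there throws NoSuchMethodError on this line.
        buf.flip();
        System.out.println(buf.get()); // prints 42 where the call resolves
    }
}

Building with JDK 8, or with javac's --release 8 flag (which compiles against the Java 8 API), avoids the problem; -source/-target alone does not, because the compiler still links against the JDK 11 class library.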
Why not use Flink 1.18 or newer?
Fundamentally, because Flink 1.18 no longer supports JDK 1.8, while everything in our CDH environment still runs on 1.8; a newer Flink would simply be incompatible with our environment.