
Flink Secondary Development

This article covers:

  • Flink jobs stopping automatically after submission

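Problem

Flink jobs stop automatically after submission

After upgrading Flink from 1.13.6 to 1.17.2, jobs submitted to YARN through DolphinScheduler ran for only about 4 minutes before being cancelled automatically.
Repeated comparisons showed that DolphinScheduler adds the -sae flag when it submits a Flink job. Submitting the job by hand with -sae reproduces the same problem.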

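Cause

Newer Flink versions added an experimental feature controlled by two options:
client.heartbeat.interval
client.heartbeat.timeout
If the dispatcher does not receive a heartbeat from the client within the timeout, it cancels the job when both execution.attached and execution.shutdown-on-attached-exit are true.
The Flink source shows that execution.shutdown-on-attached-exit corresponds to the -sae flag: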

public static final Option SHUTDOWN_IF_ATTACHED_OPTION =
            new Option(
                    "sae",
                    "shutdownOnAttachedExit",
                    false,
                    "If the job is submitted in attached mode, perform a best-effort cluster shutdown "
                            + "when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");

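In other words, the client sends a heartbeat to the JobManager every 30 s, and if the JobManager goes too long without receiving one, it cancels the job automatically.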
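The mechanism can be pictured as an expiry timestamp that each heartbeat pushes forward. The following is only a toy sketch of that idea, not Flink code: the class HeartbeatModel, its method names, the String job ID, and the 180 s timeout are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a client-heartbeat expiry protocol; hypothetical, not Flink code. */
class HeartbeatModel {
    // jobId -> wall-clock time (ms) after which the job counts as orphaned
    private final Map<String, Long> expiry = new HashMap<>();

    /** Client side: each heartbeat moves the expiry to now + timeout. */
    void reportHeartbeat(String jobId, long nowMs, long timeoutMs) {
        expiry.put(jobId, nowMs + timeoutMs);
    }

    /** Server side: the job should be cancelled once its expiry has passed. */
    boolean shouldCancel(String jobId, long nowMs) {
        Long expiredAt = expiry.get(jobId);
        return expiredAt != null && expiredAt <= nowMs;
    }

    public static void main(String[] args) {
        HeartbeatModel model = new HeartbeatModel();
        long timeoutMs = 180_000L; // illustrative value, not read from any Flink config
        model.reportHeartbeat("job-1", 0L, timeoutMs);
        System.out.println(model.shouldCancel("job-1", 30_000L));  // expiry not reached yet
        System.out.println(model.shouldCancel("job-1", 180_000L)); // no newer heartbeat arrived
    }
}
```

As long as heartbeats keep arriving more often than the timeout, the expiry stays in the future and the job survives. A job that is cancelled anyway points to the expiry not being updated on the server side.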

Client-side heartbeat code:

public static ScheduledExecutorService reportHeartbeatPeriodically(
            JobClient jobClient, long interval, long timeout) {
        checkArgument(
                interval < timeout,
                "The client's heartbeat interval "
                        + "should be less than the heartbeat timeout. Please adjust the param '"
                        + ClientOptions.CLIENT_HEARTBEAT_INTERVAL
                        + "' or '"
                        + ClientOptions.CLIENT_HEARTBEAT_TIMEOUT
                        + "'");

        JobID jobID = jobClient.getJobID();
        LOG.info("Begin to report client's heartbeat for the job {}.", jobID);
        ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutor.scheduleAtFixedRate(
                () -> {
                    LOG.info("Report client's heartbeat for the job {}.", jobID);
                    jobClient.reportHeartbeat(System.currentTimeMillis() + timeout);
                },
                interval,
                interval,
                TimeUnit.MILLISECONDS);
        return scheduledExecutor;
    }

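In standalone mode, the heartbeat entries show up in log/flink-root-client-V-SH-101-227.log.
So the client side is working. The problem must be on the JobManager side: either it never receives the heartbeats, or it receives them and still kills the job.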

JobManager code that cancels the job:

private void checkJobClientAliveness() {
        setClientHeartbeatTimeoutForInitializedJob();
        long currentTimestamp = System.currentTimeMillis();
        Iterator<Map.Entry<JobID, Long>> iterator = jobClientExpiredTimestamp.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<JobID, Long> entry = iterator.next();
            JobID jobID = entry.getKey();
            long expiredTimestamp = entry.getValue();
            if (!getJobManagerRunner(jobID).isPresent()) {
                iterator.remove();
            } else if (expiredTimestamp <= currentTimestamp) {
                log.warn(
                        "The heartbeat from the job client is timeout and cancel the job {}. "
                                + "You can adjust the heartbeat interval "
                                + "by 'client.heartbeat.interval' and the timeout "
                                + "by 'client.heartbeat.timeout'",
                        jobID);
                cancelJob(jobID, webTimeout);
            }
        }
    }

private void setClientHeartbeatTimeoutForInitializedJob() {
    Iterator<Map.Entry<JobID, Long>> iterator =
            uninitializedJobClientHeartbeatTimeout.entrySet().iterator();
    while (iterator.hasNext()) {
        Map.Entry<JobID, Long> entry = iterator.next();
        JobID jobID = entry.getKey();
        Optional<JobManagerRunner> jobManagerRunnerOptional = getJobManagerRunner(jobID);
        if (!jobManagerRunnerOptional.isPresent()) {
            iterator.remove();
        } else if (jobManagerRunnerOptional.get().isInitialized()) {
            jobClientExpiredTimestamp.put(jobID, System.currentTimeMillis() + entry.getValue());
            iterator.remove();
        }
    }
}

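Debug logging showed that expiredTimestamp was identical on every check. In other words, jobClientExpiredTimestamp.put(jobID, System.currentTimeMillis() + entry.getValue()); ran exactly once and never again.

Inspecting initialClientHeartbeatTimeout showed a value of -9223372036854775808 (Long.MIN_VALUE):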

private void initJobClientExpiredTime(JobGraph jobGraph) {
        JobID jobID = jobGraph.getJobID();
        // TODO: check whether initialClientHeartbeatTimeout is greater than 0 here; this branch may never be executed
        long initialClientHeartbeatTimeout = jobGraph.getInitialClientHeartbeatTimeout();
        if (initialClientHeartbeatTimeout > 0) {
            log.info(
                    "Begin to detect the client's aliveness for job {}. The heartbeat timeout is {}",
                    jobID,
                    initialClientHeartbeatTimeout);
            uninitializedJobClientHeartbeatTimeout.put(jobID, initialClientHeartbeatTimeout);

            if (jobClientAlivenessCheck == null) {
                // Use the client heartbeat timeout as the check interval.
                jobClientAlivenessCheck =
                        this.getRpcService()
                                .getScheduledExecutor()
                                .scheduleWithFixedDelay(
                                        () ->
                                                getMainThreadExecutor()
                                                        .execute(this::checkJobClientAliveness),
                                        0L,
                                        jobClientAlivenessCheckInterval,
                                        TimeUnit.MILLISECONDS);
            }
        }
}

Digging further into the source, see JobGraph.getInitialClientHeartbeatTimeout and PipelineExecutorUtils.getJobGraph:

if (configuration.getBoolean(DeploymentOptions.ATTACHED)
                && configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
        jobGraph.setInitialClientHeartbeatTimeout(
        configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT));
}

As shown, the initial timeout is taken from CLIENT_HEARTBEAT_TIMEOUT only when both -sae and ATTACHED are set.
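The gating can be restated as a small standalone sketch. This is not Flink code: the class HeartbeatGate and its methods are hypothetical, and treating Long.MIN_VALUE as the "never set" value is an assumption based only on the number observed in the debug output.

```java
/** Toy restatement of the gating logic; hypothetical, not Flink code. */
class HeartbeatGate {
    // Assumption: Long.MIN_VALUE stands in for "never set", matching the observed debug value.
    static final long UNSET = Long.MIN_VALUE;

    /** The timeout is only propagated when both attached mode and -sae are enabled. */
    static long initialTimeout(boolean attached, boolean shutdownOnAttachedExit, long configuredTimeoutMs) {
        return (attached && shutdownOnAttachedExit) ? configuredTimeoutMs : UNSET;
    }

    /** Mirrors the "initialClientHeartbeatTimeout > 0" guard in the dispatcher listing. */
    static boolean alivenessCheckEnabled(long initialTimeoutMs) {
        return initialTimeoutMs > 0;
    }

    public static void main(String[] args) {
        long configured = 180_000L; // illustrative timeout in milliseconds
        System.out.println(alivenessCheckEnabled(initialTimeout(true, true, configured)));
        System.out.println(alivenessCheckEnabled(initialTimeout(true, false, configured)));
    }
}
```

Under these assumptions, a job submitted without -sae never enters the aliveness check, which agrees with the observation that only -sae submissions are cancelled.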

Solution

Modify the setClientHeartbeatTimeoutForInitializedJob method of org.apache.flink.runtime.dispatcher.Dispatcher and remove the iterator.remove(); call in the isInitialized() branch:

private void setClientHeartbeatTimeoutForInitializedJob() {
        Iterator<Map.Entry<JobID, Long>> iterator =
                uninitializedJobClientHeartbeatTimeout.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<JobID, Long> entry = iterator.next();
            JobID jobID = entry.getKey();
            Optional<JobManagerRunner> jobManagerRunnerOptional = getJobManagerRunner(jobID);
            if (!jobManagerRunnerOptional.isPresent()) {
                iterator.remove();
            } else if (jobManagerRunnerOptional.get().isInitialized()) {
                jobClientExpiredTimestamp.put(jobID, System.currentTimeMillis() + entry.getValue());
                //iterator.remove();
            }
        }
    }

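This change does not turn the while loop into an infinite loop: the iterator still advances through every entry once per call. Keeping the entry only means the expiry timestamp is refreshed on each aliveness check.

Testing confirmed that the fix works.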
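A toy model of the two maps makes the difference visible. It is a sketch under stated assumptions, not Flink code: job IDs are plain strings, every job is treated as already initialized, client heartbeats are left out entirely, and the removeAfterInit flag is a hypothetical switch between the original and the patched behaviour.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Toy model of the two dispatcher maps; hypothetical, not Flink code. */
class AlivenessModel {
    final Map<String, Long> uninitializedTimeout = new HashMap<>();
    final Map<String, Long> expiredTimestamp = new HashMap<>();
    final boolean removeAfterInit; // true = original behaviour, false = patched

    AlivenessModel(boolean removeAfterInit) {
        this.removeAfterInit = removeAfterInit;
    }

    /** Mirrors setClientHeartbeatTimeoutForInitializedJob; returns the number of entries visited. */
    int refresh(long nowMs) {
        int visited = 0;
        Iterator<Map.Entry<String, Long>> it = uninitializedTimeout.entrySet().iterator();
        while (it.hasNext()) { // one pass over a finite map, with or without remove()
            Map.Entry<String, Long> entry = it.next();
            expiredTimestamp.put(entry.getKey(), nowMs + entry.getValue());
            if (removeAfterInit) {
                it.remove();
            }
            visited++;
        }
        return visited;
    }

    /** Mirrors the expiry comparison in checkJobClientAliveness. */
    boolean wouldCancel(String jobId, long nowMs) {
        refresh(nowMs);
        Long expiry = expiredTimestamp.get(jobId);
        return expiry != null && expiry <= nowMs;
    }

    public static void main(String[] args) {
        for (boolean original : new boolean[] {true, false}) {
            AlivenessModel model = new AlivenessModel(original);
            model.uninitializedTimeout.put("job-1", 180_000L); // illustrative timeout
            model.wouldCancel("job-1", 0L);
            System.out.println("removeAfterInit=" + original
                    + " cancelAt240s=" + model.wouldCancel("job-1", 240_000L));
        }
    }
}
```

In this model the original variant sets the expiry once and cancels after the timeout, while the patched variant recomputes the expiry on every check and never reaches it. The loop visits each entry exactly once per call in both variants.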

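Building inside mainland China ran into all kinds of problems, so I built on Microsoft's cloud instead, where everything went smoothly: done in about 10 minutes with no errors.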

  1. Download the code:
    git clone git@github.com:gujincheng/flink.git -b digiwin-1.17.2
  2. Build the code:
    mvn clean install -DskipTests -Dfast -Drat.skip=true -Dhaoop.version=3.0.0-cdh6.2.0 -Dinclude-hadoop -Dscala-2.12 -T20C

Checkpoints keep failing

After building Flink, real-time collection from Oracle showed checkpoints failing every time. The error log:
    2024-08-02 14:53:41,454 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Filter -> Flat Map (1/1) (ed1ebdb51de67d14b5a760cf0977632a_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on container_1714272405081_0303_01_000002 @ V-NJ-2-204 (dataPort=13071).
    java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer;
    	at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.serializeCheckpointBarrier(EventSerializer.java:260) ~[flink-dist-1.17.2.jar:1.17.2]
    	at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toSerializedEvent(EventSerializer.java:100) ~[flink-dist-1.17.2.jar:1.17.2]
    	at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBufferConsumer(EventSerializer.java:373) ~[flink-dist-1.17.2.jar:1.17.2]
    	at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.broadcastEvent(BufferWritingResultPartition.java:199) ~[flink-dist-1.17.2.jar:1.17.2]
    ...
    2024-08-02 14:53:41,491 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - 2 tasks will be restarted to recover the failed task ed1ebdb51de67d14b5a760cf0977632a_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
    2024-08-02 14:53:41,491 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job realtime-collect-oracle-kafka Snapshot And Stream (e3b12d95ef0e9746813b713106908662) switched from state RUNNING to RESTARTING.
    2024-08-02 14:53:41,494 WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job e3b12d95ef0e9746813b713106908662. (0 consecutive failed attempts so far)
    org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1977) ~[flink-dist-1.17.2.jar:1.17.2]
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46) ~[flink-dist-1.17.2.jar:1.17.2]
    	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1608) ~[flink-dist-1.17.2.jar:1.17.2]
    	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1174) ~[flink-dist-1.17.2.jar:1.17.2]
    	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1146) ~[flink-dist-1.17.2.jar:1.17.2]
    ...
The cause turned out to be that Flink had been compiled with JDK 11, while our CDH cluster runs Java 1.8, so checkpoints kept failing.
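The descriptor in the error, java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer;, names a method that exists only on JDK 9 and later, where ByteBuffer overrides flip() with a ByteBuffer return type. On Java 8 only Buffer.flip() returning Buffer exists, so bytecode compiled on a newer JDK without targeting the Java 8 API fails to link at runtime. Besides building with JDK 8 or passing javac's --release 8 option, a common source-level workaround is to call flip() through the Buffer supertype. The sketch below illustrates that workaround with a hypothetical helper class BufferCompat; it is not how Flink itself handles the call.

```java
import java.nio.Buffer;
import java.nio.ByteBuffer;

/** Hypothetical helper showing a flip() call that links on both Java 8 and newer JDKs. */
class BufferCompat {
    /** Casting to Buffer makes the compiled call site reference Buffer.flip(), present on every JDK. */
    static ByteBuffer flip(ByteBuffer buffer) {
        ((Buffer) buffer).flip();
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.putInt(42);
        flip(buffer); // switch from writing to reading
        System.out.println(buffer.getInt());
    }
}
```

For a project the size of Flink, matching the build JDK to the runtime JDK is the simpler route than touching call sites.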

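More fundamentally, newer Flink versions are moving away from JDK 1.8 (Java 8 support has been deprecated since Flink 1.15), while our CDH clusters all still run 1.8, which leaves Flink increasingly incompatible with our environment.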