
Flink Common Problems

This post collects Flink problems encountered in production, one per section.

Flink task status not tracked after submission to YARN

  • Cause: after the job is submitted to YARN, YARN does not keep monitoring the Flink task's status.
  • Solution: add the -d (detached) flag to the submit command, e.g.: run -m yarn-cluster -d -p 2 -yn 2 -yjm 1024m -ytm 2048m -ynm xxxx -c xxxx

This problem became 100% reproducible at 浔兴, but DolphinScheduler does not let us pass the -d parameter, so here is how we worked around it.
Checking the jobmanager log, it was flooded with the following messages:

2024-07-27 17:10:25,702 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
2024-07-27 17:10:55,701 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
2024-07-27 17:11:25,681 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.

My guess was that this is related: sending the client heartbeat throws an exception, so even though the job has failed, the client never receives the failure.
Debugging eventually pinned it down to a serialization problem in the JobClientHeartbeatRequestBody class:
the constructor parameter of JobClientHeartbeatRequestBody needs an explicit @JsonProperty annotation.

// Flink's REST message classes use the flink-shaded Jackson annotations
// (import paths below are the shaded ones as used in the Flink source tree)
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

public class JobClientHeartbeatRequestBody implements RequestBody {
    private static final String EXPIRED_TIMESTAMP = "expiredTimestamp";

    @JsonProperty(EXPIRED_TIMESTAMP)
    private final long expiredTimestamp;

    @JsonCreator
    public JobClientHeartbeatRequestBody(@JsonProperty(EXPIRED_TIMESTAMP) long expiredTimestamp) {
        this.expiredTimestamp = expiredTimestamp;
    }

    @JsonIgnore
    public long getExpiredTimestamp() {
        return expiredTimestamp;
    }
}

After recompiling Flink, checkpoints kept failing during real-time capture from Oracle, with the following error log:

2024-08-02 14:53:41,454 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Filter -> Flat Map (1/1) (ed1ebdb51de67d14b5a760cf0977632a_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on container_1714272405081_0303_01_000002 @ V-NJ-2-204 (dataPort=13071).
java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer;
	at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.serializeCheckpointBarrier(EventSerializer.java:260) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toSerializedEvent(EventSerializer.java:100) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBufferConsumer(EventSerializer.java:373) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.broadcastEvent(BufferWritingResultPartition.java:199) ~[flink-dist-1.17.2.jar:1.17.2]
...
2024-08-02 14:53:41,491 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - 2 tasks will be restarted to recover the failed task ed1ebdb51de67d14b5a760cf0977632a_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2024-08-02 14:53:41,491 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job realtime-collect-oracle-kafka Snapshot And Stream (e3b12d95ef0e9746813b713106908662) switched from state RUNNING to RESTARTING.
2024-08-02 14:53:41,494 WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job e3b12d95ef0e9746813b713106908662. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1977) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1608) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1174) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1146) ~[flink-dist-1.17.2.jar:1.17.2]
...

The root cause turned out to be a JDK mismatch: Flink was compiled with JDK 11, but our CDH cluster runs Java 1.8, so every checkpoint failed with the NoSuchMethodError above.
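The NoSuchMethodError is a well-known JDK binary-compatibility trap: since Java 9, `ByteBuffer.flip()` has a covariant return type `ByteBuffer`, while on Java 8 the method is inherited from `Buffer` and returns `Buffer`. Bytecode compiled on JDK 11 therefore references a method descriptor that does not exist on a Java 8 JVM. A minimal sketch of the portable workaround used by many cross-compiled libraries (upcast to `Buffer` so the Java 8 descriptor is pinned):

```java
import java.nio.Buffer;
import java.nio.ByteBuffer;

public class FlipCompat {
    // Upcasting to Buffer makes the compiler emit Buffer.flip()Ljava/nio/Buffer;
    // so the same bytecode runs on both Java 8 and Java 11.
    static Buffer flipCompat(ByteBuffer buf) {
        return ((Buffer) buf).flip();
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putInt(42);            // position = 4
        flipCompat(buf);           // position = 0, limit = 4
        System.out.println(buf.getInt());
    }
}
```

When building Flink itself, the simpler fix is the one that resolved the issue above: compile with the same major JDK the cluster runs (or pass `--release 8` to javac).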

Flink prints Chinese log output as garbled text

When running Flink on YARN, Chinese characters in the web UI's task manager logs and stdout all show up as question marks. Fix as follows:
Solution: add env.java.opts: "-Dfile.encoding=UTF-8" to flink-conf.yaml

java.lang.Exception: Container released on a lost node

  • Cause: when the YARN queue is under heavy pressure, or a disk fills up, a node allocated to Flink may be marked as failed, taking the taskmanager down with it.
  • Solution: with a restart strategy configured, the taskmanager will restart on its own. If no restart strategy is configured but checkpointing is enabled, the default strategy restarts indefinitely. Note, however, that a taskmanager restart only succeeds while the jobmanager is still alive; if the jobmanager has also died, restarting the taskmanagers achieves nothing.

Production jobs keep dying

At first I blamed the previous problem, but with checkpointing or a restart strategy configured those jobs recover on their own, so that could not be the main cause. The real issue is that when YARN queue resources are tight, the jobmanager itself can die. Our production cluster had no high availability configured for Flink on YARN, meaning a dead jobmanager is never restarted. The fix is to configure Flink on YARN high availability as follows (this applies to yarn-cluster mode):

  1. First, configure yarn-site.xml with the maximum number of ApplicationMaster attempts:
    <property>
      <name>yarn.resourcemanager.am.max-attempts</name>
      <value>4</value>
      <description>
        The maximum number of application master execution attempts.
      </description>
    </property>
  2. Configure flink-conf.yaml. The ZooKeeper settings must be added here: the official docs for yarn-cluster mode only require the attempt parameter, but without ZooKeeper HA the task managers go down together with the job manager, and only the job manager gets restarted.
    # Number of Flink jobmanager attempts; must be less than yarn.resourcemanager.am.max-attempts
    yarn.application-attempts: 3
    # ZooKeeper configuration
    high-availability: zookeeper
    high-availability.zookeeper.quorum: xx.x.x.xxx:2181
    # Where jobmanager metadata is stored on the filesystem
    high-availability.storageDir: hdfs:///flink/recovery

    This configuration only reduces how often Flink fails outright. For truly stable operation, real-time jobs should get a dedicated YARN queue, so that failures caused by other workloads are avoided.
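As a sketch of that last point, a dedicated queue could be carved out in capacity-scheduler.xml like this (the queue name realtime and the capacity percentages are assumptions, not values from our cluster):

```xml
<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>default,realtime</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.realtime.capacity</name>
  <value>30</value>
</property>
<property>
  <!-- Cap elasticity so batch jobs cannot borrow everything back -->
  <name>yarn.scheduler.capacity.root.realtime.maximum-capacity</name>
  <value>50</value>
</property>
```

Jobs would then be submitted with the -yqu realtime option so they land in that queue.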

Submitting to YARN keeps logging Retrying connect to server: 0.0.0.0/0.0.0.0:8032

During a company service migration, DolphinScheduler (ds) was moved to a new node, after which submitting Flink jobs kept producing the following log:

[TID: N/A] [INFO] 2024-10-24 09:08:35.435  - [taskAppId=TASK-32790-106785-178565]:[763] -  -> 2024-10-24 09:08:34,995 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:36.436  - [taskAppId=TASK-32790-106785-178565]:[763] -  -> 2024-10-24 09:08:35,995 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:37.437  - [taskAppId=TASK-32790-106785-178565]:[763] -  -> 2024-10-24 09:08:36,996 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:38.438  - [taskAppId=TASK-32790-106785-178565]:[763] -  -> 2024-10-24 09:08:37,996 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:39.439  - [taskAppId=TASK-32790-106785-178565]:[763] -  -> 2024-10-24 09:08:38,997 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 4 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:40.440  - [taskAppId=TASK-32790-106785-178565]:[763] -  -> 2024-10-24 09:08:39,997 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 5 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)

First, the new node is not one of the Hadoop cluster's instances; it is a standalone client submitting jobs to the cluster.
Such a node needs a Hadoop client installed, with configuration files identical to the source cluster's.
On inspection, the new node did have a Hadoop client installed and the hadoop classpath configured, but its yarn-site.xml differed from the source cluster's.
Fix: replace all the configuration files with copies from the source cluster.
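The 0.0.0.0:8032 target is the giveaway: it is the fallback used when yarn.resourcemanager.address is never loaded, i.e. the client is not reading the right yarn-site.xml. A quick hedged check (run against a sample file here for the demo; in practice point the grep at $HADOOP_CONF_DIR/yarn-site.xml):

```shell
# Create a sample yarn-site.xml just for this demo; on a real client,
# inspect the file under $HADOOP_CONF_DIR instead.
cat > /tmp/yarn-site-sample.xml <<'EOF'
<configuration>
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>rm-host.example.com:8032</value>
  </property>
</configuration>
EOF

# If this prints nothing on your client, the RM address is unset and the
# client falls back to 0.0.0.0:8032 -- exactly the retry loop shown above.
grep -A1 'yarn.resourcemanager.address' /tmp/yarn-site-sample.xml \
  | grep -o '<value>[^<]*</value>'
```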

After submitting a job with Flink on YARN, it sits in CREATED state and fails after a few minutes with:

Could not allocate the required slot within slot request timeout.

Cause
First rule out insufficient cluster resources, then check how the job was packaged.

In my case the error came from a misconfigured POM:

several dependencies were missing <scope>provided</scope>, so some of Flink's own jars were bundled into the fat jar.

Fix
Add the missing provided scopes, then repackage and rerun.
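A hedged sketch of the fix (artifact ids and the version are illustrative; match them to whatever your job actually depends on):

```xml
<!-- Anything the cluster's flink-dist already ships must be provided,
     otherwise it gets bundled into the fat jar and clashes at runtime. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.17.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.17.2</version>
    <scope>provided</scope>
</dependency>
```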

Logs from main() not printed under Flink on YARN

INFO logs written in the main() method print fine in IDEA, but not when the job runs as Flink on YARN.
Testing shows that in standalone mode these logs land in client.log. In Flink on YARN mode the client no longer runs on the submitting machine once the job is handed to YARN, so the logs never appear in the submitting shell, and I could not find where they do end up.
Below is an attempt to integrate logging via log4j2:

  1. pom file
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.4.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.4.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.4.1</version>
    </dependency>

    <!-- slf4j-log4j12 must be excluded: it conflicts with log4j-slf4j-impl -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.0.0</version>
        <scope>provided</scope>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
  2. log4j2 configuration file
    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="INFO">
        <Appenders>
            <Console name="Console" target="SYSTEM_OUT">
            <!-- The console prints only messages at the given level and above (onMatch); everything else is rejected (onMismatch) -->
                <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
                <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n" />
            </Console>
    
        </Appenders>
        <Loggers>
            <Root level="info">
                <AppenderRef ref="Console" />
            </Root>
        </Loggers>
    </Configuration>
    See also: 记一次log4j不打印日志的踩坑记 (a write-up on a log4j-not-printing pitfall)

In actual testing, the log4j jars under flink/lib did not need to be touched. With this setup, standalone mode shows the logs in client.log; but once the job is submitted to YARN, all logs disappear, even the tm and jm logs.
I suspect log4j2 itself is the problem, since the original project used log4j.properties.

Note:

  1. In Flink's Application mode, all output from the entry class (whether logger output or println) ends up inside the JobManager container: logger output goes to jobmanager.log, println output goes to jobmanager.out.
  2. Flink's JobGraph is generated on the client side, and some entry-class logic runs on the client before the JobGraph is generated. In yarn-per-job mode the client runs locally, whereas in Application mode the client runs inside the JobManager on YARN. So the log printing that used to be handled by the client is now handled by the JobManager.
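A minimal entry class illustrating the two channels (java.util.logging is used here only to keep the sketch dependency-free; a real Flink job would log through SLF4J/Log4j as configured above):

```java
import java.util.logging.Logger;

public class EntryPointLogging {
    private static final Logger LOG = Logger.getLogger(EntryPointLogging.class.getName());

    public static void main(String[] args) {
        // In Application mode this surfaces in jobmanager.log
        // (routed by whatever logging backend the cluster configures).
        LOG.info("client-side logic running on the JobManager");
        // ...while stdout is captured into jobmanager.out.
        System.out.println("stdout is captured into jobmanager.out");
    }
}
```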

org.apache.kafka.common.KafkaException: Failed to construct kafka producer

The Flink program runs fine in yarn-per-job mode, but in yarn-application mode it fails with the following error:

2025-01-15 13:41:51,841 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: mysql-source -> Filter -> mysql-source -> kafka-sink: Writer -> kafka-sink: Committer (1/1)#0 (f67c466ac29575361b3119aa19e43f6b_d7957fe5bd3c3e0b4a50d2d3a7e3e1ac_0_0) switched from INITIALIZING to FAILED with failure cause:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:439) ~[ocean-1.0.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289) ~[ocean-1.0.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316) ~[ocean-1.0.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301) ~[ocean-1.0.jar:?]
	at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55) ~[ocean-1.0.jar:?]
	at org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:182) ~[ocean-1.0.jar:?]
	at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:111) ~[ocean-1.0.jar:?]
	at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:57) ~[ocean-1.0.jar:?]
	at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) [flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.2.jar:1.17.2]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.2.jar:1.17.2]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_411]
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:403) ~[ocean-1.0.jar:?]
	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434) ~[ocean-1.0.jar:?]
	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419) ~[ocean-1.0.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:365) ~[ocean-1.0.jar:?]
	... 21 more

The common explanation online is a jar conflict, and inspection did confirm that the kafka version pulled in by flink-cdc differs from the one pulled in by flink-connector-kafka.
I fixed the kafka conflict in the POM, but it made no difference; it feels like some other kafka-related jar on the cluster is interfering.
Setting classloader.resolve-order: parent-first does make the problem go away,
but that setting carries its own risks and is generally discouraged. See: 双亲委派模型与 Flink 的类加载策略 (on the parent-delegation model and Flink's classloading strategy).
The official Flink docs show a narrower alternative: classloader.parent-first-patterns.additional: "org.apache.kafka.";
but this too only treats the symptom; the real fix is still to find out which kafka jars on the cluster are affecting the program.
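For reference, the two flink-conf.yaml alternatives discussed above, side by side (they are alternatives, not meant to be combined; the narrower pattern is preferable):

```yaml
# Option 1: blunt instrument -- reverses child-first loading for everything.
# Works, but reintroduces the dependency conflicts child-first loading was
# designed to avoid, so it is generally discouraged.
classloader.resolve-order: parent-first

# Option 2: keep child-first overall, but force only Kafka classes to load
# from the parent (flink-dist / cluster) classpath.
classloader.parent-first-patterns.additional: "org.apache.kafka."
```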

Exception in thread "main" java.util.ServiceConfigurationError: java.net.spi.URLStreamHandlerProvider: Provider org.infinispan.commons.jdkspecific.ClasspathURLStreamHandlerProvider not found

When submitting the Flink program in local mode (java -cp), it runs fine in IDEA, but the packaged jar fails under java -cp with this error:

Exception in thread "main" java.util.ServiceConfigurationError: java.net.spi.URLStreamHandlerProvider: Provider org.infinispan.commons.jdkspecific.ClasspathURLStreamHandlerProvider not found
	at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:589)
	at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1212)
...
	at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:305)
	at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:177)
	at org.apache.flink.runtime.minicluster.MiniCluster.createDispatcherResourceManagerComponents(MiniCluster.java:552)
	at org.apache.flink.runtime.minicluster.MiniCluster.setupDispatcherResourceManagerComponents(MiniCluster.java:519)
	at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:463)
	at org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:77)
	at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:85)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2206)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2187)
	at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1471)
	at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1336)
	at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1322)
	at com.digiwin.ocean.CheckpointUtil.parseMysqlState(CheckpointUtil.java:94)
	at com.digiwin.ocean.CheckpointUtil.checkCheckpointStatus(CheckpointUtil.java:383)
	at com.digiwin.ocean.CheckpointUtil.main(CheckpointUtil.java:428)

Cause: a Java version mismatch. The POM pins Java to 1.8 and IDEA also runs it with 1.8, but the configured JAVA_HOME pointed at JDK 11.
Fix: switch the Java version so the runtime matches the 1.8 toolchain.

Reading an HDFS file with createLocalEnvironment fails with Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice1

Background
A Flink program started with java -jar reads Flink checkpoint data from HDFS. Deployed on a DataNode it reads HDFS fine, but on a non-DataNode host that only has a Hadoop client installed, it fails with:

Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://nameservice1/flink/checkpoint/ocean/realtime-collect-mysql/da5c280856b3a9e15573c43f6f0ffb5a/chk-5
        at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:196)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:526)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
        at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
        at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(AbstractFsCheckpointStorageAccess.java:257)
        ... 58 common frames omitted
Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice1
        at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:444)
        at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:132)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:355)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:289)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:163)
        at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:168)
        ... 62 common frames omitted
Caused by: java.net.UnknownHostException: nameservice1
        ... 68 common frames omitted

At first I assumed the Hadoop configuration simply was not being loaded, but after a series of attempts it turned out that StreamExecutionEnvironment.createLocalEnvironment() offers no way to pass in a Hadoop configuration. The real difference between the DataNode and the plain Hadoop client was that the DataNode had hadoop_home and hadoop_classpath configured.
Setting those two environment variables on the Hadoop client fixed the problem.
Fix: configure hadoop_home and hadoop_classpath.
Takeaway: from now on, whenever Flink touches anything Hadoop-related, these two settings are essential.
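The takeaway as a sketch (paths are assumptions; point them at your actual Hadoop client install). With these exported, Flink's HDFS filesystem factory can resolve nameservice1 from the client's hdfs-site.xml:

```shell
# Environment the non-DataNode host was missing; adjust paths to your install.
export HADOOP_HOME=/opt/hadoop
export HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
# `hadoop classpath` expands to all client jars plus the conf dirs; fall back
# to the conf dir alone when the hadoop binary is absent (demo machines only).
export HADOOP_CLASSPATH=$("$HADOOP_HOME"/bin/hadoop classpath 2>/dev/null || echo "$HADOOP_CONF_DIR")
echo "$HADOOP_CLASSPATH"
```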