This article mainly covers:
- Common Flink issues
Common Flink Issues
A job running on Flink on YARN has died, but the YARN web UI still shows it as running
- Cause: after Flink submits the job to YARN, YARN does not keep monitoring the actual status of the Flink job
- Fix: add the -d (detached) flag when submitting the job, for example: run -m yarn-cluster -d -p 2 -yn 2 -yjm 1024m -ytm 2048m -ynm xxxx -c xxxx
At 浔兴 this problem became 100% reproducible, but DolphinScheduler does not allow the -d flag, so here is a record of how we worked around it.
The JobManager log was being flooded with the following entries:
2024-07-27 17:10:25,702 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
2024-07-27 17:10:55,701 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
2024-07-27 17:11:25,681 ERROR org.apache.flink.runtime.rest.handler.job.JobClientHeartbeatHandler [] - Exception occurred in REST handler: Request did not match expected format JobClientHeartbeatRequestBody.
Our guess was that this was related: sending the client heartbeat was failing, so even though the job had already failed, the client never received the failure.
Through debugging we finally traced it to a serialization problem in the JobClientHeartbeatRequestBody class:
the constructor parameter of JobClientHeartbeatRequestBody needs to be annotated with @JsonProperty.
public class JobClientHeartbeatRequestBody implements RequestBody {
    private static final String EXPIRED_TIMESTAMP = "expiredTimestamp";

    @JsonProperty(EXPIRED_TIMESTAMP)
    private final long expiredTimestamp;

    @JsonCreator
    public JobClientHeartbeatRequestBody(
            // The fix: without @JsonProperty on this parameter, Jackson cannot bind the
            // request body and the REST handler rejects it with
            // "Request did not match expected format JobClientHeartbeatRequestBody".
            @JsonProperty(EXPIRED_TIMESTAMP) long expiredTimestamp) {
        this.expiredTimestamp = expiredTimestamp;
    }

    @JsonIgnore
    public long getExpiredTimestamp() {
        return expiredTimestamp;
    }
}
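As background, here is a minimal standalone Jackson sketch (plain com.fasterxml Jackson rather than Flink's shaded copy; class names are illustrative) showing why the annotation matters: a single-argument @JsonCreator without @JsonProperty is treated as a delegating creator, so a JSON object like {"expiredTimestamp":123} cannot be bound to it.

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonCreatorDemo {

    // Without @JsonProperty, a single-argument @JsonCreator is treated as a "delegating"
    // creator: Jackson tries to bind the whole JSON document to the long parameter,
    // so {"expiredTimestamp":123} cannot be parsed.
    static class Broken {
        final long expiredTimestamp;
        @JsonCreator
        Broken(long expiredTimestamp) { this.expiredTimestamp = expiredTimestamp; }
    }

    // With @JsonProperty the creator is "property-based" and the field binds correctly.
    static class Fixed {
        final long expiredTimestamp;
        @JsonCreator
        Fixed(@JsonProperty("expiredTimestamp") long expiredTimestamp) {
            this.expiredTimestamp = expiredTimestamp;
        }
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        String json = "{\"expiredTimestamp\":123}";

        System.out.println(mapper.readValue(json, Fixed.class).expiredTimestamp); // prints 123

        try {
            mapper.readValue(json, Broken.class);
        } catch (Exception e) {
            System.out.println("Broken variant fails: " + e.getClass().getSimpleName());
        }
    }
}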
After recompiling Flink, checkpoints kept failing during real-time capture from Oracle, with the following errors:
2024-08-02 14:53:41,454 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Filter -> Flat Map (1/1) (ed1ebdb51de67d14b5a760cf0977632a_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on container_1714272405081_0303_01_000002 @ V-NJ-2-204 (dataPort=13071).
java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer;
at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.serializeCheckpointBarrier(EventSerializer.java:260) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toSerializedEvent(EventSerializer.java:100) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBufferConsumer(EventSerializer.java:373) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.broadcastEvent(BufferWritingResultPartition.java:199) ~[flink-dist-1.17.2.jar:1.17.2]
...
2024-08-02 14:53:41,491 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - 2 tasks will be restarted to recover the failed task ed1ebdb51de67d14b5a760cf0977632a_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2024-08-02 14:53:41,491 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job realtime-collect-oracle-kafka Snapshot And Stream (e3b12d95ef0e9746813b713106908662) switched from state RUNNING to RESTARTING.
2024-08-02 14:53:41,494 WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job e3b12d95ef0e9746813b713106908662. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1977) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1608) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1174) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1146) ~[flink-dist-1.17.2.jar:1.17.2]
...
We finally traced it to a JDK mismatch: Flink had been compiled with JDK 11, while the Java used in our CDH cluster is 1.8, which is why checkpoints kept failing. Since JDK 9, ByteBuffer.flip() overrides Buffer.flip() with a covariant ByteBuffer return type, so bytecode compiled on JDK 11 references ByteBuffer.flip()Ljava/nio/ByteBuffer;, a signature that does not exist on a Java 8 runtime, hence the NoSuchMethodError.
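A minimal illustration of the pattern (class name is hypothetical): compiled on JDK 11, the commented-out line links against the JDK 9+ signature and breaks on Java 8, while routing the call through Buffer stays compatible on both.

import java.nio.Buffer;
import java.nio.ByteBuffer;

public class ByteBufferFlipCompat {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put((byte) 1);

        // Compiled with JDK 11 (without --release 8), this call is linked as
        // ByteBuffer.flip()Ljava/nio/ByteBuffer; and throws NoSuchMethodError on a Java 8 JVM:
        // buf.flip();

        // Portable form: bind the call against java.nio.Buffer instead.
        ((Buffer) buf).flip();
        System.out.println("remaining after flip: " + buf.remaining());
    }
}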
Flink prints garbled Chinese log output
With Flink on YARN, Chinese characters in the web UI's TaskManager logs and stdout are all garbled (shown as question marks).
Fix: add the following to flink-conf.yaml:
env.java.opts: "-Dfile.encoding=UTF-8"
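A trivial way to verify the setting took effect (class name is illustrative) is to log the JVM's default charset from the job; after the change both JobManager and TaskManager should report UTF-8.

import java.nio.charset.Charset;

public class CharsetCheck {
    public static void main(String[] args) {
        // With env.java.opts: "-Dfile.encoding=UTF-8" both values should say UTF-8;
        // a platform default such as US-ASCII or ISO-8859-1 explains the "????" output.
        System.out.println("file.encoding  = " + System.getProperty("file.encoding"));
        System.out.println("defaultCharset = " + Charset.defaultCharset());
        System.out.println("中文日志测试");
    }
}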
java.lang.Exception: Container released on a lost node
- Cause: when the YARN queue is under heavy pressure or a disk fills up, nodes allocated to Flink may be marked as lost, which kills the TaskManager.
- Fix: if a restart strategy is configured, the TaskManager will be restarted. If no restart strategy is configured but checkpointing is enabled, the default restart strategy restarts indefinitely. Note, however, that a TaskManager restart only succeeds if the JobManager is still alive; if the JobManager has died as well, even a successfully restarted TaskManager is useless. See the sketch after this list for configuring this from code.
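A minimal sketch (Flink 1.17 DataStream API; the class name and values are examples, not the production job) of enabling checkpointing and setting an explicit restart strategy:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enabling checkpointing alone activates the default restart strategy,
        // which effectively restarts the job indefinitely.
        env.enableCheckpointing(60_000);

        // Explicit restart strategy: at most 3 attempts, 10 seconds apart,
        // so a job that keeps losing containers eventually fails instead of flapping forever.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        env.fromElements(1, 2, 3).print();
        env.execute("restart-strategy-demo");
    }
}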
Production jobs keep dying
At first we blamed the previous issue, but with checkpointing or a restart strategy configured the job recovers from that on its own, so it was not the main cause. The real cause is that when YARN queue resources are tight, the JobManager itself can also be killed. Our production cluster had no Flink on YARN high availability configured, meaning a dead JobManager is never restarted, so Flink on YARN HA needs to be set up as follows (this configuration applies to the yarn-cluster mode of Flink on YARN):
- First, in yarn-site.xml, configure the maximum number of ApplicationMaster attempts:
<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>The maximum number of application master execution attempts.</description>
</property>
- Then configure flink-conf.yaml. The ZooKeeper settings must be added here; the official docs only ask for the attempts parameter in yarn-cluster mode, but without the ZooKeeper settings the TaskManagers die together with the JobManager and only the JobManager gets restarted:
# Number of JobManager attempts; must be smaller than yarn.resourcemanager.am.max-attempts
yarn.application-attempts: 3
# ZooKeeper configuration
high-availability: zookeeper
high-availability.zookeeper.quorum: xx.x.x.xxx:2181
# Where JobManager metadata is stored on the file system
high-availability.storageDir: hdfs:///flink/recovery
This configuration only reduces how often Flink jobs fail. For truly stable operation, a dedicated YARN queue should be set aside for real-time jobs so that they are not brought down by other workloads.
Submitting a job to YARN keeps logging Retrying connect to server: 0.0.0.0/0.0.0.0:8032
During a company service migration, DolphinScheduler was moved to a new node, after which Flink submissions kept producing the following log:
[TID: N/A] [INFO] 2024-10-24 09:08:35.435 - [taskAppId=TASK-32790-106785-178565]:[763] - -> 2024-10-24 09:08:34,995 INFO org.apache.hadoop.ipc.Client [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:36.436 - [taskAppId=TASK-32790-106785-178565]:[763] - -> 2024-10-24 09:08:35,995 INFO org.apache.hadoop.ipc.Client [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:37.437 - [taskAppId=TASK-32790-106785-178565]:[763] - -> 2024-10-24 09:08:36,996 INFO org.apache.hadoop.ipc.Client [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:38.438 - [taskAppId=TASK-32790-106785-178565]:[763] - -> 2024-10-24 09:08:37,996 INFO org.apache.hadoop.ipc.Client [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:39.439 - [taskAppId=TASK-32790-106785-178565]:[763] - -> 2024-10-24 09:08:38,997 INFO org.apache.hadoop.ipc.Client [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 4 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
[TID: N/A] [INFO] 2024-10-24 09:08:40.440 - [taskAppId=TASK-32790-106785-178565]:[763] - -> 2024-10-24 09:08:39,997 INFO org.apache.hadoop.ipc.Client [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 5 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
First, the new node is not part of the Hadoop cluster; it is a standalone client submitting jobs to the Hadoop cluster.
In that case a Hadoop client must be installed on the node, with configuration files identical to the source cluster's.
On inspection, the new node did have a Hadoop client installed and hadoop classpath was configured, but its yarn-site.xml differed from the source cluster's.
Fix: replace all the configuration files with the source cluster's versions. The yarn-site.xml entries worth checking first are shown below.
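For reference, retries against 0.0.0.0:8032 usually mean the client is falling back to the default ResourceManager address because yarn-site.xml does not name the real RM; these are the entries to compare first (rm-host is a placeholder, not our actual hostname):

<!-- yarn-site.xml on the client node; rm-host stands in for the real ResourceManager host -->
<property>
  <name>yarn.resourcemanager.hostname</name>
  <value>rm-host</value>
</property>
<property>
  <name>yarn.resourcemanager.address</name>
  <value>rm-host:8032</value>
</property>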
After submitting a job with Flink on YARN, the status stays at CREATE
After submission the job status stays at CREATE, and after running for a few minutes it fails with:
Could not allocate the required slot within slot request timeout.
Cause
First check whether the cluster is simply short on resources, then check whether the packaging is the problem.
In my case the cause was a mistake in my POM configuration:
the POM file was missing a few entries.
Fix
Add back the entries that were missing from the POM.
With Flink on YARN, INFO logs written in the main function are not printed
INFO logs written in the main function print normally in IDEA, but with Flink on YARN they do not appear.
Testing showed that in standalone mode these logs end up in client.log. With Flink on YARN, however, once the job has been submitted the client no longer runs on the local machine, so the logs do not appear in the submitting shell, and we never found where they actually go.
Below is an attempt to wire up the logging through a log4j2 configuration:
- pom file
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
  <version>1.7.25</version>
</dependency>
<dependency>
  <groupId>org.apache.logging.log4j</groupId>
  <artifactId>log4j-slf4j-impl</artifactId>
  <version>2.4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.logging.log4j</groupId>
  <artifactId>log4j-api</artifactId>
  <version>2.4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.logging.log4j</groupId>
  <artifactId>log4j-core</artifactId>
  <version>2.4.1</version>
</dependency>
<!-- slf4j-log4j12 must be excluded from hadoop-common, it conflicts -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>3.0.0</version>
  <scope>provided</scope>
  <exclusions>
    <exclusion>
      <artifactId>slf4j-log4j12</artifactId>
      <groupId>org.slf4j</groupId>
    </exclusion>
  </exclusions>
</dependency>
- log4j2 configuration file
See the post 记一次log4j不打印日志的踩坑记 (a note on log4j not printing logs) for background.
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <!-- The console only prints messages at this level and above (onMatch); everything else is rejected (onMismatch) -->
            <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>
In practice the log4j jars under flink/lib do not need to be touched. With this setup, logs are visible in client.log in standalone mode, but after submitting to YARN all logs vanish, including the TaskManager and JobManager logs.
We suspect log4j2 itself is the problem, since the original project used log4j.properties.
Note:
- In Flink's Application mode, all output from the entry class (whether logger output or println) appears inside the JobManager container: logger output goes to jobmanager.log, println output goes to jobmanager.out.
- Flink's JobGraph is built on the client, and some of the entry class's logic runs on the client before the JobGraph is built. In yarn-per-job mode the client runs locally; in Application mode the client role is taken over by the JobManager on YARN, so log output that used to be handled by the client is now handled by the JobManager. See the sketch after this list.
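A small sketch illustrating the notes above (class name is illustrative):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EntryPointLoggingDemo {
    private static final Logger LOG = LoggerFactory.getLogger(EntryPointLoggingDemo.class);

    public static void main(String[] args) {
        // yarn-per-job: main() runs in the client JVM, so these lines stay on the client side.
        // yarn-application: main() runs inside the JobManager container, so the logger line
        // ends up in jobmanager.log and the println line in jobmanager.out.
        LOG.info("log output from the entry point");
        System.out.println("println output from the entry point");
    }
}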
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
The Flink program runs fine in yarn-per-job mode, but in yarn-application mode it fails with the following error:
2025-01-15 13:41:51,841 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: mysql-source -> Filter -> mysql-source -> kafka-sink: Writer -> kafka-sink: Committer (1/1)#0 (f67c466ac29575361b3119aa19e43f6b_d7957fe5bd3c3e0b4a50d2d3a7e3e1ac_0_0) switched from INITIALIZING to FAILED with failure cause:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:439) ~[ocean-1.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289) ~[ocean-1.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316) ~[ocean-1.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301) ~[ocean-1.0.jar:?]
at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55) ~[ocean-1.0.jar:?]
at org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:182) ~[ocean-1.0.jar:?]
at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:111) ~[ocean-1.0.jar:?]
at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:57) ~[ocean-1.0.jar:?]
at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) [flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.2.jar:1.17.2]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_411]
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:403) ~[ocean-1.0.jar:?]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434) ~[ocean-1.0.jar:?]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419) ~[ocean-1.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:365) ~[ocean-1.0.jar:?]
... 21 more
The common explanation online is a jar conflict, and indeed the Kafka version pulled in by flink-cdc differed from the one pulled in by flink-connector-kafka.
I fixed that Kafka conflict in the pom, but it had no effect; it felt as though some other Kafka-related jar on the cluster was interfering.
Setting classloader.resolve-order: parent-first
does make the problem go away.
However, that setting has potential side effects and is generally not recommended; see 双亲委派模型与 Flink 的类加载策略 (the parent-delegation model and Flink's classloading strategy).
According to the Flink documentation, a narrower alternative is classloader.parent-first-patterns.additional: "org.apache.kafka."
But that, too, treats the symptom rather than the cause; we still need to find out which Kafka jars on the cluster are affecting the program. A small diagnostic sketch is shown below.
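One way to hunt for the offending jar is to log where each Kafka class is actually loaded from at runtime, for example temporarily from the sink's open() method or from a small main like this sketch (class name is illustrative):

public class KafkaClassOriginCheck {
    public static void main(String[] args) throws Exception {
        // "X is not an instance of Serializer" means the interface and the implementation
        // were loaded by different classloaders; comparing their code sources shows which
        // jars (user jar vs. flink/lib vs. the Hadoop classpath) are involved.
        printOrigin("org.apache.kafka.common.serialization.Serializer");
        printOrigin("org.apache.kafka.common.serialization.ByteArraySerializer");
    }

    private static void printOrigin(String className) throws ClassNotFoundException {
        Class<?> clazz = Class.forName(className);
        System.out.println(className + " -> "
                + clazz.getProtectionDomain().getCodeSource().getLocation()
                + " (loader: " + clazz.getClassLoader() + ")");
    }
}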
Exception in thread "main" java.util.ServiceConfigurationError: java.net.spi.URLStreamHandlerProvider: Provider org.infinispan.commons.jdkspecific.ClasspathURLStreamHandlerProvider not found
When running a Flink program in local mode via java -cp, it works fine in IDEA, but after packaging it and launching it with java -cp it fails with:
Exception in thread "main" java.util.ServiceConfigurationError: java.net.spi.URLStreamHandlerProvider: Provider org.infinispan.commons.jdkspecific.ClasspathURLStreamHandlerProvider not found
at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:589)
at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1212)
...
at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:305)
at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:177)
at org.apache.flink.runtime.minicluster.MiniCluster.createDispatcherResourceManagerComponents(MiniCluster.java:552)
at org.apache.flink.runtime.minicluster.MiniCluster.setupDispatcherResourceManagerComponents(MiniCluster.java:519)
at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:463)
at org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:77)
at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:85)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2206)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2187)
at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1471)
at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1336)
at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1322)
at com.digiwin.ocean.CheckpointUtil.parseMysqlState(CheckpointUtil.java:94)
at com.digiwin.ocean.CheckpointUtil.checkCheckpointStatus(CheckpointUtil.java:383)
at com.digiwin.ocean.CheckpointUtil.main(CheckpointUtil.java:428)
Cause: a Java version mismatch. The pom specifies Java 1.8 and IDEA also runs it with 1.8 by default, but the configured JAVA_HOME pointed at JDK 11.
Fix: switch to the matching Java version.
Using createLocalEnvironment to read an HDFS file fails with Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice1
Background
The Flink program is launched with java -jar to read data from a Flink checkpoint. Deployed on a DataNode it reads HDFS without problems, but on a non-DataNode machine that only has a Hadoop client installed it fails with:
Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://nameservice1/flink/checkpoint/ocean/realtime-collect-mysql/da5c280856b3a9e15573c43f6f0ffb5a/chk-5
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:196)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:526)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(AbstractFsCheckpointStorageAccess.java:257)
... 58 common frames omitted
Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice1
at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:444)
at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:132)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:355)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:289)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:163)
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:168)
... 62 common frames omitted
Caused by: java.net.UnknownHostException: nameservice1
... 68 common frames omitted
At first we assumed the Hadoop configuration simply was not being loaded, but after a series of attempts it turned out that StreamExecutionEnvironment.createLocalEnvironment()
offers no way to pass in the Hadoop configuration. The real difference between the DataNode and the Hadoop-client machine was that the DataNode had HADOOP_HOME and HADOOP_CLASSPATH set.
Setting those two environment variables on the Hadoop client fixed the problem.
Solution: configure HADOOP_HOME and HADOOP_CLASSPATH.
Takeaway: from now on, whenever Flink needs to touch anything Hadoop-related, these two environment variables are indispensable. A small verification sketch follows.
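A quick check (sketch; the class name is illustrative) for whether the HDFS HA configuration that defines nameservice1 is visible to the JVM. If the keys below come back null, hdfs-site.xml and core-site.xml are not on the classpath, which is exactly what setting HADOOP_CLASSPATH fixes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class NameserviceCheck {
    public static void main(String[] args) {
        // HdfsConfiguration loads core-site.xml and hdfs-site.xml from the classpath.
        Configuration conf = new HdfsConfiguration();
        System.out.println("fs.defaultFS                  = " + conf.get("fs.defaultFS"));
        System.out.println("dfs.nameservices              = " + conf.get("dfs.nameservices"));
        System.out.println("dfs.ha.namenodes.nameservice1 = " + conf.get("dfs.ha.namenodes.nameservice1"));
    }
}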