This article covers:
- Writing a custom MetricReporter for Flink
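What Are Metrics
While a Flink job is running, users usually want to know some basic indicators of how it is doing, such as throughput, memory and CPU usage, and checkpoint stability. Flink metrics make all of these easy to obtain, so the job no longer runs as a black box. By analyzing these metrics you can tune the job's resources, locate problems, and monitor the job.
Flink has its own implementation of metric monitoring and ships with a fixed set of built-in metrics, covering system-level indicators such as CPU, memory, and IO as well as runtime indicators of individual tasks. There are four metric types:
- Gauge: the simplest metric; it just returns a value, for example the current number of elements in a queue;
- Counter: a counter; in some cases it is more efficient than a Gauge, for example when an AtomicLong variable is used to track the length of a queue;
- Meter: a measure of throughput, that is, the rate at which a series of events occurs, for example TPS;
- Histogram: statistics over measured values, such as the maximum, minimum, mean, and distribution.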
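To make the difference between the four types concrete, here is a minimal plain-Java sketch. It deliberately does not use Flink's own metric classes: `MetricKindsSketch`, `SimpleCounter`, and the helper methods are hypothetical stand-ins that only mirror the idea behind each type.

```java
import java.util.ArrayDeque;
import java.util.LongSummaryStatistics;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical stand-ins for the four metric kinds; these are NOT Flink's classes.
class MetricKindsSketch {

    // Gauge: computes its value on demand, here the current queue size.
    static Supplier<Integer> queueSizeGauge(Queue<?> queue) {
        return queue::size;
    }

    // Counter: a number that is incremented or decremented as events happen.
    static final class SimpleCounter {
        private final AtomicLong count = new AtomicLong();

        void inc() { count.incrementAndGet(); }

        void dec() { count.decrementAndGet(); }

        long getCount() { return count.get(); }
    }

    // Meter: events per second over an observed time span.
    static double ratePerSecond(long events, long elapsedMillis) {
        return events * 1000.0 / elapsedMillis;
    }

    // Histogram: summary statistics (min, max, mean, count) over recorded values.
    static LongSummaryStatistics summarize(long... values) {
        LongSummaryStatistics stats = new LongSummaryStatistics();
        for (long value : values) {
            stats.accept(value);
        }
        return stats;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        SimpleCounter counter = new SimpleCounter();
        Supplier<Integer> gauge = queueSizeGauge(queue);

        // Track the queue length in two ways: by asking the queue (gauge)
        // and by counting every add/poll (counter).
        queue.add("a");
        counter.inc();
        queue.add("b");
        counter.inc();
        queue.poll();
        counter.dec();

        System.out.println("gauge=" + gauge.get() + " counter=" + counter.getCount());
        System.out.println("rate=" + ratePerSecond(500, 2000) + " events/s");
        System.out.println("histogram=" + summarize(3, 9, 6));
    }
}
```

The gauge asks the queue for its size every time it is read, while the counter only touches an `AtomicLong`, which is the efficiency argument made in the Counter bullet above.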
Metric Reporter
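There are three ways to obtain metrics:
- First, they can be viewed in the Web UI;
- Second, they can be fetched through the RESTful API, which is convenient for programs: automation scripts, automated operations, and tests can simply parse the JSON it returns;
- Finally, they can be obtained through a Metric Reporter, which is the mechanism mainly used for monitoring.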
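For the RESTful route, a small client could look like the sketch below. It assumes the job-level endpoint `/jobs/<jobId>/metrics` with a comma-separated `get` query parameter, and a REST address such as `http://localhost:8081`. The class name `RestMetricsSketch` and the metric names `uptime` and `numRestarts` are only illustrative choices.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for illustration; the class and method names are not part of Flink.
class RestMetricsSketch {

    // Builds <restBase>/jobs/<jobId>/metrics, optionally with ?get=name1,name2.
    static URI jobMetricsUri(String restBase, String jobId, String... metricNames) {
        String query = metricNames.length == 0 ? "" : "?get=" + String.join(",", metricNames);
        return URI.create(restBase + "/jobs/" + jobId + "/metrics" + query);
    }

    // Sends a GET request and returns the raw JSON response body.
    static String fetch(URI uri) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: RestMetricsSketch <rest-base-url> <job-id>");
            return;
        }
        // The metric names below are examples; adjust them to what the job exposes.
        URI uri = jobMetricsUri(args[0], args[1], "uptime", "numRestarts");
        System.out.println(uri);
        System.out.println(fetch(uri));
    }
}
```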
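Flink supports many external monitoring systems out of the box: JMX (a technology that ships with Java, so not strictly third-party), Graphite, InfluxDB, Prometheus, StatsD, Datadog, Slf4j (writes the metrics straight to the log), and so on. You can also write your own reporter by implementing the org.apache.flink.metrics.reporter.MetricReporter interface. To send reports periodically, implement the Scheduled interface as well.
Developers can implement their own reporter to export metric data to other systems:
- Implement the open, close, notifyOfAddedMetric, and notifyOfRemovedMetric methods of the MetricReporter interface.
- Implement the report method of Scheduled, which marks the reporter as one that must be invoked periodically; the logic that writes to the external system goes into this method.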
Custom Metric Reporter
MetricReporter is the interface used to expose metric measurements to the outside. Because MetricReporter implementations are instantiated through reflection, an implementing class needs a public no-argument constructor. The interface is defined as follows:
public interface MetricReporter {
    void open(MetricConfig config);
    void close();
    void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}
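- open: because implementations are created through reflection with the no-argument constructor, all initialization work belongs here; the method is called right after instantiation;
- close: releases resources and performs other cleanup when the reporter is shut down;
- notifyOfAddedMetric: called to notify the MetricReporter when a new Metric is registered;
- notifyOfRemovedMetric: called to notify the MetricReporter when a Metric is removed.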
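The lifecycle described above can be simulated in plain Java. In the sketch below, `SketchReporter` is a simplified, hypothetical stand-in for the real interface (it drops the MetricGroup argument and uses `Properties` instead of MetricConfig), and `ReporterLifecycleSketch.instantiate` only imitates what the framework side might do. It shows why a no-argument constructor is required and why initialization has to wait until open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class ReporterLifecycleSketch {

    // Mimics the framework side: create the reporter reflectively, then call open().
    static SketchReporter instantiate(String className, Properties config) {
        try {
            SketchReporter reporter =
                    (SketchReporter) Class.forName(className).getDeclaredConstructor().newInstance();
            reporter.open(config);
            return reporter;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("url", "http://localhost:5000/process_json");

        RecordingReporter reporter =
                (RecordingReporter) instantiate(RecordingReporter.class.getName(), config);
        reporter.notifyOfAddedMetric(new Object(), "numRecordsIn");
        reporter.notifyOfRemovedMetric(new Object(), "numRecordsIn");
        reporter.close();

        reporter.events.forEach(System.out::println);
    }
}

// Simplified, hypothetical stand-in for the reporter contract; NOT Flink's interface.
interface SketchReporter {
    void open(Properties config);

    void close();

    void notifyOfAddedMetric(Object metric, String metricName);

    void notifyOfRemovedMetric(Object metric, String metricName);
}

// A reporter that only records which callbacks were invoked, in order.
class RecordingReporter implements SketchReporter {
    final List<String> events = new ArrayList<>();

    // Reflection-based instantiation relies on this no-argument constructor.
    public RecordingReporter() {}

    @Override
    public void open(Properties config) {
        events.add("open url=" + config.getProperty("url"));
    }

    @Override
    public void close() {
        events.add("close");
    }

    @Override
    public void notifyOfAddedMetric(Object metric, String metricName) {
        events.add("added " + metricName);
    }

    @Override
    public void notifyOfRemovedMetric(Object metric, String metricName) {
        events.add("removed " + metricName);
    }
}
```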
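Flink's existing reporter classes are a good reference. The core consists of three classes:
- XXXReporter
- XXXReporterFactory
- XXXReporterOptions
Below, using InfluxdbReporter as a reference, we develop a WebServiceReporter that periodically sends Flink's metrics to a web service, where the server side processes and displays them:
- WebServiceReporter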
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.metrics.webservice;

import okhttp3.Authenticator;
import okhttp3.Credentials;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.Route;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.AbstractReporter;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ConcurrentModificationException;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** {@link MetricReporter} that exports {@link Metric Metrics} to a web service via HTTP POST. */
public class WebServiceReporter extends AbstractReporter implements Scheduled {

    private static final Logger LOG = LoggerFactory.getLogger(WebServiceReporter.class);

    public static final MediaType MEDIATYPE = MediaType.get("application/json; charset=utf-8");

    private OkHttpClient client;
    private String url;
    private String jobName;

    @Override
    public void open(MetricConfig config) {
        // url and jobName are mandatory; fail fast if either one is missing.
        url =
                checkNotNull(
                        WebServiceReporterOptions.getString(config, WebServiceReporterOptions.URL),
                        "Invalid configuration: url must be set");
        jobName =
                checkNotNull(
                        WebServiceReporterOptions.getString(config, WebServiceReporterOptions.JOBNAME),
                        "Invalid configuration: jobName must be set");
        int connectTimeout =
                WebServiceReporterOptions.getInteger(config, WebServiceReporterOptions.CONNECT_TIMEOUT);
        int writeTimeout =
                WebServiceReporterOptions.getInteger(config, WebServiceReporterOptions.WRITE_TIMEOUT);
        String userName =
                WebServiceReporterOptions.getString(config, WebServiceReporterOptions.USERNAME);
        String password =
                WebServiceReporterOptions.getString(config, WebServiceReporterOptions.PASSWORD);

        client =
                new OkHttpClient.Builder()
                        .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS)
                        .writeTimeout(writeTimeout, TimeUnit.MILLISECONDS)
                        .authenticator(
                                new Authenticator() {
                                    @Nullable
                                    @Override
                                    public Request authenticate(
                                            @Nullable Route route, Response response) {
                                        if (response.request().header("Authorization") != null) {
                                            // Credentials were already sent and rejected; give up.
                                            return null;
                                        }
                                        if (StringUtils.isNotEmpty(userName)
                                                || StringUtils.isNotEmpty(password)) {
                                            String credential = Credentials.basic(userName, password);
                                            return response.request()
                                                    .newBuilder()
                                                    .header("Authorization", credential)
                                                    .build();
                                        }
                                        return null;
                                    }
                                })
                        .build();
    }

    @Override
    public void close() {
        client.dispatcher().executorService().shutdown();
        client.connectionPool().evictAll();
    }

    @Override
    public void report() {
        Request request = buildReport();
        if (request == null) {
            // The report could not be built in this round; try again at the next interval.
            return;
        }
        try (Response response = client.newCall(request).execute()) {
            if (response.isSuccessful()) {
                String responseString = response.body().string();
                LOG.debug("=======> {}", responseString);
            } else {
                LOG.error("#####> report failed: {}", response.message());
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Builds the POST request carrying all current metrics, or returns null on failure. */
    @Nullable
    private Request buildReport() {
        String timestamp =
                LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        String reportJson;
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode metricMap = mapper.createObjectNode();
        metricMap.put("jobName", this.jobName);
        metricMap.put("timestamp", timestamp);
        ArrayNode jsonArray = mapper.createArrayNode();
        try {
            gauges.forEach(
                    (gauge, metricName) -> {
                        ObjectNode jsonObject = mapper.createObjectNode();
                        jsonObject.put("metricName", metricName);
                        // A gauge value can be of any type, so convert it to a JSON node.
                        jsonObject.set("metricValue", mapper.valueToTree(gauge.getValue()));
                        jsonObject.put("metricType", "Gauge");
                        jsonArray.add(jsonObject);
                    });
            counters.forEach(
                    (counter, metricName) -> {
                        ObjectNode jsonObject = mapper.createObjectNode();
                        jsonObject.put("metricName", metricName);
                        jsonObject.put("metricValue", counter.getCount());
                        jsonObject.put("metricType", "Counter");
                        jsonArray.add(jsonObject);
                    });
            // For histograms and meters only the event count is reported.
            histograms.forEach(
                    (histogram, metricName) -> {
                        ObjectNode jsonObject = mapper.createObjectNode();
                        jsonObject.put("metricName", metricName);
                        jsonObject.put("metricValue", histogram.getCount());
                        jsonObject.put("metricType", "Histogram");
                        jsonArray.add(jsonObject);
                    });
            meters.forEach(
                    (meter, metricName) -> {
                        ObjectNode jsonObject = mapper.createObjectNode();
                        jsonObject.put("metricName", metricName);
                        jsonObject.put("metricValue", meter.getCount());
                        jsonObject.put("metricType", "Meter");
                        jsonArray.add(jsonObject);
                    });
            metricMap.set("metrics", jsonArray);
            reportJson = mapper.writeValueAsString(metricMap);
        } catch (ConcurrentModificationException
                | NoSuchElementException
                | JsonProcessingException e) {
            // ignore - may happen when metrics are concurrently added or removed
            // report next time
            return null;
        }
        RequestBody body = RequestBody.create(MEDIATYPE, reportJson);
        return new Request.Builder().url(url).post(body).build();
    }

    @Override
    public String filterCharacters(String input) {
        return input;
    }
}
- WebServiceReporterFactory
/* ASF license header, identical to the one in WebServiceReporter */

package org.apache.flink.metrics.webservice;

import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;

import java.util.Properties;

/** {@link MetricReporterFactory} for {@link WebServiceReporter}. */
public class WebServiceReporterFactory implements MetricReporterFactory {

    @Override
    public MetricReporter createMetricReporter(Properties properties) {
        return new WebServiceReporter();
    }
}
- WebServiceReporterOptions
/* ASF license header, identical to the one in WebServiceReporter */

package org.apache.flink.metrics.webservice;

import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.metrics.MetricConfig;

/** Config options for {@link WebServiceReporter}. */
@Documentation.SuffixOption(ConfigConstants.METRICS_REPORTER_PREFIX + "webservice")
public class WebServiceReporterOptions {

    public static final ConfigOption<String> URL =
            ConfigOptions.key("url")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("WebService URL");

    public static final ConfigOption<String> USERNAME =
            ConfigOptions.key("username")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("(optional) WebService username used for authentication");

    public static final ConfigOption<String> PASSWORD =
            ConfigOptions.key("password")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "(optional) WebService username's password used for authentication");

    // Required: WebServiceReporter#open rejects a missing jobName.
    public static final ConfigOption<String> JOBNAME =
            ConfigOptions.key("jobName")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Flink job name attached to every report");

    public static final ConfigOption<Integer> CONNECT_TIMEOUT =
            ConfigOptions.key("connectTimeout")
                    .intType()
                    .defaultValue(3000)
                    .withDescription("(optional) the WebService connect timeout for metrics");

    public static final ConfigOption<Integer> WRITE_TIMEOUT =
            ConfigOptions.key("writeTimeout")
                    .intType()
                    .defaultValue(3000)
                    .withDescription("(optional) the WebService write timeout for metrics");

    static String getString(MetricConfig config, ConfigOption<String> key) {
        return config.getString(key.key(), key.defaultValue());
    }

    static int getInteger(MetricConfig config, ConfigOption<Integer> key) {
        return config.getInteger(key.key(), key.defaultValue());
    }
}
Note that the content of resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory must be changed so that it lists the new factory:
# (ASF license header, same text as in the Java sources)
org.apache.flink.metrics.webservice.WebServiceReporterFactory
At this point the code is complete. What remains is to recompile the Flink source. If you do not want to recompile all of Flink, you can pull the flink-metrics module out and build it on its own, but you then have to add the parent module's dependencies yourself.
Finally, configure WebServiceReporter in flink-conf.yaml:
metrics.reporter.webservice.factory.class: org.apache.flink.metrics.webservice.WebServiceReporterFactory
metrics.reporter.webservice.interval: 10 SECONDS
metrics.reporter.webservice.url: http://172.16.2.205:5000/process_json
metrics.reporter.webservice.jobName: realtime-collect-oracle
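The keys above all share the prefix `metrics.reporter.webservice.`, while WebServiceReporterOptions reads plain keys such as `url` and `jobName`. The sketch below illustrates that mapping under the assumption that the reporter receives its settings with the `metrics.reporter.<name>.` prefix removed. `ReporterConfigSketch` is a hypothetical helper written for this article, not Flink's actual configuration code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Illustration only; this is not Flink's actual configuration code.
class ReporterConfigSketch {

    // Collects all "metrics.reporter.<name>.*" entries and strips that prefix.
    static Properties reporterProperties(Map<String, String> flinkConf, String reporterName) {
        String prefix = "metrics.reporter." + reporterName + ".";
        Properties props = new Properties();
        for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                props.setProperty(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConf = new LinkedHashMap<>();
        flinkConf.put(
                "metrics.reporter.webservice.factory.class",
                "org.apache.flink.metrics.webservice.WebServiceReporterFactory");
        flinkConf.put("metrics.reporter.webservice.interval", "10 SECONDS");
        flinkConf.put("metrics.reporter.webservice.url", "http://172.16.2.205:5000/process_json");
        flinkConf.put("metrics.reporter.webservice.jobName", "realtime-collect-oracle");
        flinkConf.put("taskmanager.numberOfTaskSlots", "1"); // unrelated key, ignored

        Properties props = reporterProperties(flinkConf, "webservice");
        System.out.println("url      = " + props.getProperty("url"));
        System.out.println("jobName  = " + props.getProperty("jobName"));
        System.out.println("interval = " + props.getProperty("interval"));
    }
}
```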
The same configuration can also be put into the job submission script:
bin/flink run -m yarn-cluster -ys 1 -ynm ${jobName} -yjm 1G -ytm 2G -sae -yqu default -p 1 \
-Dmetrics.reporter.webservice.factory.class="org.apache.flink.metrics.webservice.WebServiceReporterFactory" \
-Dmetrics.reporter.webservice.interval="10 SECONDS" \
-Dmetrics.reporter.webservice.url="http://172.16.2.205:5000/process_json" \
-Dmetrics.reporter.webservice.jobName="realtime-collect-oracle" \
-c com.digiwin.ocean.OceanStarter ocean-1.0.jar \
-shost ${shost} \
-sport ${sport} \
-sdb ${sdb} \
-stbn ${stbn} \
-suser ${suser} \
-spsw ${spsw} \
-schema ${schema} \
-dbType ${dbType} \
-sinkType ${sinkType} \
-thost ${thost} \
-tport ${tport} \
-loadPort ${loadPort} \
-tdb ${tdb} \
-tuser ${tuser} \
-isFromCK ${isFromCK} \
-jobName ${jobName} \
-tpsw ${tpsw} \
-ckType ${ckType} \
-env ${env}
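In actual testing, this approach did not take effect.
I did not want to modify flink-conf.yaml either, because any change there applies globally, which is clearly not appropriate. In the end I passed the settings in code: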
// Requires org.apache.flink.configuration.Configuration and
// org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
Configuration conf = new Configuration();
conf.setString("metrics.reporter.webservice.factory.class", "org.apache.flink.metrics.webservice.WebServiceReporterFactory");
conf.setString("metrics.reporter.webservice.interval", "10 SECONDS");
conf.setString("metrics.reporter.webservice.url", "http://172.16.2.205:5000/process_json");
conf.setString("metrics.reporter.webservice.jobName", "realtime-collect-oracle");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
With this metric reporter, only jobs running on YARN were able to report; standalone mode did not seem to work. This still needs further testing.
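When testing in different deployment modes, it can help to point the reporter's url at a local stub that simply records what it receives. The following is a minimal sketch using only JDK classes. `LocalReceiverSketch` is a hypothetical test helper, the `/process_json` path is taken from the configuration above, and the sample payload uses made-up values in the shape that buildReport() produces (jobName, timestamp, and a metrics array of metricName, metricValue, and metricType).

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

// Local test stub, for illustration only; not part of Flink or of the reporter above.
class LocalReceiverSketch {

    // Body of the most recent POST, kept so that callers can inspect it.
    static final AtomicReference<String> lastBody = new AtomicReference<>();

    // Starts an HTTP server on 127.0.0.1 that accepts POSTs on /process_json.
    static HttpServer start(int port) {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
            server.createContext("/process_json", exchange -> {
                byte[] body = exchange.getRequestBody().readAllBytes();
                lastBody.set(new String(body, StandardCharsets.UTF_8));
                byte[] reply = "ok".getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, reply.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(reply);
                }
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Starts the stub on a free port, posts the JSON once, and returns the HTTP status.
    static int postOnce(String json) {
        HttpServer server = start(0);
        try {
            URI uri = URI.create(
                    "http://127.0.0.1:" + server.getAddress().getPort() + "/process_json");
            HttpRequest request = HttpRequest.newBuilder(uri)
                    .header("Content-Type", "application/json; charset=utf-8")
                    .POST(HttpRequest.BodyPublishers.ofString(json))
                    .build();
            return HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString())
                    .statusCode();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) {
        // Made-up sample values in the shape produced by buildReport().
        String json = "{\"jobName\":\"realtime-collect-oracle\","
                + "\"timestamp\":\"2024-01-01 00:00:00\","
                + "\"metrics\":[{\"metricName\":\"numRecordsIn\","
                + "\"metricValue\":42,\"metricType\":\"Counter\"}]}";
        System.out.println("status   = " + postOnce(json));
        System.out.println("received = " + lastBody.get());
    }
}
```

To use it against a real job, `start` could be called with a fixed port and left running, with metrics.reporter.webservice.url pointing at that address.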