
Flink Custom MetricReporter

This article mainly covers:

  • Flink custom MetricReporter

What Are Metrics

While a Flink job is running, users usually want to know its basic runtime indicators, such as throughput, memory and CPU usage, checkpoint stability, and so on. Flink metrics make these indicators easy to obtain, so the job no longer runs as a black box; by analyzing them you can better tune the job's resources, locate problems, and monitor the job.
Flink has its own implementation for collecting metrics, and the system itself exposes a set of built-in metrics, including system-level indicators such as CPU, memory and IO, as well as metrics for each running task. There are four metric types:

  1. Gauge: the simplest metric; it just returns a value on demand, for example the current number of elements in a queue;
  2. Counter: a counter; in some cases more efficient than a Gauge, for example tracking a queue's length with an AtomicLong;
  3. Meter: a throughput metric, i.e. the rate at which a series of events occurs, such as TPS;
  4. Histogram: statistics over measured values, such as max, min, mean and the value distribution (a registration sketch follows this list).
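
For illustration, here is a quick sketch (not from the original article; the class and metric names are made up) of how these metric types are registered inside a user function through the runtime metric group:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

public class MetricDemoMapper extends RichMapFunction<String, String> {

    private transient Counter eventCounter;
    private transient Meter eventRate;
    private transient long lastLength;

    @Override
    public void open(Configuration parameters) {
        // Counter: total number of processed records
        eventCounter = getRuntimeContext().getMetricGroup().counter("demoEventCount");
        // Meter: per-second rate, derived from the counter above by MeterView
        eventRate = getRuntimeContext().getMetricGroup().meter("demoEventRate", new MeterView(eventCounter));
        // Gauge: returns the latest observed value each time it is read
        getRuntimeContext().getMetricGroup().gauge("demoLastLength", (Gauge<Long>) () -> lastLength);
        // Histogram: requires a concrete implementation (e.g. a Dropwizard wrapper), omitted here
    }

    @Override
    public String map(String value) {
        eventCounter.inc();      // the MeterView computes its rate from this counter
        lastLength = value.length();
        return value;
    }
}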

Metric Reporter

There are three ways to access metrics:

  1. View them in the Flink Web UI;
  2. Query them through the RESTful API, which is convenient for programs such as automation scripts, automated operations and tests, since they can parse the returned JSON (for example, GET /jobs/<job-id>/metrics);
  3. Export them through a Metric Reporter; this is the mechanism mainly used for monitoring.

Flink ships with support for many external monitoring systems: JMX (built into Java, not strictly a third-party system), Graphite, InfluxDB, Prometheus, StatsD, Datadog, Slf4j (writes straight into the logs), and so on. You can also write your own reporter by implementing the org.apache.flink.metrics.reporter.MetricReporter interface; if the reporter should emit reports periodically, additionally implement the Scheduled interface.

Developers can implement their own reporter to export metrics to other systems.

  1. Implement the open, close, notifyOfAddedMetric and notifyOfRemovedMetric methods declared by MetricReporter.
  2. Implement the report method of Scheduled, which indicates that the reporter should be invoked periodically; the logic that writes to the external system goes there.

Custom Metric Reporter

MetricReporter is the interface through which metric results are exposed to the outside. Because MetricReporter implementations are instantiated via reflection, every implementation must provide a public, no-argument constructor. The interface is defined as follows:

public interface MetricReporter {
   void open(MetricConfig config);
   void close();
   void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
   void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}
  • open – since implementations are instantiated via reflection through a no-argument constructor, all initialization work is done here; the method is called right after instantiation;
  • close – performs cleanup, such as releasing resources, when the reporter is shut down;
  • notifyOfAddedMetric – called to notify the MetricReporter whenever a new metric is registered;
  • notifyOfRemovedMetric – called to notify the MetricReporter whenever a metric is removed (see the sketch after this list).
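
To make the lifecycle concrete, here is a minimal sketch of the pattern (a hypothetical LoggingReporter, not one of Flink's built-in reporters): registered metrics are kept in a map and dumped to the log on every scheduled report.

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LoggingReporter implements MetricReporter, Scheduled {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingReporter.class);

    // metric -> fully qualified metric name
    private final Map<Metric, String> metrics = new ConcurrentHashMap<>();

    // a public no-argument constructor is required, because the reporter is created via reflection
    public LoggingReporter() {}

    @Override
    public void open(MetricConfig config) {
        // read reporter-specific options here, e.g. config.getString("someKey", "defaultValue")
    }

    @Override
    public void close() {
        // nothing to release in this sketch
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        metrics.put(metric, group.getMetricIdentifier(metricName));
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        metrics.remove(metric);
    }

    @Override
    public void report() {
        metrics.forEach((metric, name) -> {
            if (metric instanceof Counter) {
                LOG.info("{} = {}", name, ((Counter) metric).getCount());
            } else if (metric instanceof Gauge) {
                LOG.info("{} = {}", name, ((Gauge<?>) metric).getValue());
            }
            // Meter and Histogram handling omitted for brevity
        });
    }
}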

Flink's existing reporter classes are a good reference; each one is built around three core classes:

  1. XXXReporter
  2. XXXReporterFactory
  3. XXXReporterOptions

Below, using the InfluxdbReporter as a model, we develop a WebServiceReporter whose job is to periodically send Flink metrics to a web service, where the server side can process and display them:

  1. WebServiceReporter
    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.flink.metrics.webservice;
    
    import okhttp3.Authenticator;
    import okhttp3.Credentials;
    import okhttp3.MediaType;
    import okhttp3.OkHttpClient;
    import okhttp3.Request;
    import okhttp3.RequestBody;
    import okhttp3.Response;
    import okhttp3.Route;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.metrics.Metric;
    import org.apache.flink.metrics.MetricConfig;
    import org.apache.flink.metrics.reporter.AbstractReporter;
    import org.apache.flink.metrics.reporter.MetricReporter;
    import org.apache.flink.metrics.reporter.Scheduled;
    
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.annotation.Nullable;
    import java.io.IOException;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.ConcurrentModificationException;
    import java.util.NoSuchElementException;
    import java.util.concurrent.TimeUnit;
    import static org.apache.flink.util.Preconditions.checkNotNull;
    
    /** {@link MetricReporter} that exports {@link Metric Metrics} to an external web service. */
    public class WebServiceReporter extends AbstractReporter implements Scheduled {
        private static final Logger LOG = LoggerFactory.getLogger(WebServiceReporter.class);
        public static final MediaType MEDIATYPE = MediaType.get("application/json; charset=utf-8");
        private OkHttpClient client;
        private String url;
        private String jobName;
        @Override
        public void open(MetricConfig config) {
            url = checkNotNull(WebServiceReporterOptions.getString(config,WebServiceReporterOptions.URL),
                    "Invalid configuration. URL: " + WebServiceReporterOptions.getString(config,WebServiceReporterOptions.URL));
            int connectTimeout = WebServiceReporterOptions.getInteger(config, WebServiceReporterOptions.CONNECT_TIMEOUT);
            int writeTimeout = WebServiceReporterOptions.getInteger(config, WebServiceReporterOptions.WRITE_TIMEOUT);
            String userName = WebServiceReporterOptions.getString(config, WebServiceReporterOptions.USERNAME);
            String password = WebServiceReporterOptions.getString(config, WebServiceReporterOptions.PASSWORD);
            jobName = checkNotNull(WebServiceReporterOptions.getString(config, WebServiceReporterOptions.JOBNAME),
                    "Invalid configuration. JobName: " + WebServiceReporterOptions.getString(config, WebServiceReporterOptions.JOBNAME));
    
            client = new OkHttpClient.Builder()
                    .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS)
                    .writeTimeout(writeTimeout, TimeUnit.MILLISECONDS)
                    .authenticator(new Authenticator() {
                        @Nullable
                        @Override
                        public Request authenticate(
                                @Nullable Route route,
                                Response response) {
                            if (StringUtils.isNotEmpty(userName) || StringUtils.isNotEmpty(password)) {
                                String credential = Credentials.basic(userName, password);
                                return response.request().newBuilder()
                                        .header("Authorization", credential)
                                        .build();
                            }
                            return null;
                        }
                    }).build();
        }
    
        @Override
        public void close() {
            client.dispatcher().executorService().shutdown();
            client.connectionPool().evictAll();
        }
    
        @Override
        public void report() {
            Request request = buildReport();
            // buildReport() may return null if metrics changed concurrently; skip this round
            if (request == null) {
                return;
            }
            try (Response response = client.newCall(request).execute()) {
                if (response.isSuccessful()) {
                    String responseString = response.body().string();
                    LOG.debug("=======> {}" , responseString);
                } else {
                    LOG.error("#####> report failed: {}", response.message());
                }
            } catch (IOException e) {
                // a transient send failure should not break the reporter; log and retry on the next cycle
                LOG.error("Failed to send metrics report to {}", url, e);
            }
        }
    
        @Nullable
        private Request buildReport() {
            String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            String reportJson = "{}";
    
            ObjectMapper mapper = new ObjectMapper();
            ObjectNode metricMap = mapper.createObjectNode();
            metricMap.put("jobName", this.jobName);
            metricMap.put("timestamp", timestamp);
            ArrayNode jsonArray = mapper.createArrayNode();
            try {
                gauges.forEach((gauge, metricName) -> {
                    ObjectNode jsonObject = mapper.createObjectNode();
                    jsonObject.put("metricName", metricName);
                    jsonObject.put("metricValue", mapper.valueToTree(gauge.getValue()));
                    jsonObject.put("metricType", "Gauge");
                    jsonArray.add(jsonObject);
                });
                counters.forEach((counter, metricName) -> {
                    ObjectNode jsonObject = mapper.createObjectNode();
                    jsonObject.put("metricName", metricName);
                    jsonObject.put("metricValue", counter.getCount());
                    jsonObject.put("metricType", "Counter");
                    jsonArray.add(jsonObject);
                });
                histograms.forEach((histogram, metricName) -> {
                    ObjectNode jsonObject = mapper.createObjectNode();
                    jsonObject.put("metricName", metricName);
                    jsonObject.put("metricValue", histogram.getCount());
                    jsonObject.put("metricType", "Histogram");
                    jsonArray.add(jsonObject);
                });
    
                meters.forEach((meter, metricName) -> {
                    ObjectNode jsonObject = mapper.createObjectNode();
                    jsonObject.put("metricName", metricName);
                    jsonObject.put("metricValue", meter.getCount());
                    jsonObject.put("metricType", "Meter");
                    jsonArray.add(jsonObject);
                });
    
                metricMap.put("metrics", jsonArray);
                reportJson = mapper.writeValueAsString(metricMap);
            } catch (ConcurrentModificationException | NoSuchElementException | JsonProcessingException e) {
                // ignore - may happen when metrics are concurrently added or removed
                // report next time
                return null;
            }
    
            RequestBody body = RequestBody.create(MEDIATYPE, reportJson);
            Request request = new Request.Builder()
                    .url(url)
                    .post(body)
                    .build();
            return request;
        }
    
        @Override
        public String filterCharacters(String input) {
            return input;
        }
    }
  2. WebServiceReporterFactory
    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.flink.metrics.webservice;
    
    import org.apache.flink.metrics.reporter.MetricReporter;
    import org.apache.flink.metrics.reporter.MetricReporterFactory;
    
    import java.util.Properties;
    
    /** {@link MetricReporterFactory} for {@link WebServiceReporter}. */
    public class WebServiceReporterFactory implements MetricReporterFactory {
    
        @Override
        public MetricReporter createMetricReporter(Properties properties) {
            return new WebServiceReporter();
        }
    }
  3. WebServiceReporterOptions
    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.flink.metrics.webservice;
    
    import org.apache.flink.annotation.docs.Documentation;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.metrics.MetricConfig;
    
    
    /** Config options for {@link WebServiceReporter}. */
    @Documentation.SuffixOption(ConfigConstants.METRICS_REPORTER_PREFIX + "webservice")
    public class WebServiceReporterOptions {
    
        public static final ConfigOption<String> URL =
                ConfigOptions.key("url")
                        .stringType()
                        .noDefaultValue()
                        .withDescription("WebService URL");
    
        public static final ConfigOption<String> USERNAME =
                ConfigOptions.key("username")
                        .stringType()
                        .noDefaultValue()
                        .withDescription("(optional) WebService username used for authentication");
    
        public static final ConfigOption<String> PASSWORD =
                ConfigOptions.key("password")
                        .stringType()
                        .noDefaultValue()
                        .withDescription("(optional) WebService username's password used for authentication");
        public static final ConfigOption<String> JOBNAME =
                ConfigOptions.key("jobName")
                        .stringType()
                        .noDefaultValue()
                        .withDescription(
                                "(optional) WebService Flink JobName");
    
        public static final ConfigOption<Integer> CONNECT_TIMEOUT =
                ConfigOptions.key("connectTimeout")
                        .intType()
                        .defaultValue(3000)
                        .withDescription("(optional) the WebService connect timeout for metrics");
    
        public static final ConfigOption<Integer> WRITE_TIMEOUT =
                ConfigOptions.key("writeTimeout")
                        .intType()
                        .defaultValue(3000)
                        .withDescription("(optional) the WebService write timeout for metrics");
    
        static String getString(MetricConfig config, ConfigOption<String> key) {
            return config.getString(key.key(), key.defaultValue());
        }
    
        static int getInteger(MetricConfig config, ConfigOption<Integer> key) {
            return config.getInteger(key.key(), key.defaultValue());
        }
    }
    Note:
    You also need to update the contents of resources/META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory so that the new factory is registered:
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    org.apache.flink.metrics.webservice.WebServiceReporterFactory
    At this point the code is done. What remains is to rebuild the Flink source. If you do not want to recompile all of Flink, you can pull the flink-metrics module out into its own project, but then you have to add the dependencies that normally come from the parent POM yourself (for this reporter that means, roughly, flink-metrics-core, flink-core, flink-annotations, flink-shaded-jackson, commons-lang3 and okhttp).

Finally, the WebServiceReporter needs to be configured in flink-conf.yaml:

metrics.reporter.webservice.factory.class: org.apache.flink.metrics.webservice.WebServiceReporterFactory
metrics.reporter.webservice.interval: 10 SECONDS
metrics.reporter.webservice.url: http://172.16.2.205:5000/process_json
metrics.reporter.webservice.jobName: realtime-collect-oracle

These settings can also be put into the job submission script:

bin/flink run -m yarn-cluster -ys 1 -ynm ${jobName} -yjm 1G -ytm 2G -sae -yqu default -p 1 \
    -Dmetrics.reporter.webservice.factory.class="org.apache.flink.metrics.webservice.WebServiceReporterFactory" \
    -Dmetrics.reporter.webservice.interval="10 SECONDS" \
    -Dmetrics.reporter.webservice.url="http://172.16.2.205:5000/process_json" \
    -Dmetrics.reporter.webservice.jobName="realtime-collect-oracle" \
    -c com.digiwin.ocean.OceanStarter ocean-1.0.jar \
    -shost ${shost} \
    -sport ${sport} \
    -sdb ${sdb} \
    -stbn ${stbn} \
    -suser ${suser} \
    -spsw ${spsw} \
    -schema ${schema} \
    -dbType ${dbType} \
    -sinkType ${sinkType} \
    -thost ${thost} \
    -tport ${tport} \
    -loadPort ${loadPort} \
    -tdb ${tdb} \
    -tuser ${tuser} \
    -isFromCK ${isFromCK} \
    -jobName ${jobName} \
    -tpsw ${tpsw} \
    -ckType ${ckType} \
    -env ${env} 

In actual testing, this approach did not take effect (possibly because this Flink CLI version expects YARN dynamic properties to be passed with -yD rather than -D, though that was not verified here).
I also did not want to modify flink-conf.yaml, because changes there are global, which is clearly not appropriate for a single job. In the end the configuration was passed in code:

Configuration conf = new Configuration();
conf.setString("metrics.reporter.webservice.factory.class", "org.apache.flink.metrics.webservice.WebServiceReporterFactory");
conf.setString("metrics.reporter.webservice.interval", "10 SECONDS");
conf.setString("metrics.reporter.webservice.url", "http://172.16.2.205:5000/process_json");
conf.setString("metrics.reporter.webservice.jobName", "realtime-collect-oracle");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

In my tests, the metric reporter only reported metrics for jobs running on YARN; standalone mode apparently did not report anything. This still needs further testing.