0%

Flink使用案例(一)

本文主要包括:

  • 实时订单模型实现

实时订单模型实现

公司最近要重构订单模型,需求是,把订单模型从离线完全转为实时,但是,需要解决如下几点问题:

  • 底层的订单表分为3张表,最终的模型是把3个表关联成一张表
  • 底层三个表的数据不是同时产生的,时间跨度有大有小,导致时间窗口不好控制
  • 订单状态随时间变化,导致数据肯定会跨天,甚至有的订单状态变成完成状态需要30天以上
  • 某个时间点,可能只会有一个表的数据更新,就会导致在处理这条数据的时候,肯定关联不上另外两个表

为了解决如上问题,有如下几个思路解决:

  • 使用Flink的状态计算
  • 把订单的3个表先缓存下来,每条数据来之后,先去缓存重获取另外两个表的数据

考虑到,订单的生命周期太长,一个订单从产生到结束,时间跨度大部分在2周之内,但是在大促期间,有的甚至能跨好几个月,这时候用状态计算来保留订单的状态就有些不恰当了。
所以最终考虑使用Hbase作为缓存组件,先将3个订单表缓存到Hbase中

数据源样例:

数据源分为t_bdeal(fbdeal_id),t_deal(fdeal_id),t_trade(ftrade_id)表,其中,t_deal中含有fbdeal_id,t_trade表中含有fbdeal_id和fdeal_id
数据关系如下图:
数据关系图

所以设置Hbase 3个表的rowkey分别是每个表的主键,并在t_bdeal表中添加${fdeal_id}_${ftrade_id}的集合
数据样例如下:
bdeal缓存数据样例如下:
bdeal缓存数据样例
deal缓存数据样例如下:
bdeal缓存数据样例
trade缓存数据样例如下:
bdeal缓存数据样例

设计方案流程图如下:

实时订单模型设计方案

代码实现

这里只贴关键部分的代码。
数据流都在transform中实现,这里只贴transform的代码:

package com.haiziwang.streaming.executor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import com.haiziwang.streaming.common.base.AbstractStreamExcutor;
import com.haiziwang.streaming.common.base.AbstractStreamSink;
import com.haiziwang.streaming.common.util.ResourcesUtil;
import com.haiziwang.streaming.constant.UserTagConstant;
import com.haiziwang.streaming.sink.OmsOrderBdealSink;
import com.haiziwang.streaming.util.DateUtil;
import com.haiziwang.streaming.util.GdmFactOmsOrdersUtil;
import com.haiziwang.streaming.util.HBaseUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

import java.io.IOException;
import java.io.Serializable;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * Author:   gujc
 * Date:     2021/03/03 15:38
 * Description:
 * rowkey设计:
 * bdeal: Fbdeal_id
 * deal: Fdeal_id
 * trade:Ftrade_id
 * 1、Hbase的bdeal表里存deal_id和trade_id的集合(deal_id_set,trade_id_set)
 * 2、之后trade表和bdeal表数据更新,如果是新增,那么也会更新bdeal表数据
 * 3、可能需要在窗口里对数据根据update_time排序,取最新的数据
 * 4、需要考虑一下,如果两个流数据同时过来,应该先更新哪个的问题
 * 5、如果数据更新方式是delete,需要过滤掉吗?
 *
 *
 * 重要未做:
 *      订单表的预分区,你可以找杜鹏这边了解一下采集系统的做法
 *      需要考虑在查询hbase的同时,另外一个并发在更新同一条记录,这里暂时无解,可能需要用到锁
 *      3个表的每条记录都会更新一次kudu,
 */
public class GdmFactOmsOrdersExecutor extends AbstractStreamExcutor implements Serializable {
    /**
     * OMS订单表
     */
    private static final String T_DEAL = "t_deal";
    private static final String T_BDEAL = "t_bdeal";
    private static final String T_TRADE = "t_trade";
    private static final String BDEAL_MATCHES = "t_bdeal_[0-9]+$";
    private static final String DEAL_MATCHES = "t_deal_[0-9]+$";
    private static final String TRADE_MATCHES = "t_trade_[0-9]+$";
    private static final String HBASE_TABLE_TRADE = "hb_app_onedata_oms_orders_t_trade_da";
    private static final String HBASE_TABLE_DEAL = "hb_app_onedata_oms_orders_t_deal_da";
    private static final String HBASE_TABLE_BDEAL = "hb_app_onedata_oms_orders_t_bdeal_da";



    @Override
    public AbstractStreamSink getFlinkSink() {
        return new OmsOrderBdealSink();
    }

    @Override
    public DataStream<Object> transform(DataStream<String> dataStream) {
        SingleOutputStreamOperator<Object> singleStream = dataStream.map(new MapFunction<String, Tuple3<String, String,String>>() {
            @Override
            public Tuple3<String, String,String> map(String value) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                String data = jsonObject.getString("data");
                String table = jsonObject.getString("table");
                String type = jsonObject.getString("type");

                return new Tuple3<String, String,String>(table, data,type);
            }
        }).filter(new FilterFunction<Tuple3<String, String,String>>() {
            @Override
            public boolean filter(Tuple3<String, String,String> value) throws Exception {
                String optType = value.f2.toLowerCase();
                if(null != value.f1 && !value.f1.isEmpty() && !"delete".equals(optType)){
                    return true;
                }

                return false;
            }
        }).flatMap(new FlatMapFunction<Tuple3<String, String,String>,Tuple2<String, JSONObject>>() {
            @Override
            public void flatMap(Tuple3<String, String,String> value, Collector<Tuple2<String, JSONObject>> out) throws Exception {
                List<JSONObject> jsonObjects = JSON.parseArray(value.f1, JSONObject.class);
                String tableName = "";
                if(Pattern.matches(BDEAL_MATCHES, value.f0)){
                    tableName = UserTagConstant.T_BDEAL;
                } else if(Pattern.matches(DEAL_MATCHES, value.f0)){
                    tableName = UserTagConstant.T_DEAL;
                } else if(Pattern.matches(TRADE_MATCHES, value.f0)){
                    tableName = UserTagConstant.T_TRADE;
                }
                if(!StringUtils.isNullOrWhitespaceOnly(tableName)){
                    for (JSONObject jsonObject : jsonObjects) {
                        out.collect(new Tuple2<>(tableName.toLowerCase(),jsonObject));
                    }
                }
            }
        })
        .flatMap(new RichFlatMapFunction<Tuple2<String, JSONObject>, Object>() {
            private HBaseUtil hBaseUtil;
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                hBaseUtil = HBaseUtil.getInstance(ResourcesUtil.getValue("conf", "zookeeper"));
            }

            private JSONArray mergeProp(JSONArray jsonArray, String prop) {
                if (jsonArray == null) {
                    jsonArray = new JSONArray();
                }
                if (!jsonArray.contains(prop)) {
                    jsonArray.add(prop);
                }
                return jsonArray;
            }


            /**
             * 更新Hbase表数据
             * 如果数据已经存在,判断update时间是否在历史数据之后,是则更新
             * 数据需要清洗后才能入Hbase
             * Hbase数据结构:
             * rowkey -> Map<orderData,updateTime>
             * 如果是trade数据,那么还需要再更新一下bdeal数据,为了存储tradeArray
             * @param data
             * @return updateRes,更新true,未更新false
             */
            private Boolean updateHbase(JSONObject data,String tableName) throws IOException, ParseException {
                String updateTime = "";
                String cacheTableName = "";
                String rowkey = "";
                String updateTimeCache = "";
                JSONArray tradeArray = null;
                Map<String, String> cacheMap = Maps.newHashMap();
                if(T_DEAL.equals(tableName)){
                    updateTime = data.getString("Flast_update_time");
                    cacheTableName = HBASE_TABLE_DEAL;
                    rowkey = data.getString("Fdeal_id");
                    cacheMap.put("ext_data",data.getString("Fdeal_ext_data"));
                } else if(T_BDEAL.equals(tableName)){
                    updateTime = data.getString("Fbdeal_update_time");
                    cacheTableName = HBASE_TABLE_BDEAL;
                    rowkey = data.getString("Fbdeal_id");
                    tradeArray = JSONArray.parseArray(data.getString("tradeArray"));
                } else if(T_TRADE.equals(tableName)){
                    updateTime = data.getString("Ftrade_update_time");
                    cacheTableName = HBASE_TABLE_TRADE;
                    rowkey = data.getString("Ftrade_id");

                    //更新fbdeal缓存的tradeArray
                    String bdealId = data.getString("Fbdeal_id");
                    String dealId = data.getString("Fdeal_id");
                    cacheMap.put("ext_data",data.getString("Ftrade_ext_data"));
                    Map<String, String> bdealDataMap = hBaseUtil.getRow(HBASE_TABLE_BDEAL,bdealId);
                    JSONArray tradeArrayTemp = null == bdealDataMap.get("tradeArray") ?
                            new JSONArray() : JSONArray.parseArray(bdealDataMap.get("tradeArray"));

                    tradeArrayTemp = mergeProp(tradeArrayTemp,dealId + "_" + rowkey);
                    if(bdealDataMap.size() == 0){
                        //如果bdeal还没有数据,不加updateTime,为了让bdeal数据来了以后,能够更新数据
                        bdealDataMap.put("Fbdeal_id",bdealId);
                        bdealDataMap.put("tradeArray",tradeArrayTemp.toJSONString());
                    } else {
                        //如果bdeal已经有数据
                        bdealDataMap.put("tradeArray",tradeArrayTemp.toJSONString());
                    }
                    hBaseUtil.insertOrderData(HBASE_TABLE_BDEAL, bdealId,bdealDataMap);
                } else {
                    return false;
                }

                updateTimeCache = hBaseUtil.getRow(cacheTableName,rowkey).get("updateTime");
                //如果更新时间大于缓存的时间,则更新数据,否则不更新
                if(StringUtils.isNullOrWhitespaceOnly(updateTimeCache) ||  DateUtil.dateDiffMilliSecond(updateTimeCache,updateTime) > 0){
                    if(null != tradeArray){
                        data.put("tradeArray",tradeArray.toJSONString());
                    }
                    cacheMap.put("orderData",data.toJSONString());
                    cacheMap.put("updateTime",updateTime);
                    hBaseUtil.insertOrderData(cacheTableName, rowkey, cacheMap);

                    return true;
                } else if (DateUtil.dateDiffMilliSecond(updateTimeCache,updateTime) == 0){
                    //如果这个数据已经在hbase里存在了,那么不更新hbase,但是需要处理后面的逻辑
                    return true;
                } else {
                    return false;
                }
            }


            /**
             * 主逻辑:
             * updateHbase返回值,更新了数据则继续,否则结束
             * 按照3个订单表的不同,处理逻辑不同:
             *
             * 1. 交易单执行逻辑(bdeal)
             * 1.1 根据tradeArray获取Hbase中trade的数据
             * 1.2 从trade表中获取Fdeal_id,以此获取Hbase中deal的数据
             * 1.3 拼接结果数据,并返回
             *
             * 2. 商品单数据更新处理逻辑(trade)
             * 2.1. 根据Fdeal_id获取Hbase中deal的数据
             * 2.2. 根据Fbdeal_id获取Hbase中bdeal中的数据,并拼接tradeArray,写入bdeal表
             * 2.3. 拼接结果数据,并返回
             *
             * 3. 包裹单数据更新处理逻辑(deal)
             * 3.1 根据Fbdeal_id获取Hbase中bdeal的数据,
             * 3.2 并根据bdeal表的tradeArray获取trade表数据
             * 3.3 拼接结果数据,并返回
             *
             * 需要解决一个问题:
             * 如果trade数据先进来,只会更新trade缓存。那么等到bdeal数据来之后,因为没有tradeArray,数据就没法处理了
             * 所以,在更新trade缓存的时候,也需要更新bdeal的缓存。
             * @param data
             */
            private ArrayList<JSONObject> handle(JSONObject data,String tableName) throws IOException, ParseException {
                ArrayList<JSONObject> resArr = new ArrayList();
                boolean flag = updateHbase(data,tableName);

                if(flag){
                    if(T_BDEAL.equals(tableName)){
                        String fbdealId = data.getString("Fbdeal_id");
                        //根据Fbdeal_id获取Hbase中bdeal的数据('data',jsonObject)
                        Map<String,String> bdealCacheData = hBaseUtil.getRow(HBASE_TABLE_BDEAL,fbdealId);
                        JSONObject bdealOrderCacheData = null;
                        JSONObject dealOrderCacheData = null;
                        JSONObject tradeOrderCacheData =  null;
                        try{
                            bdealOrderCacheData =  JSONObject.parseObject(bdealCacheData.get("orderData"));
                        } catch (Exception e){
                            System.out.println(bdealCacheData.get("orderData") + "=========== bdeal:bdeal:" + fbdealId);
                            System.exit(1);
                        }
                        JSONArray tradeArray = JSONArray.parseArray(bdealCacheData.get("tradeArray"));
                        //根据trade_id获取trade和deal表信息
                        if(null != tradeArray){
                            for(int i = 0,size = tradeArray.size(); i < size; i++){
                                String dealId = tradeArray.getString(i).split("_")[0];
                                String tradeId = tradeArray.getString(i).split("_")[1];
                                String orderDataStr = hBaseUtil.getRow(HBASE_TABLE_TRADE,tradeId).get("orderData");
                                String tradeExtData = hBaseUtil.getRow(HBASE_TABLE_TRADE,tradeId).getOrDefault("ext_data","{}");

                                try{
                                    tradeOrderCacheData =  JSONObject.parseObject(orderDataStr);
                                } catch (Exception e){
                                    System.out.println(orderDataStr + "=========== bdeal:trade:" + tradeId);
                                    System.exit(1);
                                }

                                String dealOrderStr = hBaseUtil.getRow(HBASE_TABLE_DEAL,dealId).get("orderData");
                                String dealExtData = hBaseUtil.getRow(HBASE_TABLE_DEAL,dealId).getOrDefault("ext_data","{}");

                                try{
                                    dealOrderCacheData = JSONObject.parseObject(dealOrderStr);
                                } catch (Exception e){
                                    System.out.println(dealOrderStr + "=========== bdeal:deal:" + dealId);
                                    System.exit(1);
                                }


                                if(null != dealOrderCacheData && null != tradeOrderCacheData){
                                    tradeOrderCacheData.put("Ftrade_ext_data",tradeExtData);
                                    dealOrderCacheData.put("Fdeal_ext_data",dealExtData);
                                    List<JSONObject> unionOrders = GdmFactOmsOrdersUtil.mergeResult(bdealOrderCacheData,dealOrderCacheData,tradeOrderCacheData);
                                    resArr.addAll(unionOrders);
                                }
                            }
                        }

                        return resArr;
                    } else if(T_DEAL.equals(tableName)){
                        String fdealId = data.getString("Fdeal_id");
                        String fbdealId = data.getString("Fbdeal_id");
                        //根据Fdeal_id获取Hbase中bdeal的数据('data',jsonObject)
                        Map<String,String> bdealCacheData = hBaseUtil.getRow(HBASE_TABLE_BDEAL,fbdealId);
                        JSONObject bdealOrderCacheData = null;
                        JSONObject dealOrderCacheData = null;
                        JSONObject tradeOrderCacheData = null;
                        try{
                            bdealOrderCacheData =  JSONObject.parseObject(bdealCacheData.get("orderData"));
                        } catch (Exception e){
                            System.out.println(bdealCacheData.get("orderData") + "=========== deal:bdeal:" + fbdealId);
                            System.exit(1);
                        }

                        /**
                         * 这里是可以省略一次查询Hbase的。之后做优化。
                         */
                        String dealOrderStr = hBaseUtil.getRow(HBASE_TABLE_DEAL,fdealId).get("orderData");
                        String dealExtData = hBaseUtil.getRow(HBASE_TABLE_DEAL,fdealId).getOrDefault("ext_data","{}");
                        try{
                            dealOrderCacheData =  JSONObject.parseObject(dealOrderStr);
                        } catch (Exception e){
                            System.out.println(dealOrderStr + "=========== deal:deal" + fdealId);
                            System.exit(1);
                        }
                        dealOrderCacheData.put("Fdeal_ext_data",dealExtData);
                        if(null != bdealOrderCacheData){
                            JSONArray tradeArray = JSONArray.parseArray(bdealCacheData.get("tradeArray"));
                            if(null != tradeArray){
                                //根据trade_id获取trade和deal表信息
                                for(int i = 0,size = tradeArray.size(); i < size; i++){
                                    String dealId = tradeArray.getString(i).split("_")[0];
                                    String tradeId = tradeArray.getString(i).split("_")[1];
                                    String tradeExtData = "{}";
                                    if(dealId.equals(fdealId)){
                                        String orderDataStr = hBaseUtil.getRow(HBASE_TABLE_TRADE,tradeId).get("orderData");
                                        tradeExtData = hBaseUtil.getRow(HBASE_TABLE_TRADE,tradeId).getOrDefault("ext_data","{}");

                                        try{
                                            tradeOrderCacheData =  JSONObject.parseObject(orderDataStr);
                                        } catch (Exception e){
                                            System.out.println(orderDataStr + "=========== deal:trade:" + tradeId);
                                            System.exit(1);
                                        }
                                    }

                                    if(null != tradeOrderCacheData){
                                        tradeOrderCacheData.put("Ftrade_ext_data",tradeExtData);
                                        List<JSONObject> unionOrders = GdmFactOmsOrdersUtil.mergeResult(bdealOrderCacheData,dealOrderCacheData,tradeOrderCacheData);
                                        resArr.addAll(unionOrders);
                                    }
                                }
                            }
                        }

                        return resArr;
                    } else if(T_TRADE.equals(tableName)){
                        String fdealId = data.getString("Fdeal_id");
                        String fbdealId = data.getString("Fbdeal_id");
                        String ftradeId = data.getString("Ftrade_id");

                        Map<String,String> bdealCacheData = hBaseUtil.getRow(HBASE_TABLE_BDEAL,fbdealId);
                        JSONObject bdealOrderCacheData = null ;
                        JSONObject dealOrderCacheData = null;
                        JSONObject tradeOrderCacheData = null;
                        try{
                            bdealOrderCacheData =  JSONObject.parseObject(bdealCacheData.get("orderData"));
                        } catch (Exception e){
                            System.out.println(bdealCacheData.get("orderData") + "=========== trade:bdeal:" + fbdealId);
                            System.exit(1);
                        }


                        String orderDataStr = hBaseUtil.getRow(HBASE_TABLE_DEAL,fdealId).get("orderData");
                        String dealExtData = hBaseUtil.getRow(HBASE_TABLE_DEAL,fdealId).getOrDefault("ext_data","{}");
                        try{
                            dealOrderCacheData = JSONObject.parseObject(orderDataStr);
                        } catch (Exception e){
                            System.out.println(orderDataStr + "=========== trade:deal:" + fdealId);
                            System.exit(1);
                        }


                        if(null != bdealOrderCacheData && null != dealOrderCacheData){
                            dealOrderCacheData.put("Fdeal_ext_data",dealExtData);
                            JSONArray tradeArray = JSONArray.parseArray(bdealCacheData.get("tradeArray"));
                            if(null == tradeArray){
                                tradeArray = new JSONArray();
                            }
                            tradeArray = mergeProp(tradeArray,fdealId + "_" + ftradeId);
                            bdealCacheData.put("tradeArray",tradeArray.toJSONString()); //tradeArray
                            hBaseUtil.insertOrderData(HBASE_TABLE_BDEAL, fbdealId, bdealCacheData);
                            //根据trade_id获取trade和deal表信息
                            String tradeOrderDataStr = hBaseUtil.getRow(HBASE_TABLE_TRADE,ftradeId).get("orderData");
                            String tradeExtData = hBaseUtil.getRow(HBASE_TABLE_TRADE,ftradeId).getOrDefault("ext_data","{}");

                            try{
                                tradeOrderCacheData = JSONObject.parseObject(tradeOrderDataStr);
                            } catch (Exception e){
                                System.out.println(tradeOrderDataStr + "=========== trade:trade:" + ftradeId);
                                System.exit(1);
                            }

                            tradeOrderCacheData.put("Ftrade_ext_data",tradeExtData);

                            List<JSONObject> unionOrders = GdmFactOmsOrdersUtil.mergeResult(bdealOrderCacheData,dealOrderCacheData,tradeOrderCacheData);
                            resArr.addAll(unionOrders);
                        }

                        return resArr;
                    }
                    return resArr;
                }
                return resArr;
            }

            @Override
            public void flatMap(Tuple2<String, JSONObject> value, Collector<Object> out) throws Exception {
                String tableName = value.f0;
                JSONObject data = value.f1;
                ArrayList<JSONObject> list = handle(data,tableName);
                if(list.size() > 0){
                    for(int i = 0,size = list.size(); i < size; i ++ ){
                        out.collect(list.get(i));
                    }
                }
            }
        });
        return singleStream;
    }
}

这个方案的好处是,任务挂掉以后,不用担心丢数据。重新消费即可

经过测试,此方案完全可行,但是这个方案有如下几点问题:

  • 3个表每条数据更新都会更新一次kudu,造成下游的Kudu压力有些大。
  • 在高并发的情况下,Hbase的压力不知道能否抗住,理论上应该没事,后期观察

虽然没有用到Flink的状态计算,但是,还是很想研究一下Flink的状态计算,想测试一个案例:
实现如下功能:

  • 3个kafka输入源,并且实现这3个数据源关联,模仿3个表join
  • 改变其中一个表的字段值,使用状态计算更新最终的结果
  • 增加大时间跨度大于1天,7天,30天。

Flink读取kafka多个topic遇到的问题:
如果要读取的Topic列表中,其中一个在Topic中没有数据,而你又基于Event Time提取Timestamp并且设置Watermark,
会导致整个Topic列表都没法基于时间窗口触发操作,解决方案:
先rebalance,然后再设置水位:

val monitorSampling = env
    .addSource(kafkaConsumer)
    .rebalance
    .assignTimestampsAndWatermarks(new MyWatermarkGenerator[MyRecord](Time.seconds(config.latencyDuration)))

Flink反压

反压(backpressure)是实时计算应用开发中,特别是流式计算中,十分常见的问题。反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而需要对上游进行限速。

反压的影响

反压并不会直接影响作业的可用性,它表明作业处于亚健康的状态,有潜在的性能瓶颈并可能导致更大的数据处理延迟。通常来说,对于一些对延迟要求不太高或者数据量比较小的应用来说,反压的影响可能并不明显,然而对于规模比较大的 Flink 作业来说反压可能会导致严重的问题。

这是因为 Flink 的 checkpoint 机制,反压还会影响到两项指标: checkpoint 时长和 state 大小。

  • 前者是因为 checkpoint barrier 是不会越过普通数据的,数据处理被阻塞也会导致 checkpoint barrier 流经整个数据管道的时长变长,因而 checkpoint 总体时间(End to End Duration)变长。
  • 后者是因为为保证 EOS(Exactly-Once-Semantics,准确一次),对于有两个以上输入管道的 Operator,checkpoint barrier 需要对齐(Alignment),接受到较快的输入管道的 barrier 后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的 barrier 也到达,这些被缓存的数据会被放到state 里面,导致 checkpoint 变大。

这两个影响对于生产环境的作业来说是十分危险的,因为 checkpoint 是保证数据一致性的关键,checkpoint 时间变长有可能导致 checkpoint 超时失败,而 state 大小同样可能拖慢 checkpoint 甚至导致 OOM (使用 Heap-based StateBackend)或者物理内存使用超出容器资源(使用 RocksDBStateBackend)的稳定性问题。

定位反压节点

要解决反压首先要做的是定位到造成反压的节点,这主要有两种办法:

  • 通过 Flink Web UI 自带的反压监控面板;
  • 通过 Flink Task Metrics。
    前者比较容易上手,适合简单分析,后者则提供了更加丰富的信息,适合用于监控系统。因为反压会向上游传导,这两种方式都要求我们从 Source 节点到 Sink 的逐一排查,直到找到造成反压的根源原因。下面分别介绍这两种办法。

反压监控面板

Flink Web UI 的反压监控提供了 SubTask 级别的反压监控,原理是通过周期性对 Task 线程的栈信息采样,得到线程被阻塞在请求 Buffer(意味着被下游队列阻塞)的频率来判断该节点是否处于反压状态。默认配置下,这个频率在 0.1 以下则为 OK,0.1 至 0.5 为 LOW,而超过 0.5 则为 HIGH。

如果处于反压状态,那么有两种可能性:

  • 该节点的发送速率跟不上它的产生数据速率。这一般会发生在一条输入多条输出的 Operator(比如 flatmap)。
  • 下游的节点接受速率较慢,通过反压机制限制了该节点的发送速率。
    如果是第一种状况,那么该节点则为反压的根源节点,它是从 Source Task 到 Sink Task 的第一个出现反压的节点。如果是第二种情况,则需要继续排查下游节点。

具体可以参考如何分析及处理 Flink 反压?
一文搞懂 Flink 网络流控与反压机制

FlinkCDC 采集mysql 写Hbase并从Hive内读取

java代码:

在hive里创建hbase外表,CDH集群原生支持Hive与Hbase打通,如果是自己搭建的集群,需要自己打通一下Hive与Hbase,这个网上很多,照着操作一下就好
在hive里创建外表,映射hbase表

CREATE EXTERNAL TABLE flume_mqtt(
rowkey string,
id int,
name string,
age int)
STORED BY 
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = 
":key,f1:id,f1:name,f1:age") 
TBLPROPERTIES ("hbase.table.name" = "flume_mqtt");

遇到的问题

  1. hbase-default.xml file seems to be for an older version of HBase (2.2.3), this version is 2.1.0-cdh6.2.0
    问题原因:因为${FLINK_HOME}/lib下放了一个flink-sql-connector-hbase-2.2_2.12-1.13.6.jar,与项目里的hbase依赖的版本冲突了,删除${FLINK_HOME}/lib下的文件