本文主要包括:
- 实时订单模型实现
实时订单模型实现
公司最近要重构订单模型,需求是,把订单模型从离线完全转为实时,但是,需要解决如下几点问题:
- 底层的订单表分为3张表,最终的模型是把3个表关联成一张表
- 底层三个表的数据不是同时产生的,时间跨度有大有小,导致时间窗口不好控制
- 订单状态随时间变化,导致数据肯定会跨天,甚至有的订单状态变成完成状态需要30天以上
- 某个时间点,可能只会有一个表的数据更新,就会导致在处理这条数据的时候,肯定关联不上另外两个表
为了解决如上问题,有如下几个思路解决:
- 使用Flink的状态计算
- 把订单的3个表先缓存下来,每条数据来之后,先去缓存重获取另外两个表的数据
考虑到,订单的生命周期太长,一个订单从产生到结束,时间跨度大部分在2周之内,但是在大促期间,有的甚至能跨好几个月,这时候用状态计算来保留订单的状态就有些不恰当了。
所以最终考虑使用Hbase作为缓存组件,先将3个订单表缓存到Hbase中
数据源样例:
数据源分为t_bdeal(fbdeal_id),t_deal(fdeal_id),t_trade(ftrade_id)表,其中,t_deal中含有fbdeal_id,t_trade表中含有fbdeal_id和fdeal_id
数据关系如下图:
所以设置Hbase
3个表的rowkey
分别是每个表的主键,并在t_bdeal表中添加${fdeal_id}_${ftrade_id}的集合
数据样例如下:
bdeal缓存数据样例如下:
deal缓存数据样例如下:
trade缓存数据样例如下:
设计方案流程图如下:
代码实现
这里只贴关键部分的代码。
数据流都在transform
中实现,这里只贴transform
的代码:
package com.haiziwang.streaming.executor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import com.haiziwang.streaming.common.base.AbstractStreamExcutor;
import com.haiziwang.streaming.common.base.AbstractStreamSink;
import com.haiziwang.streaming.common.util.ResourcesUtil;
import com.haiziwang.streaming.constant.UserTagConstant;
import com.haiziwang.streaming.sink.OmsOrderBdealSink;
import com.haiziwang.streaming.util.DateUtil;
import com.haiziwang.streaming.util.GdmFactOmsOrdersUtil;
import com.haiziwang.streaming.util.HBaseUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import java.io.IOException;
import java.io.Serializable;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
/**
* Author: gujc
* Date: 2021/03/03 15:38
* Description:
* rowkey设计:
* bdeal: Fbdeal_id
* deal: Fdeal_id
* trade:Ftrade_id
* 1、Hbase的bdeal表里存deal_id和trade_id的集合(deal_id_set,trade_id_set)
* 2、之后trade表和bdeal表数据更新,如果是新增,那么也会更新bdeal表数据
* 3、可能需要在窗口里对数据根据update_time排序,取最新的数据
* 4、需要考虑一下,如果两个流数据同时过来,应该先更新哪个的问题
* 5、如果数据更新方式是delete,需要过滤掉吗?
*
*
* 重要未做:
* 订单表的预分区,你可以找杜鹏这边了解一下采集系统的做法
* 需要考虑在查询hbase的同时,另外一个并发在更新同一条记录,这里暂时无解,可能需要用到锁
* 3个表的每条记录都会更新一次kudu,
*/
public class GdmFactOmsOrdersExecutor extends AbstractStreamExcutor implements Serializable {
/**
* OMS订单表
*/
private static final String T_DEAL = "t_deal";
private static final String T_BDEAL = "t_bdeal";
private static final String T_TRADE = "t_trade";
private static final String BDEAL_MATCHES = "t_bdeal_[0-9]+$";
private static final String DEAL_MATCHES = "t_deal_[0-9]+$";
private static final String TRADE_MATCHES = "t_trade_[0-9]+$";
private static final String HBASE_TABLE_TRADE = "hb_app_onedata_oms_orders_t_trade_da";
private static final String HBASE_TABLE_DEAL = "hb_app_onedata_oms_orders_t_deal_da";
private static final String HBASE_TABLE_BDEAL = "hb_app_onedata_oms_orders_t_bdeal_da";
@Override
public AbstractStreamSink getFlinkSink() {
return new OmsOrderBdealSink();
}
@Override
public DataStream<Object> transform(DataStream<String> dataStream) {
SingleOutputStreamOperator<Object> singleStream = dataStream.map(new MapFunction<String, Tuple3<String, String,String>>() {
@Override
public Tuple3<String, String,String> map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
String data = jsonObject.getString("data");
String table = jsonObject.getString("table");
String type = jsonObject.getString("type");
return new Tuple3<String, String,String>(table, data,type);
}
}).filter(new FilterFunction<Tuple3<String, String,String>>() {
@Override
public boolean filter(Tuple3<String, String,String> value) throws Exception {
String optType = value.f2.toLowerCase();
if(null != value.f1 && !value.f1.isEmpty() && !"delete".equals(optType)){
return true;
}
return false;
}
}).flatMap(new FlatMapFunction<Tuple3<String, String,String>,Tuple2<String, JSONObject>>() {
@Override
public void flatMap(Tuple3<String, String,String> value, Collector<Tuple2<String, JSONObject>> out) throws Exception {
List<JSONObject> jsonObjects = JSON.parseArray(value.f1, JSONObject.class);
String tableName = "";
if(Pattern.matches(BDEAL_MATCHES, value.f0)){
tableName = UserTagConstant.T_BDEAL;
} else if(Pattern.matches(DEAL_MATCHES, value.f0)){
tableName = UserTagConstant.T_DEAL;
} else if(Pattern.matches(TRADE_MATCHES, value.f0)){
tableName = UserTagConstant.T_TRADE;
}
if(!StringUtils.isNullOrWhitespaceOnly(tableName)){
for (JSONObject jsonObject : jsonObjects) {
out.collect(new Tuple2<>(tableName.toLowerCase(),jsonObject));
}
}
}
})
.flatMap(new RichFlatMapFunction<Tuple2<String, JSONObject>, Object>() {
private HBaseUtil hBaseUtil;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
hBaseUtil = HBaseUtil.getInstance(ResourcesUtil.getValue("conf", "zookeeper"));
}
private JSONArray mergeProp(JSONArray jsonArray, String prop) {
if (jsonArray == null) {
jsonArray = new JSONArray();
}
if (!jsonArray.contains(prop)) {
jsonArray.add(prop);
}
return jsonArray;
}
/**
* 更新Hbase表数据
* 如果数据已经存在,判断update时间是否在历史数据之后,是则更新
* 数据需要清洗后才能入Hbase
* Hbase数据结构:
* rowkey -> Map<orderData,updateTime>
* 如果是trade数据,那么还需要再更新一下bdeal数据,为了存储tradeArray
* @param data
* @return updateRes,更新true,未更新false
*/
private Boolean updateHbase(JSONObject data,String tableName) throws IOException, ParseException {
String updateTime = "";
String cacheTableName = "";
String rowkey = "";
String updateTimeCache = "";
JSONArray tradeArray = null;
Map<String, String> cacheMap = Maps.newHashMap();
if(T_DEAL.equals(tableName)){
updateTime = data.getString("Flast_update_time");
cacheTableName = HBASE_TABLE_DEAL;
rowkey = data.getString("Fdeal_id");
cacheMap.put("ext_data",data.getString("Fdeal_ext_data"));
} else if(T_BDEAL.equals(tableName)){
updateTime = data.getString("Fbdeal_update_time");
cacheTableName = HBASE_TABLE_BDEAL;
rowkey = data.getString("Fbdeal_id");
tradeArray = JSONArray.parseArray(data.getString("tradeArray"));
} else if(T_TRADE.equals(tableName)){
updateTime = data.getString("Ftrade_update_time");
cacheTableName = HBASE_TABLE_TRADE;
rowkey = data.getString("Ftrade_id");
//更新fbdeal缓存的tradeArray
String bdealId = data.getString("Fbdeal_id");
String dealId = data.getString("Fdeal_id");
cacheMap.put("ext_data",data.getString("Ftrade_ext_data"));
Map<String, String> bdealDataMap = hBaseUtil.getRow(HBASE_TABLE_BDEAL,bdealId);
JSONArray tradeArrayTemp = null == bdealDataMap.get("tradeArray") ?
new JSONArray() : JSONArray.parseArray(bdealDataMap.get("tradeArray"));
tradeArrayTemp = mergeProp(tradeArrayTemp,dealId + "_" + rowkey);
if(bdealDataMap.size() == 0){
//如果bdeal还没有数据,不加updateTime,为了让bdeal数据来了以后,能够更新数据
bdealDataMap.put("Fbdeal_id",bdealId);
bdealDataMap.put("tradeArray",tradeArrayTemp.toJSONString());
} else {
//如果bdeal已经有数据
bdealDataMap.put("tradeArray",tradeArrayTemp.toJSONString());
}
hBaseUtil.insertOrderData(HBASE_TABLE_BDEAL, bdealId,bdealDataMap);
} else {
return false;
}
updateTimeCache = hBaseUtil.getRow(cacheTableName,rowkey).get("updateTime");
//如果更新时间大于缓存的时间,则更新数据,否则不更新
if(StringUtils.isNullOrWhitespaceOnly(updateTimeCache) || DateUtil.dateDiffMilliSecond(updateTimeCache,updateTime) > 0){
if(null != tradeArray){
data.put("tradeArray",tradeArray.toJSONString());
}
cacheMap.put("orderData",data.toJSONString());
cacheMap.put("updateTime",updateTime);
hBaseUtil.insertOrderData(cacheTableName, rowkey, cacheMap);
return true;
} else if (DateUtil.dateDiffMilliSecond(updateTimeCache,updateTime) == 0){
//如果这个数据已经在hbase里存在了,那么不更新hbase,但是需要处理后面的逻辑
return true;
} else {
return false;
}
}
/**
* 主逻辑:
* updateHbase返回值,更新了数据则继续,否则结束
* 按照3个订单表的不同,处理逻辑不同:
*
* 1. 交易单执行逻辑(bdeal)
* 1.1 根据tradeArray获取Hbase中trade的数据
* 1.2 从trade表中获取Fdeal_id,以此获取Hbase中deal的数据
* 1.3 拼接结果数据,并返回
*
* 2. 商品单数据更新处理逻辑(trade)
* 2.1. 根据Fdeal_id获取Hbase中deal的数据
* 2.2. 根据Fbdeal_id获取Hbase中bdeal中的数据,并拼接tradeArray,写入bdeal表
* 2.3. 拼接结果数据,并返回
*
* 3. 包裹单数据更新处理逻辑(deal)
* 3.1 根据Fbdeal_id获取Hbase中bdeal的数据,
* 3.2 并根据bdeal表的tradeArray获取trade表数据
* 3.3 拼接结果数据,并返回
*
* 需要解决一个问题:
* 如果trade数据先进来,只会更新trade缓存。那么等到bdeal数据来之后,因为没有tradeArray,数据就没法处理了
* 所以,在更新trade缓存的时候,也需要更新bdeal的缓存。
* @param data
*/
private ArrayList<JSONObject> handle(JSONObject data,String tableName) throws IOException, ParseException {
ArrayList<JSONObject> resArr = new ArrayList();
boolean flag = updateHbase(data,tableName);
if(flag){
if(T_BDEAL.equals(tableName)){
String fbdealId = data.getString("Fbdeal_id");
//根据Fbdeal_id获取Hbase中bdeal的数据('data',jsonObject)
Map<String,String> bdealCacheData = hBaseUtil.getRow(HBASE_TABLE_BDEAL,fbdealId);
JSONObject bdealOrderCacheData = null;
JSONObject dealOrderCacheData = null;
JSONObject tradeOrderCacheData = null;
try{
bdealOrderCacheData = JSONObject.parseObject(bdealCacheData.get("orderData"));
} catch (Exception e){
System.out.println(bdealCacheData.get("orderData") + "=========== bdeal:bdeal:" + fbdealId);
System.exit(1);
}
JSONArray tradeArray = JSONArray.parseArray(bdealCacheData.get("tradeArray"));
//根据trade_id获取trade和deal表信息
if(null != tradeArray){
for(int i = 0,size = tradeArray.size(); i < size; i++){
String dealId = tradeArray.getString(i).split("_")[0];
String tradeId = tradeArray.getString(i).split("_")[1];
String orderDataStr = hBaseUtil.getRow(HBASE_TABLE_TRADE,tradeId).get("orderData");
String tradeExtData = hBaseUtil.getRow(HBASE_TABLE_TRADE,tradeId).getOrDefault("ext_data","{}");
try{
tradeOrderCacheData = JSONObject.parseObject(orderDataStr);
} catch (Exception e){
System.out.println(orderDataStr + "=========== bdeal:trade:" + tradeId);
System.exit(1);
}
String dealOrderStr = hBaseUtil.getRow(HBASE_TABLE_DEAL,dealId).get("orderData");
String dealExtData = hBaseUtil.getRow(HBASE_TABLE_DEAL,dealId).getOrDefault("ext_data","{}");
try{
dealOrderCacheData = JSONObject.parseObject(dealOrderStr);
} catch (Exception e){
System.out.println(dealOrderStr + "=========== bdeal:deal:" + dealId);
System.exit(1);
}
if(null != dealOrderCacheData && null != tradeOrderCacheData){
tradeOrderCacheData.put("Ftrade_ext_data",tradeExtData);
dealOrderCacheData.put("Fdeal_ext_data",dealExtData);
List<JSONObject> unionOrders = GdmFactOmsOrdersUtil.mergeResult(bdealOrderCacheData,dealOrderCacheData,tradeOrderCacheData);
resArr.addAll(unionOrders);
}
}
}
return resArr;
} else if(T_DEAL.equals(tableName)){
String fdealId = data.getString("Fdeal_id");
String fbdealId = data.getString("Fbdeal_id");
//根据Fdeal_id获取Hbase中bdeal的数据('data',jsonObject)
Map<String,String> bdealCacheData = hBaseUtil.getRow(HBASE_TABLE_BDEAL,fbdealId);
JSONObject bdealOrderCacheData = null;
JSONObject dealOrderCacheData = null;
JSONObject tradeOrderCacheData = null;
try{
bdealOrderCacheData = JSONObject.parseObject(bdealCacheData.get("orderData"));
} catch (Exception e){
System.out.println(bdealCacheData.get("orderData") + "=========== deal:bdeal:" + fbdealId);
System.exit(1);
}
/**
* 这里是可以省略一次查询Hbase的。之后做优化。
*/
String dealOrderStr = hBaseUtil.getRow(HBASE_TABLE_DEAL,fdealId).get("orderData");
String dealExtData = hBaseUtil.getRow(HBASE_TABLE_DEAL,fdealId).getOrDefault("ext_data","{}");
try{
dealOrderCacheData = JSONObject.parseObject(dealOrderStr);
} catch (Exception e){
System.out.println(dealOrderStr + "=========== deal:deal" + fdealId);
System.exit(1);
}
dealOrderCacheData.put("Fdeal_ext_data",dealExtData);
if(null != bdealOrderCacheData){
JSONArray tradeArray = JSONArray.parseArray(bdealCacheData.get("tradeArray"));
if(null != tradeArray){
//根据trade_id获取trade和deal表信息
for(int i = 0,size = tradeArray.size(); i < size; i++){
String dealId = tradeArray.getString(i).split("_")[0];
String tradeId = tradeArray.getString(i).split("_")[1];
String tradeExtData = "{}";
if(dealId.equals(fdealId)){
String orderDataStr = hBaseUtil.getRow(HBASE_TABLE_TRADE,tradeId).get("orderData");
tradeExtData = hBaseUtil.getRow(HBASE_TABLE_TRADE,tradeId).getOrDefault("ext_data","{}");
try{
tradeOrderCacheData = JSONObject.parseObject(orderDataStr);
} catch (Exception e){
System.out.println(orderDataStr + "=========== deal:trade:" + tradeId);
System.exit(1);
}
}
if(null != tradeOrderCacheData){
tradeOrderCacheData.put("Ftrade_ext_data",tradeExtData);
List<JSONObject> unionOrders = GdmFactOmsOrdersUtil.mergeResult(bdealOrderCacheData,dealOrderCacheData,tradeOrderCacheData);
resArr.addAll(unionOrders);
}
}
}
}
return resArr;
} else if(T_TRADE.equals(tableName)){
String fdealId = data.getString("Fdeal_id");
String fbdealId = data.getString("Fbdeal_id");
String ftradeId = data.getString("Ftrade_id");
Map<String,String> bdealCacheData = hBaseUtil.getRow(HBASE_TABLE_BDEAL,fbdealId);
JSONObject bdealOrderCacheData = null ;
JSONObject dealOrderCacheData = null;
JSONObject tradeOrderCacheData = null;
try{
bdealOrderCacheData = JSONObject.parseObject(bdealCacheData.get("orderData"));
} catch (Exception e){
System.out.println(bdealCacheData.get("orderData") + "=========== trade:bdeal:" + fbdealId);
System.exit(1);
}
String orderDataStr = hBaseUtil.getRow(HBASE_TABLE_DEAL,fdealId).get("orderData");
String dealExtData = hBaseUtil.getRow(HBASE_TABLE_DEAL,fdealId).getOrDefault("ext_data","{}");
try{
dealOrderCacheData = JSONObject.parseObject(orderDataStr);
} catch (Exception e){
System.out.println(orderDataStr + "=========== trade:deal:" + fdealId);
System.exit(1);
}
if(null != bdealOrderCacheData && null != dealOrderCacheData){
dealOrderCacheData.put("Fdeal_ext_data",dealExtData);
JSONArray tradeArray = JSONArray.parseArray(bdealCacheData.get("tradeArray"));
if(null == tradeArray){
tradeArray = new JSONArray();
}
tradeArray = mergeProp(tradeArray,fdealId + "_" + ftradeId);
bdealCacheData.put("tradeArray",tradeArray.toJSONString()); //tradeArray
hBaseUtil.insertOrderData(HBASE_TABLE_BDEAL, fbdealId, bdealCacheData);
//根据trade_id获取trade和deal表信息
String tradeOrderDataStr = hBaseUtil.getRow(HBASE_TABLE_TRADE,ftradeId).get("orderData");
String tradeExtData = hBaseUtil.getRow(HBASE_TABLE_TRADE,ftradeId).getOrDefault("ext_data","{}");
try{
tradeOrderCacheData = JSONObject.parseObject(tradeOrderDataStr);
} catch (Exception e){
System.out.println(tradeOrderDataStr + "=========== trade:trade:" + ftradeId);
System.exit(1);
}
tradeOrderCacheData.put("Ftrade_ext_data",tradeExtData);
List<JSONObject> unionOrders = GdmFactOmsOrdersUtil.mergeResult(bdealOrderCacheData,dealOrderCacheData,tradeOrderCacheData);
resArr.addAll(unionOrders);
}
return resArr;
}
return resArr;
}
return resArr;
}
@Override
public void flatMap(Tuple2<String, JSONObject> value, Collector<Object> out) throws Exception {
String tableName = value.f0;
JSONObject data = value.f1;
ArrayList<JSONObject> list = handle(data,tableName);
if(list.size() > 0){
for(int i = 0,size = list.size(); i < size; i ++ ){
out.collect(list.get(i));
}
}
}
});
return singleStream;
}
}
这个方案的好处是,任务挂掉以后,不用担心丢数据。重新消费即可
经过测试,此方案完全可行,但是这个方案有如下几点问题:
- 3个表每条数据更新都会更新一次kudu,造成下游的Kudu压力有些大。
- 在高并发的情况下,Hbase的压力不知道能否抗住,理论上应该没事,后期观察
虽然没有用到Flink的状态计算,但是,还是很想研究一下Flink的状态计算,想测试一个案例:
实现如下功能:
- 3个kafka输入源,并且实现这3个数据源关联,模仿3个表join
- 改变其中一个表的字段值,使用状态计算更新最终的结果
- 增加大时间跨度大于1天,7天,30天。
Flink读取kafka多个topic遇到的问题:
如果要读取的Topic列表中,其中一个在Topic中没有数据,而你又基于Event Time提取Timestamp并且设置Watermark,
会导致整个Topic列表都没法基于时间窗口触发操作,解决方案:
先rebalance,然后再设置水位:
val monitorSampling = env
.addSource(kafkaConsumer)
.rebalance
.assignTimestampsAndWatermarks(new MyWatermarkGenerator[MyRecord](Time.seconds(config.latencyDuration)))
Flink反压
反压(backpressure)是实时计算应用开发中,特别是流式计算中,十分常见的问题。反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而需要对上游进行限速。
反压的影响
反压并不会直接影响作业的可用性,它表明作业处于亚健康的状态,有潜在的性能瓶颈并可能导致更大的数据处理延迟。通常来说,对于一些对延迟要求不太高或者数据量比较小的应用来说,反压的影响可能并不明显,然而对于规模比较大的 Flink 作业来说反压可能会导致严重的问题。
这是因为 Flink 的 checkpoint 机制,反压还会影响到两项指标: checkpoint 时长和 state 大小。
- 前者是因为 checkpoint barrier 是不会越过普通数据的,数据处理被阻塞也会导致 checkpoint barrier 流经整个数据管道的时长变长,因而 checkpoint 总体时间(End to End Duration)变长。
- 后者是因为为保证 EOS(Exactly-Once-Semantics,准确一次),对于有两个以上输入管道的 Operator,checkpoint barrier 需要对齐(Alignment),接受到较快的输入管道的 barrier 后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的 barrier 也到达,这些被缓存的数据会被放到state 里面,导致 checkpoint 变大。
这两个影响对于生产环境的作业来说是十分危险的,因为 checkpoint 是保证数据一致性的关键,checkpoint 时间变长有可能导致 checkpoint 超时失败,而 state 大小同样可能拖慢 checkpoint 甚至导致 OOM (使用 Heap-based StateBackend)或者物理内存使用超出容器资源(使用 RocksDBStateBackend)的稳定性问题。
定位反压节点
要解决反压首先要做的是定位到造成反压的节点,这主要有两种办法:
- 通过 Flink Web UI 自带的反压监控面板;
- 通过 Flink Task Metrics。
前者比较容易上手,适合简单分析,后者则提供了更加丰富的信息,适合用于监控系统。因为反压会向上游传导,这两种方式都要求我们从 Source 节点到 Sink 的逐一排查,直到找到造成反压的根源原因。下面分别介绍这两种办法。
反压监控面板
Flink Web UI 的反压监控提供了 SubTask 级别的反压监控,原理是通过周期性对 Task 线程的栈信息采样,得到线程被阻塞在请求 Buffer(意味着被下游队列阻塞)的频率来判断该节点是否处于反压状态。默认配置下,这个频率在 0.1 以下则为 OK,0.1 至 0.5 为 LOW,而超过 0.5 则为 HIGH。
如果处于反压状态,那么有两种可能性:
- 该节点的发送速率跟不上它的产生数据速率。这一般会发生在一条输入多条输出的 Operator(比如 flatmap)。
- 下游的节点接受速率较慢,通过反压机制限制了该节点的发送速率。
如果是第一种状况,那么该节点则为反压的根源节点,它是从 Source Task 到 Sink Task 的第一个出现反压的节点。如果是第二种情况,则需要继续排查下游节点。
具体可以参考如何分析及处理 Flink 反压?
一文搞懂 Flink 网络流控与反压机制
FlinkCDC 采集mysql 写Hbase并从Hive内读取
java代码:
在hive里创建hbase外表,CDH集群原生支持Hive与Hbase打通,如果是自己搭建的集群,需要自己打通一下Hive与Hbase,这个网上很多,照着操作一下就好
在hive里创建外表,映射hbase表
CREATE EXTERNAL TABLE flume_mqtt(
rowkey string,
id int,
name string,
age int)
STORED BY
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,f1:id,f1:name,f1:age")
TBLPROPERTIES ("hbase.table.name" = "flume_mqtt");
遇到的问题
hbase-default.xml file seems to be for an older version of HBase (2.2.3), this version is 2.1.0-cdh6.2.0
问题原因:因为${FLINK_HOME}/lib下放了一个flink-sql-connector-hbase-2.2_2.12-1.13.6.jar
,与项目里的hbase依赖的版本冲突了,删除${FLINK_HOME}/lib下的文件