
StarRocks Learning Notes


Introduction to StarRocks

The official StarRocks documentation is well written and available in Chinese as well; for reference material, go straight to the StarRocks official site.

Hands-on experience

See StarRocks 实战指南:100+ 大型企业背后的最佳实践经验 (a hands-on guide to the best practices behind 100+ large enterprises).

Loading data into StarRocks

Loading data from a local file into StarRocks

Note: the file must be UTF-8 encoded; if Chinese characters are garbled, the load fails with an error.
The original file looks like this:

"ECU01","ECU02","ECU03","ECU04","ECU05","ECU06","ECU07","ECU08","ECU09","ECUACTI","ECUUSER","ECUGRUP","ECUMODU","ECUDATE","ECU10","ECU11","ECUUD01","ECUUD02","ECUUD03","ECUUD04","ECUUD05","ECUUD06","ECUUD07","ECUUD08","ECUUD09","ECUUD10","ECUUD11","ECUUD12","ECUUD13","ECUUD14","ECUUD15","ECUORIU","ECUORIG","ECU012","ECU014","ECU015"
"1101830000001","A01","10激光切割,20刻字外协,30油压校平,40加工中心",10,40,"","","","","Y","00696","3000","00696",2014-10-18 00:00:00,"Y",,"","","","","","",,,,,,,,,,"00696","3000"," ","",""
"1101830000002","A01","10激光切割,20刻字外协,30油压校平",10,30,"","","","","Y","00696","3000","00696",2014-10-18 00:00:00,"Y",,"","","","","","",,,,,,,,,,"00696","3000"," ","",""
"1101830000004","A01","10激光切割,20刻字外协,30油压校平",10,30,"","","","","Y","00696","3000","00696",2014-10-18 00:00:00,"Y",,"","","","","","",,,,,,,,,,"00696","3000"," ","",""
"1101830000005","A01","10激光切割,20刻字外协,30油压校平",10,30,"","","","","Y","00696","3000","00696",2014-10-18 00:00:00,"Y",,"","","","","","",,,,,,,,,,"00696","3000"," ","",""

Load commands:

iconv -f gbk -t UTF-8 ecu_file.csv -o ecu_file1.csv
sed -i 's/\r//g' ecu_file1.csv
curl --location-trusted -u root:DiGiWin@123 -H "label:ECU_FILE" \
-H "Expect:100-continue" -H "timeout: 259200" -H "max_filter_ratio: 0.2" \
-H "column_separator: ," -H "skip_header: 1" -H "enclose: \"" \
-H "columns: ECU01,ECU02,ECU03,ECU04,ECU05,ECU06,ECU07,ECU08,ECU09,ECUACTI,ECUUSER,ECUGRUP,ECUMODU,ECUDATE,ECU10,ECU11,ECUUD01,ECUUD02,ECUUD03,ECUUD04,ECUUD05,ECUUD06,ECUUD07,ECUUD08,ECUUD09,ECUUD10,ECUUD11,ECUUD12,ECUUD13,ECUUD14,ECUUD15,ECUORIU,ECUORIG,ECU012,ECU014,ECU015" \
-T ecu_file1.csv -XPUT http://172.16.101.227:18030/api/SMESPROD_GJC/ECU_FILE/_stream_load

In an actual test on a single node, loading 1.4 GB of data reported "ReadDataTimeMs": 14933 and "WriteDataTimeMs": 16662 (both in milliseconds).

Writing to StarRocks in real time with Flink CDC

Download flink-connector-starrocks: the matching jar can be pulled straight from the Maven repository. Put it under ${FLINK_HOME}/lib, then start the Flink cluster and the SQL client, roughly as sketched below.
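
For example, a rough sketch of the setup (the jar versions below are placeholders; pick the ones matching your Flink version from the Maven repository):

# Copy the StarRocks connector (and, for the CDC source, flink-sql-connector-mysql-cdc) into Flink's lib directory
cp flink-connector-starrocks-1.2.7_flink-1.15.jar ${FLINK_HOME}/lib/
cp flink-sql-connector-mysql-cdc-2.3.0.jar ${FLINK_HOME}/lib/
# Start the standalone cluster and the SQL client
${FLINK_HOME}/bin/start-cluster.sh
${FLINK_HOME}/bin/sql-client.sh
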
Create the table in StarRocks:

 create table gjc_test_binlog(
    uuid int(10),
    name varchar(50),
    age int(2),
    ts DATETIME,
    part varchar(10)
) 
 PRIMARY KEY (uuid)
 DISTRIBUTED BY HASH(uuid) BUCKETS 1
 PROPERTIES("replication_num" = "1")
;

Note:
replication_num must be specified here. The system default is 3, but this cluster has only one BE; without it, inserting data fails with the following error:

Failed to find enough host in all backends. need: 3
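
Alternatively, rather than setting replication_num on every table, the cluster-wide default can be lowered on a single-BE test cluster. A minimal sketch, assuming the FE config item default_replication_num is available in your version:

-- Make newly created tables default to one replica on a single-BE test cluster
ADMIN SET FRONTEND CONFIG ("default_replication_num" = "1");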

Create the mapped tables in the Flink SQL client:

set execution.checkpointing.interval=3sec;
CREATE TABLE users_source_mysql (
    uuid int primary key not enforced,
    name STRING,
    age int,
    ts timestamp(3),
    part string
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '150.158.190.192',
'port' = '3306',
'username' = 'root',
'password' = 'Gjc123!@#',
'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial',
'database-name' = 'test',
'table-name' = 'users_source_mysql'
);

CREATE TABLE star_gjc_test_binlog (
  uuid int primary key not enforced,
  name STRING,
  age int,
  ts timestamp(3),
  part string 
) WITH (
   'connector'='starrocks',
   'jdbc-url'='jdbc:mysql://172.16.2.205:19030',
   'load-url'='172.16.2.205:18030',
   'username'='root',
   'password'='',
   'database-name'='test',
   'table-name'='gjc_test_binlog'
);
insert into star_gjc_test_binlog select uuid,name,age,ts,part from users_source_mysql;

As with ClickHouse, the target table must be created in StarRocks ahead of time before loading data; otherwise the load fails.

Note:
Pay close attention to column lengths when creating the table. If a column is too short for the incoming data, the value is silently set to null.

Writing to StarRocks with the DataStream API

Capturing multiple tables with Flink CDC and sinking them to StarRocks

When implementing the multi-table sink, the lazy first attempt was to go through JDBC and write auto-generated INSERT statements into StarRocks, but that fails with a "too many versions" error.
The reason is that every INSERT INTO creates a new data version in StarRocks, which is only merged away later by compaction; during the CDC snapshot phase the write volume is large, so versions pile up faster than compaction can clear them.
Note that JDBC's pstmt.executeBatch() does not help either: the rows are still inserted one by one.
StarRocks' micro-batch ingestion is not pstmt.executeBatch() but Stream Load, so Stream Load is used directly below.
To sync multiple tables, use the DataStream API of flink-connector-starrocks: the source reads all configured tables via CDC, a flatMap function converts each change record into a StarRocksSinkRowDataWithMeta, and the records are written to the StarRocks sink.
The code:

public static void runJob(CommandArgs commandArgs) throws Exception {
        String checkpointPath = baseCheckpintPath + DigestUtils.md5Hex(commandArgs.getSchema().trim() + "." + commandArgs.getStbn().trim()).toUpperCase();
        Configuration conf = new Configuration();
        //Automatically resume from the last checkpoint
        if("true".equals(commandArgs.getIsFromCK())){
            String lastCheckpintPath = CheckpointUtil.getCkPath(checkpointPath);
            LOG.info("<=======  断点续传:自动从 {} 恢复 =======>",lastCheckpintPath);
            //断点续传,获取最近一次的checkpoint路径,如果路径为空,那么告警并退出
            //todo 这里需要检查一下当前flink job的状态是否已完成,如果是running,直接退出
            conf.setString("execution.savepoint.path", lastCheckpintPath);
        } else {
            //Not resuming from a checkpoint: drop and recreate the target tables
            LOG.info("<=======  Fresh start: dropping and recreating target tables =======>");
            String jdbcUrl = String.format("jdbc:mysql://%s:%s/%s",commandArgs.getThost(),commandArgs.getTport(),commandArgs.getTdb());
            StarRocksUtil starRocksUtil = StarRocksUtil.getInstance(new StarRocksJdbcConnectionOptions(jdbcUrl,commandArgs.getTuser(),commandArgs.getTpsw()));
            SqlConstructorUtils sqlConstructorUtils = new SqlConstructorUtils(commandArgs);
            ArrayList<String> sqlCreateList = sqlConstructorUtils.getStarrocksCreateSql();
            ArrayList<String> sqlDropList = sqlConstructorUtils.getStarrocksDropSql();
            for (int i = 0; i < sqlCreateList.size(); i++) {
                LOG.info("<=======  非断点续传:删除目标表 {} =======>",sqlDropList.get(i));
                starRocksUtil.excuteSql(sqlDropList.get(i),null);
                LOG.info("<=======  非断点续传:创建目标表 {} =======>",sqlCreateList.get(i));
                starRocksUtil.excuteSql(sqlCreateList.get(i),null);
            }

        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        //Configure the checkpoint state backend
        env.setStateBackend(new HashMapStateBackend());
        //Retain checkpoint data when the job exits abnormally or is cancelled manually
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //Set the checkpoint storage directory
        env.getCheckpointConfig().setCheckpointStorage(checkpointPath);
        env.enableCheckpointing(checkpointInterval);
        //Number of tolerable checkpoint failures; must be 0 or a positive integer (0 means no checkpoint failure is tolerated)
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000000000);
        //Failure-rate restart strategy: at most 3 failures within each 5-minute window, with a 10-second delay between restarts
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, // max failures per measurement interval
                Time.of(5, TimeUnit.MINUTES), // interval over which the failure rate is measured
                Time.of(10, TimeUnit.SECONDS) // delay between restarts
        ));
        String dbName = commandArgs.getTdb();
        String schama = commandArgs.getSchema();
        String tableName = (schama  + "." + commandArgs.getStbn()).replaceAll(",","," + schama  + ".");
        String[] tableArr =tableName.split(",");
        Properties dbzProperty = new Properties();
        dbzProperty.setProperty("log.mining.strategy", "online_catalog");
        dbzProperty.setProperty("log.mining.continuous.mine","true");
        dbzProperty.setProperty("poll.interval.ms","5");
        dbzProperty.setProperty("database.tablename.case.insensitive","false");
        dbzProperty.setProperty("log.mining.batch.size.min","50");
        dbzProperty.setProperty("log.mining.batch.size.default","100");
        dbzProperty.setProperty("log.mining.batch.size.max","10000");
        dbzProperty.setProperty("log.mining.sleep.time.default.ms","10");
        dbzProperty.setProperty("log.mining.sleep.time.max.ms","200");
        dbzProperty.setProperty("log.mining.sleep.time.increment.ms","50");
        dbzProperty.setProperty("log.mining.view.fetch.size","10000");
        dbzProperty.setProperty("lob.enabled","true");
        dbzProperty.setProperty("snapshot.fetch.size","8000");
        dbzProperty.setProperty("snapshot.delay.ms","3000");
        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                .hostname(commandArgs.getShost())
                .port(Integer.parseInt(commandArgs.getSport()))
                .database(commandArgs.getSdb())
                .schemaList(commandArgs.getSchema())
                .tableList(tableArr)
                .username(commandArgs.getSuser())
                .password(commandArgs.getSpsw())
                .debeziumProperties(dbzProperty)
                .deserializer(new StarRocksDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", String.format("jdbc:mysql://%s:%s",commandArgs.getThost(),commandArgs.getTport()))
                .withProperty("load-url", String.format("%s:%s",commandArgs.getThost(),commandArgs.getLoadPort()))
                .withProperty("username", commandArgs.getTuser())
                .withProperty("password", commandArgs.getTpsw())
                .withProperty("table-name", "")
                .withProperty("database-name", "")
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .withProperty("sink.buffer-flush.interval-ms","1000")
                .withProperty("sink.properties.column_separator", "\\x01")
                .withProperty("sink.properties.row_delimiter", "\\x02")
                .build();
        SinkFunction sinkFunction = StarRocksStreamLoadSink.sink(sinkOptions,commandArgs.getStbn());

        env.addSource(sourceFunction)
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String row) throws Exception {
                        String operation = JSONObject.parseObject(row).getString("op");
                        return Envelope.Operation.CREATE.code().equals(operation)
                                || Envelope.Operation.READ.code().equals(operation)
                                || Envelope.Operation.DELETE.code().equals(operation)
                                || Envelope.Operation.UPDATE.code().equals(operation);
                    }
                })
                .flatMap(new RichFlatMapFunction<String, StarRocksSinkRowDataWithMeta>() {
                    private SimpleDateFormat sdf;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    }

                    @Override
                    public void flatMap(String row, Collector<StarRocksSinkRowDataWithMeta> collector) throws Exception {
                        JSONObject rowJson = JSONObject.parseObject(row);
                        String operation = rowJson.getString("op");
                        JSONObject afterJson = JSONObject.parseObject(rowJson.getString("after"));
                        afterJson.put("update_time",sdf.format(new Date()));
                        JSONObject beforeJson = JSONObject.parseObject(rowJson.getString("before"));
                        beforeJson.put("update_time",sdf.format(new Date()));
                        if(Envelope.Operation.CREATE.code().equals(operation) || Envelope.Operation.READ.code().equals(operation)){
                            StarRocksSinkRowDataWithMeta rowData = new StarRocksSinkRowDataWithMeta();
                            rowData.setDatabase(dbName);
                            rowData.setTable(rowJson.getString("tableName"));
                            afterJson.put(StarRocksSinkOP.COLUMN_KEY,StarRocksSinkOP.UPSERT.ordinal());
                            rowData.addDataRow(afterJson.toJSONString());
                            collector.collect(rowData);
                        } else if (Envelope.Operation.DELETE.code().equals(operation)){
                            beforeJson.put(StarRocksSinkOP.COLUMN_KEY,StarRocksSinkOP.DELETE.ordinal());
                            StarRocksSinkRowDataWithMeta rowData = new StarRocksSinkRowDataWithMeta();
                            rowData.setDatabase(dbName);
                            rowData.setTable(rowJson.getString("tableName"));
                            rowData.addDataRow(beforeJson.toJSONString());
                            collector.collect(rowData);
                        } else if (Envelope.Operation.UPDATE.code().equals(operation)){
                            StarRocksSinkRowDataWithMeta beforeRowData = new StarRocksSinkRowDataWithMeta();
                            StarRocksSinkRowDataWithMeta afterRowData = new StarRocksSinkRowDataWithMeta();
                            beforeRowData.setDatabase(dbName);
                            beforeRowData.setTable(rowJson.getString("tableName"));
                            afterRowData.setDatabase(dbName);
                            afterRowData.setTable(rowJson.getString("tableName"));
                            beforeJson.put(StarRocksSinkOP.COLUMN_KEY,StarRocksSinkOP.DELETE.ordinal());
                            beforeRowData.addDataRow(beforeJson.toJSONString());
                            afterJson.put(StarRocksSinkOP.COLUMN_KEY,StarRocksSinkOP.UPSERT.ordinal());
                            afterRowData.addDataRow(afterJson.toJSONString());
                            collector.collect(beforeRowData);
                            collector.collect(afterRowData);
                        }
                    }
                })
                .addSink(sinkFunction)
                ;
        env.execute("More Table Collector Snapshot And Binlog");
    }

Because the parsed CDC records are converted to JSON, the following sink properties are required:

.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")

Common operations commands

-- Check storage usage
-- Storage used by the current database
show data;
-- Storage used by a specific table
show data from test.tableName;

-- Show the cluster's system variables. Without a modifier, SESSION is used by default.
SHOW [ GLOBAL | SESSION ] VARIABLES LIKE 'wait_timeout';
-- Set a specified system variable or user-defined variable for StarRocks
SET [ GLOBAL | SESSION ] <variable_name> = <value> [, <variable_name> = <value>] ...
SET time_zone = "Asia/Shanghai";
-- Set multiple global variables at once; the GLOBAL keyword must precede every variable name
SET
GLOBAL exec_mem_limit = 2147483648,
       GLOBAL time_zone = "Asia/Shanghai";
-- Show the cluster configuration (currently only FE config items are supported).
ADMIN SHOW FRONTEND CONFIG [LIKE "pattern"]
ADMIN SHOW FRONTEND CONFIG LIKE '%check_java_version%';
-- Set a cluster config item (currently only FE config items are supported).
ADMIN SET FRONTEND CONFIG ("key" = "value")

For the full list, see All commands.

Issues encountered

  1. Do not use the unique key model for real-time writes to StarRocks, because the unique key model cannot recognize delete/update operations.
  2. The primary key model limits the primary key length to 127. For our use case we patched the StarRocks source and changed the limit from 128 to 512.

StarRocks Stream Load: a Java implementation example

For details, see streamload.
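
As a minimal sketch (not the implementation from the linked article): a Stream Load is just an HTTP PUT with basic authentication, so plain JDK classes are enough for a demo. The host/port, database, table, and sample rows below are placeholders; the request targets a BE HTTP port directly to avoid the FE's 307 redirect, which HttpURLConnection does not replay PUT bodies across. Production code usually uses Apache HttpClient with a redirect strategy instead.

import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.UUID;

public class StreamLoadDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder connection info -- point this at one of your BE HTTP ports (default 8040)
        String beHttp = "127.0.0.1:8040";
        String db = "test";
        String table = "gjc_test_binlog";
        String auth = Base64.getEncoder().encodeToString("root:".getBytes(StandardCharsets.UTF_8));

        // Two CSV rows matching the gjc_test_binlog table used earlier in these notes
        String csv = "1,tom,20,2024-01-01 00:00:00,p1\n"
                   + "2,jerry,21,2024-01-01 00:00:00,p1\n";

        URL url = new URL(String.format("http://%s/api/%s/%s/_stream_load", beHttp, db, table));
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        conn.setRequestProperty("Authorization", "Basic " + auth);
        conn.setRequestProperty("label", "demo_" + UUID.randomUUID()); // label must be unique per load
        conn.setRequestProperty("column_separator", ",");
        conn.setRequestProperty("columns", "uuid,name,age,ts,part");

        // Send the CSV body
        try (OutputStream os = conn.getOutputStream()) {
            os.write(csv.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP " + conn.getResponseCode());
        // The JSON response reports Status, NumberLoadedRows, ErrorURL, etc.
        try (InputStream is = conn.getInputStream()) {
            System.out.println(new String(is.readAllBytes(), StandardCharsets.UTF_8));
        }
    }
}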

How does StarRocks differ from traditional databases?

This is essentially the difference between OLAP and OLTP:

Traditional databases emphasize data correctness, transactional atomicity, and concurrency, and can process data row by row. An OLAP database like StarRocks instead emphasizes processing data in batches, and is comparatively weak in row-by-row scenarios.
StarRocks is mainly used for OLAP multidimensional analysis and real-time analytics, i.e. analytical scenarios, whereas traditional databases such as SQL Server and Oracle lean more toward transactional business scenarios rather than analytics.

StarRocks vs. ClickHouse

ClickHouse overview

ClickHouse is a columnar database open-sourced by Yandex, Russia's largest search-engine company. Impressively, it delivers a large performance gain over many commercial MPP databases such as Vertica and InfiniDB. Beyond Yandex, more and more companies are trying columnar databases like ClickHouse. For typical analytical workloads with a fairly fixed structure and infrequent data changes, the tables that need to be joined can be flattened into a wide table and stored in ClickHouse.
Compared with traditional big-data solutions, ClickHouse has the following advantages:

  1. Rich configuration, with ZooKeeper as its only external dependency
  2. Linear scalability: the cluster can be expanded by adding servers
  3. High fault tolerance, using asynchronous multi-master replication between shards
  4. Excellent single-table performance, with vectorized execution and optimizations such as sampling and approximate computation
  5. Powerful functionality with support for many table engines

StarRocks overview

StarRocks is a blazing-fast, full-scenario MPP enterprise database. It supports online horizontal scaling and financial-grade high availability, is compatible with the MySQL protocol and ecosystem, and provides a fully vectorized engine and federated queries across multiple data sources. StarRocks aims to give users a unified solution for all OLAP workloads, suiting scenarios with high demands on performance, freshness, concurrency, and flexibility.
Compared with traditional big-data solutions, StarRocks has the following advantages:

  • Does not depend on the big-data ecosystem, while federated queries over external tables remain compatible with it
  • Offers multiple table models, supporting data modeling along different dimensions
  • Supports online elastic scaling with automatic load balancing
  • Supports high-concurrency analytical queries
  • Good real-time behavior, with second-level data ingestion
  • Compatible with the MySQL 5.7 protocol and the MySQL ecosystem

StarRocks vs. ClickHouse: feature comparison

StarRocks and ClickHouse have a lot in common: both deliver extreme performance, neither depends on the Hadoop ecosystem, and both provide multi-master replication of the underlying storage shards for high availability. They differ, however, in features, performance, and target scenarios. ClickHouse is better suited to wide-table scenarios: OLTP data arriving through CDC tools can be flattened in Flink and written to ClickHouse as wide tables. StarRocks has stronger join capabilities, so star or snowflake schemas can be used to handle changes in dimension data.

For details, see ClickHouse vs StarRocks 选型对比 (a selection comparison).

Building StarRocks

Clone the code

git clone https://github.com/StarRocks/starrocks.git
git checkout 3.2.6

Checking out the tag prints the following notice:

Updating files: 100% (3738/3738), done.
Note: switching to '3.2.6'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by switching back to a branch.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -c with the switch command. Example:

  git switch -c <new-branch-name>

Or undo this operation with:

  git switch -

Turn off this advice by setting config variable advice.detachedHead to false

HEAD is now at 258533316e [BugFix] Fix routine load deadlock and expr dynamic partition check and subList error (backport #43839)(backport #44163)(backport #44225) (#44255) 

This is because Git did complete the checkout, but you are now in a "detached HEAD" state: you are on the 3.2.6 commit without being on any branch.
In most cases, checking out a tag or a commit with git checkout puts you in this "detached HEAD" state, because checking out a tag or commit does not create a branch, so you end up in this temporary state.
To fix it, create a branch at this commit:

git switch -c digiwin-3.2.6

Cherry-pick a specific commit

# Switch to the main branch
git checkout main 
# Inspect the git log and find the commit you want
# git log --grep="Fix incorrect schema id"
# The commit id found is 344a8216dbb04e0fef28b4740710d8fca089b2e6
# Switch to the digiwin-3.2.6 branch
git checkout digiwin-3.2.6
# Cherry-pick that commit onto this branch
git cherry-pick 344a8216dbb04e0fef28b4740710d8fca089b2e6
# Push the modified branch to the remote repository
git push origin digiwin-3.2.6

Building StarRocks with Docker

docker run -it -v /root/gujc/.m2:/root/.m2 \
     -v /root/gujc/starrocks:/root/starrocks \
     --name 3.2.6 -d starrocks/dev-env-centos7:3.2-latest
     
docker exec -it 094cd17c9078 /bin/bash
cd /root/starrocks && ./build.sh

StarRocks usage guidelines:

  1. Real-time ingestion must use the primary key model.
  2. Avoid the primary key model in the data warehouse layer. If a keyed table is really needed, use the unique key (update) model instead: primary key indexes are kept in BE memory, so many primary key tables consume a lot of BE memory.
  3. Make every table's bucket count an integer multiple of the number of BEs; otherwise data will be distributed unevenly. See the sketch after this list.
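
A hedged sketch of points 2 and 3, assuming a 3-BE cluster (the table and column names are made up): a unique key table whose bucket count is a multiple of the BE count.

-- Unique key (update) model instead of the primary key model; 6 buckets on a 3-BE cluster
CREATE TABLE dwd_orders (
    order_id BIGINT,
    dt DATE,
    amount DECIMAL(18, 2),
    status VARCHAR(16)
)
UNIQUE KEY(order_id, dt)
DISTRIBUTED BY HASH(order_id) BUCKETS 6
PROPERTIES ("replication_num" = "3");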

StarRocks monitoring guide

  1. Monitor the number of tables of each model in the cluster (primary key, unique key, duplicate key).
  2. Monitor the compaction score; a value above 800 is considered abnormal.
  3. Monitor the number of transactions currently running in the cluster. More than 500 running transactions indicates excessive write pressure: reduce the write load and, if necessary, rolling-restart the BEs.
    mysql> show proc "/transactions/50006";
    +----------+--------+
    | State    | Number |
    +----------+--------+
    | running  | 0      |
    | finished | 11951  |
    +----------+--------+
    mysql> show proc "/transactions/50006/running";


  4. Fetch metric data
    curl -XGET -s http://172.16.101.227:18040/metrics | grep "^starrocks_be_.*_mem_bytes\|^starrocks_be_tcmalloc_bytes_in_use"
    starrocks_be_bitmap_index_mem_bytes 0
    starrocks_be_bloom_filter_index_mem_bytes 0
    starrocks_be_chunk_allocator_mem_bytes 8086528
    starrocks_be_clone_mem_bytes 0
    starrocks_be_column_metadata_mem_bytes 65472
    starrocks_be_column_pool_mem_bytes 0
    starrocks_be_column_zonemap_index_mem_bytes 22400
    starrocks_be_compaction_mem_bytes 0
    starrocks_be_consistency_mem_bytes 0
    starrocks_be_datacache_mem_bytes 0
    starrocks_be_load_mem_bytes 0
    starrocks_be_metadata_mem_bytes 1028840
    starrocks_be_ordinal_index_mem_bytes 15256
    starrocks_be_process_mem_bytes 126387240
    starrocks_be_query_mem_bytes 0
    starrocks_be_rowset_metadata_mem_bytes 172840
    starrocks_be_schema_change_mem_bytes 0
    starrocks_be_segment_metadata_mem_bytes 21210
    starrocks_be_segment_zonemap_mem_bytes 13880
    starrocks_be_short_key_index_mem_bytes 943
    starrocks_be_storage_page_cache_mem_bytes 559296
    starrocks_be_tablet_metadata_mem_bytes 769318
    starrocks_be_tablet_schema_mem_bytes 308302
    starrocks_be_update_mem_bytes 0