
Flink State Processor API Usage Notes

This article covers:

  • An overview of the Flink State Processor API
  • Reading a Kafka source checkpoint with the Flink State Processor API
  • Reading a Flink CDC source checkpoint (Oracle CDC) with the Flink State Processor API
  • Reading a Flink CDC source checkpoint (MySQL CDC) with the Flink State Processor API

A detailed introduction is skipped here; see the official State Processor API documentation.

Prerequisites

Using the State Processor API has several prerequisites:

  1. The operator uid: the job that produced the checkpoint must set a uid, and that same uid must be passed to the API.
  2. The StateDescriptor name: the name given when the state was registered must be passed to the API.
  3. The state type, one of: Operator List State, Operator Union List State, Broadcast State, Keyed State, or Window State.
  4. The state backend type, either HashMapStateBackend or EmbeddedRocksDBStateBackend.

All four of these have to be dug out of the source code of the original job.
On top of that, you also need the file path of the checkpoint. Putting it all together, every read boils down to the call shape sketched below.
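
A minimal sketch, with hypothetical placeholders for the uid, state name, and checkpoint path (swap the read method and backend for whatever your job actually used):

// Sketch only: "my-uid", "my-state-name", and the path are placeholders.
// Swap readListState for readUnionState / readKeyedState / readBroadcastState
// to match the state type found in item 3, and HashMapStateBackend for
// EmbeddedRocksDBStateBackend if the job used RocksDB.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SavepointReader reader = SavepointReader.read(
        env,
        "/path/to/checkpoint/chk-N",             // checkpoint file path
        new HashMapStateBackend());              // state backend type (item 4)
DataStream<byte[]> state = reader.readListState(
        OperatorIdentifier.forUid("my-uid"),     // uid set in the original job (item 1)
        "my-state-name",                         // StateDescriptor name (item 2)
        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);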

The following real-world cases show how to use it in detail.

Reading a Kafka Source Checkpoint

See How to Read Kafka Source Offsets with Flink’s State Processor API for background; that article explains in detail how to find the uid, the StateDescriptor name, and so on.
A concrete example:

Write a Flink job that consumes Kafka data

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaCheckpointTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // configure the checkpoint state backend
        env.setStateBackend(new HashMapStateBackend());
        // retain the checkpoint data when the job exits abnormally or is cancelled manually
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // set the checkpoint storage directory
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/checkpoint/ocean/flink-kafka-checkpoint");
        env.enableCheckpointing(3000);

        OffsetsInitializer offsetsInitializer = OffsetsInitializer.earliest();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("172.16.2.205:9092")
                .setTopics("first")
                .setGroupId("flink-cdc-01")
                // by default the source starts from the group's committed offsets, falling back
                // to earliest when none exist; here the earliest offset is forced explicitly
                .setStartingOffsets(offsetsInitializer)
                .setProperty("partition.discovery.interval.ms", "10000") // check for new partitions every 10 s so added partitions don't go unconsumed
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> kafkaSource1 = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");
        kafkaSource1.uid("kafka-source") // this uid is what the State Processor API will look up
                .keyBy(x -> x)
                .map(new RichMapFunction<String, String>() {
                    private MapState<String, Integer> mapState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>("kafka-source-state", Types.STRING, Types.INT);
                        mapState = getRuntimeContext().getMapState(mapStateDescriptor);
                    }

                    @Override
                    public String map(String key) throws Exception {
                        if (mapState.get(key) != null) {
                            mapState.put(key, mapState.get(key) + 1);
                        } else {
                            mapState.put(key, 1);
                        }
                        return key + ":" + mapState.get(key);
                    }
                }).print();
        env.execute("kafka-source-check");
    }
}

Write code to read the state data

import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;

import java.util.Arrays;

public class ReadKafkaCheckpointTest {
    public static void main(String[] args) throws Exception {
        final String uid = "kafka-source";
        final String checkpointPath = "/tmp/flink/checkpoint/ocean/flink-kafka-checkpoint/e6c2930ef271b1522c64993669893c95/chk-2";
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SavepointReader savepoint = SavepointReader.read(env, checkpointPath, new HashMapStateBackend());
        // the Kafka source keeps its splits in operator list state named "SourceReaderState"
        DataStream<byte[]> listState = savepoint.readListState(
                OperatorIdentifier.forUid(uid),
                "SourceReaderState",
                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
        CloseableIterator<byte[]> states = listState.executeAndCollect();
        while (states.hasNext()) {
            byte[] s = states.next();
            KafkaPartitionSplitSerializer serializer = new KafkaPartitionSplitSerializer();
            // skip the 8-byte header (serializer version + payload length) in front of each element
            KafkaPartitionSplit split = serializer.deserialize(serializer.getVersion(), Arrays.copyOfRange(s, 8, s.length));
            System.out.println(
                    String.format("topic=%s, partition=%s, startingOffset=%s, stoppingOffset=%s, topicPartition=%s",
                            split.getTopic(), split.getPartition(),
                            split.getStartingOffset(), split.getStoppingOffset(), split.getTopicPartition()));
        }

        System.out.println("DONE");
        env.execute("read the list state");
    }
}

Output:

topic=first, partition=0, startingOffset=3, stoppingOffset=Optional.empty, topicPartition=first-0
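
A note on the Arrays.copyOfRange(s, 8, s.length) call above: my reading is that each list-state element is written through Flink's SimpleVersionedSerialization, which prefixes a 4-byte serializer version and a 4-byte payload length, so the hard-coded 8 skips that header. A variant that reads the header instead of assuming it:

// Assumed element layout: [int version][int length][payload],
// per SimpleVersionedSerialization.writeVersionAndSerialize.
// Needs java.nio.ByteBuffer.
ByteBuffer buf = ByteBuffer.wrap(s);
int version = buf.getInt();          // serializer version recorded at write time
int length = buf.getInt();           // payload length
byte[] payload = new byte[length];
buf.get(payload);
KafkaPartitionSplit split = new KafkaPartitionSplitSerializer().deserialize(version, payload);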

Searching the Flink CDC source code

  1. Look at the OracleSource class, which contains the following code:
    return new DebeziumSourceFunction<>(
    deserializer, props, null, new OracleValidator(props));
    The functionality is mainly implemented by the DebeziumSourceFunction class.
  2. In the DebeziumSourceFunction class, find the following code:
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
            OperatorStateStore stateStore = context.getOperatorStateStore();
            this.offsetState =
                    stateStore.getUnionListState(
                            new ListStateDescriptor<>(
                                    OFFSETS_STATE_NAME,
                                    PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
            this.schemaRecordsState =
                    stateStore.getUnionListState(
                            new ListStateDescriptor<>(
                                    HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO));
    
            if (context.isRestored()) {
                restoreOffsetState();
                restoreHistoryRecordsState();
            } else {
                if (specificOffset != null) {
                    byte[] serializedOffset =
                            DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
                    restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
                    LOG.info(
                            "Consumer subtask {} starts to read from specified offset {}.",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            restoredOffsetState);
                } else {
                    LOG.info(
                            "Consumer subtask {} has no restore state.",
                            getRuntimeContext().getIndexOfThisSubtask());
                }
            }
        }
    As you can see, the StateDescriptor name for offsetState is DebeziumSourceFunction.OFFSETS_STATE_NAME and its type is PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO.
    The state type is Operator Union List State.
    The uid is whatever was set in your own job code.

    Write code to read the state data

    import com.ververica.cdc.debezium.DebeziumSourceFunction;
    import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
    import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
    import org.apache.flink.state.api.OperatorIdentifier;
    import org.apache.flink.state.api.SavepointReader;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.CloseableIterator;

    import java.nio.charset.StandardCharsets;

    public class ReadCDCCheckpointTest {
        public static void main(String[] args) throws Exception {
            final String uid = "flinkMysqlCDC";
            final String checkpointPath = "/tmp/flink/checkpoint/ocean/xunxing-oracle-SFAA_T/8989a967962e19d665e2aa5565b303f6/chk-5";
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            SavepointReader savepoint = SavepointReader.read(env, checkpointPath, new HashMapStateBackend());
            // oracle-cdc offset state: union list state holding the Debezium offset as UTF-8 JSON
            DataStream<byte[]> offsetStream = savepoint.readUnionState(
                    OperatorIdentifier.forUid(uid),
                    DebeziumSourceFunction.OFFSETS_STATE_NAME,
                    PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
            CloseableIterator<byte[]> states = offsetStream.executeAndCollect();
            while (states.hasNext()) {
                System.out.println("#####");
                System.out.println(new String(states.next(), StandardCharsets.UTF_8));
            }

            System.out.println("DONE");
            env.execute("read the list state");
        }
    }
    Output:
    {"sourcePartition":{"server":"oracle_logminer"},"sourceOffset":{"snapshot_scn":"41090057","snapshot":true,"scn":"41090057","snapshot_completed":true}}
    At first I assumed MySQL CDC used the same StateDescriptor name and state type as Oracle CDC, but reading the MySQL CDC source shows that it reimplements the Debezium logic itself rather than calling the Debezium API directly.
    I never did find where the state is defined; all I found was MySqlSourceReader:
    @Override
    protected MySqlSplitState initializedState(MySqlSplit split) {
            if (split.isSnapshotSplit()) {
                return new MySqlSnapshotSplitState(split.asSnapshotSplit());
            } else {
                return new MySqlBinlogSplitState(split.asBinlogSplit());
            }
        }
    Clearly, though, this is not the class implementing the CheckpointedFunction interface that we were after. There is a trick, however: looking directly at the checkpoint's _metadata file gives a rough view of the StateDescriptor names.
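    To eyeball the legible strings without a hex editor, a quick-and-dirty scan for printable-ASCII runs does the job. This helper is my own, mimicking the Unix strings utility; it is not part of any Flink API:

    // Needs java.nio.file.Files and java.nio.file.Paths.
    byte[] bytes = Files.readAllBytes(Paths.get(checkpointPath + "/_metadata"));
    StringBuilder run = new StringBuilder();
    for (byte b : bytes) {
        if (b >= 0x20 && b < 0x7f) {          // printable ASCII
            run.append((char) b);
        } else {
            if (run.length() >= 8) {          // long runs are likely state/class names
                System.out.println(run);
            }
            run.setLength(0);
        }
    }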
    Metadata produced by Oracle CDC:
    (binary; only the legible fragments are reproduced)
    offset-states ... file:/tmp/flink/checkpoint/ocean/xunxing-oracle-SFAA_T/8989a967962e19d665e2aa5565b303f6/chk-5/517fc1a3-d10d-4323-b3e2-ca90927e559a
    history-records-states OPERATOR_STATE_DISTRIBUTION_MODE UNION VALUE_SERIALIZER org.apache.flink.api.common.typeutils.base.StringSerializer
    offset-states OPERATOR_STATE_DISTRIBUTION_MODE UNION VALUE_SERIALIZER org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer
    {"type":"CREATE","id":"\"ORCL\".\"FLINKUSER\".\"GJC_TEST_CDC\"","table":{ ... column definitions for ID, NAME, AGE ... }}
    {"sourcePartition":{"server":"oracle_logminer"},"sourceOffset":{"snapshot_scn":"41122474","snapshot":true,"scn":"41122474","snapshot_completed":true}}
    We can see that it contains offset-states.
    Metadata produced by MySQL CDC:
    (binary; only the legible fragments are reproduced)
    binlog-split {"transaction_id":null,"ts_sec":"0","file":"mysql-bin.000004","pos":"39483","kind":"SPECIFIC","row":"0","event":"0","server_id":"1"}
    test.gjc_test_cdc {"type":"CREATE","id":"\"test\".\"gjc_test_cdc\"","table":{ ... column definitions for id, name, age, CREA_TIME ... }}
    SourceReaderState ... file:/tmp/flink/checkpoint/ocean/xunxing-oracle-SFAA_T/397c8203625d686c62fd6b916e602acd/chk-11/4463e720-fffe-4e2f-9a9a-3d296552a2ae
    SourceReaderState OPERATOR_STATE_DISTRIBUTION_MODE SPLIT_DISTRIBUTE VALUE_SERIALIZER org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer
    (the SourceReaderState entry repeats once per subtask)
    I tried every name that was legible in there and eventually confirmed it is SourceReaderState, the same name the Kafka source uses, so MySQL CDC presumably builds on Flink's generic source classes. Note, however, that the state type is Operator List State, unlike Oracle CDC's union list state.
    The code is as follows:
    import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
    import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer;
    import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
    import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
    import org.apache.flink.state.api.OperatorIdentifier;
    import org.apache.flink.state.api.SavepointReader;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.CloseableIterator;

    import java.util.Arrays;

    public class ReadCDCCheckpointTest {
        public static void main(String[] args) throws Exception {
            final String uid = "flinkMysqlCDC";
            final String checkpointPath = "/tmp/flink/checkpoint/ocean/xunxing-oracle-SFAA_T/8989a967962e19d665e2aa5565b303f6/chk-5";
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            SavepointReader savepoint = SavepointReader.read(env, checkpointPath, new HashMapStateBackend());

            // MySQL CDC stores its splits as plain (non-union) operator list state
            DataStream<byte[]> splitStream = savepoint.readListState(
                    OperatorIdentifier.forUid(uid),
                    "SourceReaderState",
                    PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
            CloseableIterator<byte[]> states = splitStream.executeAndCollect();
            while (states.hasNext()) {
                byte[] s = states.next();
                MySqlSplitSerializer serializer = new MySqlSplitSerializer();
                // skip the 8-byte header (serializer version + payload length) in front of each element
                MySqlSplit split = serializer.deserialize(serializer.getVersion(), Arrays.copyOfRange(s, 8, s.length));
                if (split.isBinlogSplit()) { // snapshot splits carry no binlog offset
                    System.out.println(split.asBinlogSplit().getStartingOffset());
                }
            }
            System.out.println("DONE");
            env.execute("read the list state");
        }
    }
    A look at the MySQL CDC source shows its structure is very similar to the Kafka source's. That said, I never found where the SourceOperator class mentioned in How to Read Kafka Source Offsets with Flink’s State Processor API is actually referenced; my guess is that every FLIP-27-style source gets wrapped in Flink's generic SourceOperator at runtime, which registers its split state under the name SourceReaderState, and that would explain why MySQL CDC and the Kafka source share the same name.