This article covers:
- Flink State Processor API overview
- Reading a Kafka Source checkpoint with the Flink State Processor API
- Reading a Flink CDC Source checkpoint (OracleCDC) with the Flink State Processor API
- Reading a Flink CDC Source checkpoint (MysqlCDC) with the Flink State Processor API
Flink State Processor API Overview
A detailed introduction is skipped here; see the official State Processor API documentation.
Prerequisites
To use the State Processor API, several things must be known up front:
- The operator uid: the job that produced the checkpoint must set a uid on the operator, and that same uid is passed to the API.
- The StateDescriptor name: the name given when the state was registered, which must also be passed to the API.
- The state type: one of Operator List State, Operator Union List State, Broadcast State, Keyed State, or Window State.
- The state backend type: HashMapStateBackend or EmbeddedRocksDBStateBackend.
All four of these have to be dug out of the source code of the original job. On top of that, you need the path to the checkpoint files. A generic sketch of where each piece plugs in is shown below.
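Here is a minimal sketch of how the pieces fit together. All concrete names in it (the uid, the state name, the path) are placeholders, not values from any real job:

```java
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateReadSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // (5) the checkpoint path and (4) the state backend the original job used
        SavepointReader reader = SavepointReader.read(
                env, "/path/to/checkpoint/chk-42", new HashMapStateBackend());
        // (3) the read method must match the state type: readListState /
        //     readUnionState / readBroadcastState / readKeyedState / window(...)
        DataStream<byte[]> state = reader.readListState(
                OperatorIdentifier.forUid("my-operator-uid"), // (1) the operator uid
                "my-state-descriptor-name",                   // (2) the StateDescriptor name
                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
        state.print();
        env.execute("state-read-sketch");
    }
}
```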
The following real-world cases show how to use it in practice.
Flink State Processor API: Reading a Kafka Source Checkpoint
For reference, see How to Read Kafka Source Offsets with Flink's State Processor API. That article is very thorough about how to find the uid, the StateDescriptor name, and so on.
Concrete case:
Write a Flink job that consumes Kafka data:
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaCheckpointTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Configure the checkpoint state backend
        env.setStateBackend(new HashMapStateBackend());
        // Retain checkpoint data when the job exits abnormally or is cancelled manually
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Set the checkpoint storage directory
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/checkpoint/ocean/flink-kafka-checkpoint");
        env.enableCheckpointing(3000);
        OffsetsInitializer offsetsInitializer = OffsetsInitializer.earliest();
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("172.16.2.205:9092")
                .setTopics("first")
                .setGroupId("flink-cdc-01")
                // By default consumption starts from the committed group offsets,
                // falling back to the earliest offset if none exist, unless overridden
                .setStartingOffsets(offsetsInitializer)
                // Check for new partitions every 10s so scaled-out partitions are not left unconsumed
                .setProperty("partition.discovery.interval.ms", "10000")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> kafkaSource1 =
                env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");
        kafkaSource1.uid("kafka-source")
                .keyBy(x -> x)
                .map(new RichMapFunction<String, String>() {
                    private MapState<String, Integer> mapState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        MapStateDescriptor<String, Integer> mapStateDescriptor =
                                new MapStateDescriptor<>("kafka-source-state", Types.STRING, Types.INT);
                        mapState = getRuntimeContext().getMapState(mapStateDescriptor);
                    }

                    @Override
                    public String map(String key) throws Exception {
                        if (mapState.get(key) != null) {
                            mapState.put(key, mapState.get(key) + 1);
                        } else {
                            mapState.put(key, 1);
                        }
                        return key + ":" + mapState.get(key);
                    }
                }).print();
        env.execute("kafka-source-check");
    }
}
```
Write code to read the state data:
```java
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;

import java.util.Arrays;

public class ReadKafkaCheckpointTest {
    public static void main(String[] args) throws Exception {
        final String uid = "kafka-source";
        final String checkpointPath = "/tmp/flink/checkpoint/ocean/flink-kafka-checkpoint/e6c2930ef271b1522c64993669893c95/chk-2";
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SavepointReader savepoint = SavepointReader.read(env, checkpointPath, new HashMapStateBackend());
        DataStream<byte[]> listState = savepoint.readListState(
                OperatorIdentifier.forUid(uid),
                "SourceReaderState",
                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
        CloseableIterator<byte[]> states = listState.executeAndCollect();
        while (states.hasNext()) {
            byte[] s = states.next();
            KafkaPartitionSplitSerializer serializer = new KafkaPartitionSplitSerializer();
            // Skip the 8-byte header in front of the serialized split payload
            KafkaPartitionSplit split = serializer.deserialize(
                    serializer.getVersion(), Arrays.copyOfRange(s, 8, s.length));
            System.out.println(
                    String.format("topic=%s, partition=%s, startingOffset=%s, stoppingOffset=%s, topicPartition=%s",
                            split.getTopic(), split.getPartition(),
                            split.getStartingOffset(), split.getStoppingOffset(), split.getTopicPartition()));
        }
        System.out.println("DONE");
        env.execute("read the list state");
    }
}
```
Output:
```
topic=first, partition=0, startingOffset=3, stoppingOffset=Optional.empty, topicPartition=first-0
```
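As a possible follow-up (my own sketch, not part of the original walkthrough), the recovered offsets could be fed back into a new KafkaSource, for example to restart a job from the same position without the old checkpoint. The topic/partition/offset values are the ones printed above:

```java
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class RestartFromRecoveredOffsets {
    public static void main(String[] args) {
        // Offsets read out of the checkpoint above: topic "first", partition 0, offset 3
        Map<TopicPartition, Long> recovered = new HashMap<>();
        recovered.put(new TopicPartition("first", 0), 3L);
        // A KafkaSource built with this initializer starts exactly where
        // the checkpointed job left off
        OffsetsInitializer fromCheckpoint = OffsetsInitializer.offsets(recovered);
        // ... plug into KafkaSource.<String>builder().setStartingOffsets(fromCheckpoint)
        System.out.println("initializer created: " + fromCheckpoint);
    }
}
```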
Flink State Processor API: Reading a Flink CDC Source Checkpoint (OracleCDC)
Digging through the Flink CDC source code:
- In the OracleSource class, the key line is `return new DebeziumSourceFunction<>(deserializer, props, null, new OracleValidator(props));`, i.e. the actual source functionality is implemented by the DebeziumSourceFunction class.
- In the DebeziumSourceFunction class, find the following code, where the state is registered:
```java
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    OperatorStateStore stateStore = context.getOperatorStateStore();
    this.offsetState =
            stateStore.getUnionListState(
                    new ListStateDescriptor<>(
                            OFFSETS_STATE_NAME,
                            PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
    this.schemaRecordsState =
            stateStore.getUnionListState(
                    new ListStateDescriptor<>(
                            HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO));
    if (context.isRestored()) {
        restoreOffsetState();
        restoreHistoryRecordsState();
    } else {
        if (specificOffset != null) {
            byte[] serializedOffset =
                    DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
            restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
            LOG.info(
                    "Consumer subtask {} starts to read from specified offset {}.",
                    getRuntimeContext().getIndexOfThisSubtask(),
                    restoredOffsetState);
        } else {
            LOG.info(
                    "Consumer subtask {} has no restore state.",
                    getRuntimeContext().getIndexOfThisSubtask());
        }
    }
}
```
From this code we can see:
- The StateDescriptor name of offsetState is DebeziumSourceFunction.OFFSETS_STATE_NAME, with type PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO.
- The state type is Operator Union List State.
- The uid is whatever was set in your own job code.
Write code to read the state data:
```java
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;

import java.nio.charset.StandardCharsets;

public class ReadCDCCheckpointTest {
    public static void main(String[] args) throws Exception {
        final String uid = "flinkMysqlCDC";
        final String checkpointPath = "/tmp/flink/checkpoint/ocean/xunxing-oracle-SFAA_T/8989a967962e19d665e2aa5565b303f6/chk-5";
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SavepointReader savepoint = SavepointReader.read(env, checkpointPath, new HashMapStateBackend());
        // oracle-cdc state: a union list state of serialized Debezium offsets
        DataStream<byte[]> stringDataStream = savepoint.readUnionState(
                OperatorIdentifier.forUid(uid),
                DebeziumSourceFunction.OFFSETS_STATE_NAME,
                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
        CloseableIterator<byte[]> states = stringDataStream.executeAndCollect();
        while (states.hasNext()) {
            System.out.println("#####");
            // The offset state is plain UTF-8 JSON
            System.out.println(new String(states.next(), StandardCharsets.UTF_8));
        }
        System.out.println("DONE");
        env.execute("read the list state");
    }
}
```
{"sourcePartition":{"server":"oracle_logminer"},"sourceOffset":{"snapshot_scn":"41090057","snapshot":true,"scn":"41090057","snapshot_completed":true}}
Flink State Processor API: Reading a Flink CDC Source Checkpoint (MysqlCDC)
At first I assumed that MysqlCDC's StateDescriptor name and state type were the same as OracleCDC's. However, reading the MysqlCDC source shows that it reimplements the Debezium logic from scratch rather than calling the Debezium API directly. In the end I never found where the state is defined; all I found was MySqlSourceReader:
```java
@Override
protected MySqlSplitState initializedState(MySqlSplit split) {
    if (split.isSnapshotSplit()) {
        return new MySqlSnapshotSplitState(split.asSnapshotSplit());
    } else {
        return new MySqlBinlogSplitState(split.asBinlogSplit());
    }
}
```
But clearly this is not what we were looking for: a class implementing the CheckpointedFunction interface. So I took a shortcut and inspected the checkpoint's _metadata file directly, where the StateDescriptor name is more or less readable.
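On Linux, `strings _metadata` pulls out the readable fragments of the binary file; a rough Java equivalent (a throwaway sketch of mine, not from the article) could look like this:

```java
import java.nio.file.Files;
import java.nio.file.Paths;

// Print every run of 8+ printable ASCII characters in a _metadata file,
// similar to the Unix `strings` tool.
public class MetadataStrings {
    public static void main(String[] args) throws Exception {
        byte[] bytes = Files.readAllBytes(Paths.get(args[0])); // path to _metadata
        StringBuilder run = new StringBuilder();
        for (byte b : bytes) {
            if (b >= 0x20 && b < 0x7f) {
                run.append((char) b);
            } else {
                if (run.length() >= 8) {
                    System.out.println(run);
                }
                run.setLength(0);
            }
        }
        if (run.length() >= 8) {
            System.out.println(run);
        }
    }
}
```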
Metadata generated by OracleCDC (binary noise trimmed; only the readable fragments are kept):
```
offset-states
file:/tmp/flink/checkpoint/ocean/xunxing-oracle-SFAA_T/8989a967962e19d665e2aa5565b303f6/chk-5/517fc1a3-d10d-4323-b3e2-ca90927e559a
history-records-states  OPERATOR_STATE_DISTRIBUTION_MODE UNION  VALUE_SERIALIZER org.apache.flink.api.common.typeutils.base.StringSerializer
offset-states  OPERATOR_STATE_DISTRIBUTION_MODE UNION  VALUE_SERIALIZER org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer$BytePrimitiveArraySerializerSnapshot
{"type":"CREATE","id":"\"ORCL\".\"FLINKUSER\".\"GJC_TEST_CDC\"","table":{ ... }}
{"sourcePartition":{"server":"oracle_logminer"},"sourceOffset":{"snapshot_scn":"41122474","snapshot":true,"scn":"41122474","snapshot_completed":true}}
```
We can see that it contains offset-states and history-records-states, both marked OPERATOR_STATE_DISTRIBUTION_MODE UNION.
Metadata generated by MysqlCDC (again trimmed to the readable fragments):
```
SourceReaderState
file:/tmp/flink/checkpoint/ocean/xunxing-oracle-SFAA_T/397c8203625d686c62fd6b916e602acd/chk-11/4463e720-fffe-4e2f-9a9a-3d296552a2ae
SourceReaderState  OPERATOR_STATE_DISTRIBUTION_MODE SPLIT_DISTRIBUTE  VALUE_SERIALIZER org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer$BytePrimitiveArraySerializerSnapshot
binlog-split {"transaction_id":null,"ts_sec":"0","file":"mysql-bin.000004","pos":"39483","kind":"SPECIFIC","row":"0","event":"0","server_id":"1"}
test.gjc_test_cdc {"type":"CREATE","id":"\"test\".\"gjc_test_cdc\"","table":{ ... }}
```
I tried every name I could make out in the dump, and the one that finally works is SourceReaderState. That is the same StateDescriptor name the KafkaSource uses, so MysqlCDC presumably builds on Flink's generic source classes. Note, though, that the distribution mode is SPLIT_DISTRIBUTE, i.e. the state is an Operator List State, unlike OracleCDC's union state.
The code is as follows:
```java
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;

import java.util.Arrays;

public class ReadCDCCheckpointTest {
    public static void main(String[] args) throws Exception {
        final String uid = "flinkMysqlCDC";
        final String checkpointPath = "/tmp/flink/checkpoint/ocean/xunxing-oracle-SFAA_T/8989a967962e19d665e2aa5565b303f6/chk-5";
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SavepointReader savepoint = SavepointReader.read(env, checkpointPath, new HashMapStateBackend());
        DataStream<byte[]> stringDataStream = savepoint.readListState(
                OperatorIdentifier.forUid(uid),
                "SourceReaderState",
                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
        CloseableIterator<byte[]> states = stringDataStream.executeAndCollect();
        while (states.hasNext()) {
            byte[] s = states.next();
            MySqlSplitSerializer serializer = new MySqlSplitSerializer();
            // As with the Kafka source, skip the 8-byte header in front of the split payload
            MySqlSplit split = serializer.deserialize(
                    serializer.getVersion(), Arrays.copyOfRange(s, 8, s.length));
            System.out.println(split.asBinlogSplit().getStartingOffset());
        }
        System.out.println("DONE");
        env.execute("read the list state");
    }
}
```
I also skimmed the MysqlCDC source; its structure is very similar to KafkaSource's. But I never did find how the SourceOperator class described in How to Read Kafka Source Offsets with Flink's State Processor API is actually wired in.
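With the binlog file and position recovered, a conceivable next step (my own sketch, not from the article) is to restart a MysqlCDC job from exactly that offset via StartupOptions.specificOffset. This assumes a flink-cdc version where that option exists (2.3+); the connection settings are placeholders:

```java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class RestartFromRecoveredBinlogOffset {
    public static void main(String[] args) {
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("localhost") // placeholder connection settings
                .port(3306)
                .databaseList("test")
                .tableList("test.gjc_test_cdc")
                .username("root")
                .password("***")
                // Resume from the binlog file/position printed by ReadCDCCheckpointTest
                // above, instead of taking a fresh snapshot
                .startupOptions(StartupOptions.specificOffset("mysql-bin.000004", 39483))
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        // ... env.fromSource(source, WatermarkStrategy.noWatermarks(), "mysql-cdc")
    }
}
```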