This article covers:
- Flink CDC usage notes

Flink CDC usage notes
Flink CDC 3.4 supports writing data to Iceberg through a pipeline, so this is a quick trial of that feature.
Installation
Note that Flink CDC 3.4 requires at least Flink 1.19.
- Extract the Flink and Flink CDC tarballs

```bash
tar zxvf flink-1.19.2-bin-scala_2.12.tgz -C .
tar zxvf flink-cdc-3.4.0-bin.tar.gz -C .
```
- Download the pipeline connector jars and copy them into the Flink CDC lib directory

```bash
cp flink-cdc-pipeline-connector-iceberg-3.4.0.jar flink-cdc-3.4.0/lib
cp flink-cdc-pipeline-connector-mysql-3.4.0.jar flink-cdc-3.4.0/lib
cp mysql-connector-j-8.0.31.jar flink-cdc-3.4.0/lib
```
- Download the Iceberg runtime jars and copy them into the Flink lib directory

```bash
cp flink-sql-connector-hive-2.3.9_2.12-1.17.2.jar flink-1.19.2/lib
cp iceberg-flink-runtime-1.17-1.4.3.jar flink-1.19.2/lib
cp mysql-connector-j-8.0.31.jar flink-1.19.2/lib
```
- Start the Flink standalone cluster

```bash
bin/start-cluster.sh
```
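To confirm the cluster is up, check the JVM processes (the web UI listens on port 8081 by default):

```bash
# A StandaloneSessionClusterEntrypoint (JobManager) and at least one
# TaskManagerRunner should be listed once the cluster is running
jps
```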
- Write mysql-to-iceberg.yaml

```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: 172.16.2.205
  port: 3306
  username: root
  password: root123456
  tables: test.gjc_test_iceberg_streaming_delay
  server-id: 5401-5404

sink:
  type: iceberg
  name: Iceberg Sink
  catalog.properties.type: hive
  catalog.properties.uri: thrift://172.16.2.204:9083
  catalog.properties.hive-conf-dir: /root/gujc/flink-1.17.2/conf

pipeline:
  name: MySQL to Iceberg Pipeline
  parallelism: 1
```
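The MySQL pipeline source tails the binlog, so the source instance must have ROW-format binlog enabled; a quick sanity check on the MySQL side (assuming the root account from the YAML above):

```sql
-- Prerequisites for the MySQL CDC source
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW
```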
- Launch the Flink CDC job

```bash
bin/flink-cdc.sh mysql-to-iceberg.yaml
```

Data is ingested into Iceberg correctly, but the result is the same as hand-writing a Flink SQL job that streams into Iceberg: there is no way to capture the incremental (changelog) data downstream.
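The checks below were run from the Flink SQL client; they assume an Iceberg catalog named iceberg_catalog has been registered, roughly as follows (a sketch reusing the Hive metastore settings from the pipeline YAML):

```sql
-- Register a Hive-backed Iceberg catalog for the queries below
CREATE CATALOG iceberg_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://172.16.2.204:9083',
  'hive-conf-dir' = '/root/gujc/flink-1.17.2/conf'
);
```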
The CREATE TABLE statement that Flink CDC auto-generates is fairly simple. A table created with the following DDL can be read in streaming mode, but there is no way to tell whether a row is an insert, an update, or a delete:

```sql
CREATE TABLE `iceberg_catalog`.`test`.`gjc_test_iceberg_streaming_delay` (
  `id` INT NOT NULL,
  `name` VARCHAR(2147483647),
  `age` BIGINT,
  `create_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL,
  CONSTRAINT `c7fdfd2e-19ca-418f-9ab2-a9458a7f32f9` PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'write.parquet.compression-codec' = 'zstd'
)
```
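For reference, "streaming read" here means a continuous scan from the SQL client using Iceberg's Flink read options; a minimal sketch (the 1s monitor interval is an arbitrary choice):

```sql
-- Poll the table for newly committed snapshots
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM iceberg_catalog.test.gjc_test_iceberg_streaming_delay
/*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */;
```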
A table created with the following SQL can also be read in streaming mode, but only the insert data comes through:

```sql
CREATE TABLE iceberg_catalog.iceberg.gjc_test_iceberg_streaming_delay (
  id INT NOT NULL,
  name STRING,
  age INT,
  create_time TIMESTAMP,
  update_time TIMESTAMP,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'format-version' = '2',
  'write.format.default' = 'parquet',
  'write.parquet.compression-codec' = 'uncompressed',
  'write.delete.parquet.compression-codec' = 'uncompressed',
  'write.delete.mode' = 'merge-on-read',
  'write.merge.mode' = 'merge-on-read',
  'write.update.mode' = 'merge-on-read',
  'table.retention.max-snapshots' = '5',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '5'
);
```
With the following SQL (the same table, but with upserts enabled), the Iceberg table cannot be read in streaming mode at all, presumably because Iceberg's Flink streaming source currently only consumes append snapshots, while upsert mode writes equality deletes:

```sql
CREATE TABLE iceberg_catalog.iceberg.gjc_test_iceberg_streaming_delay (
  id INT NOT NULL,
  name STRING,
  age INT,
  create_time TIMESTAMP,
  update_time TIMESTAMP,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'format-version' = '2',
  'write.upsert.enabled' = 'true',
  'write.format.default' = 'parquet',
  'write.parquet.compression-codec' = 'uncompressed',
  'write.delete.parquet.compression-codec' = 'uncompressed',
  'write.delete.mode' = 'merge-on-read',
  'write.merge.mode' = 'merge-on-read',
  'write.update.mode' = 'merge-on-read',
  'table.retention.max-snapshots' = '5',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '5'
);
```
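Even without streaming read, a batch scan still works for spot-checking that upserts were merged (a minimal sketch):

```sql
-- A batch scan returns the merged (post-upsert) state of the table
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM iceberg_catalog.iceberg.gjc_test_iceberg_streaming_delay;
```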