Flink CDC Usage Notes

This article mainly covers:

  • Flink CDC usage notes

Flink CDC Usage Notes

Flink CDC 3.4 can write data to Iceberg through a pipeline connector; this note gives it a try.

Installation

Note that Flink CDC 3.4 requires Flink 1.19 at minimum.

  1. Extract the Flink and Flink CDC archives
    tar zxvf flink-1.19.2-bin-scala_2.12.tgz -C . 
    tar zxvf flink-cdc-3.4.0-bin.tar.gz -C .
  2. Download the pipeline connector jars and put them under flink-cdc-3.4.0/lib
    cp flink-cdc-pipeline-connector-iceberg-3.4.0.jar  flink-cdc-3.4.0/lib
    cp flink-cdc-pipeline-connector-mysql-3.4.0.jar  flink-cdc-3.4.0/lib
    cp mysql-connector-j-8.0.31.jar flink-cdc-3.4.0/lib
  3. Download the Iceberg runtime jars and put them under flink-1.19.2/lib
    cp flink-sql-connector-hive-2.3.9_2.12-1.17.2.jar flink-1.19.2/lib
    cp iceberg-flink-runtime-1.17-1.4.3.jar flink-1.19.2/lib
    cp mysql-connector-j-8.0.31.jar flink-1.19.2/lib
  4. Start the Flink standalone cluster
    bin/start-cluster.sh
  5. Write mysql-to-iceberg.yaml
    source:
      type: mysql
      name: MySQL Source
      hostname: 172.16.2.205
      port: 3306
      username: root
      password: root123456
      tables: test.gjc_test_iceberg_streaming_delay
      server-id: 5401-5404
    
    sink:
      type: iceberg
      name: Iceberg Sink
      catalog.properties.type: hive
      catalog.properties.uri: thrift://172.16.2.204:9083
      catalog.properties.hive-conf-dir: /root/gujc/flink-1.17.2/conf
    
    pipeline:
      name: MySQL to Iceberg Pipeline
      parallelism: 1
  6. Launch the Flink CDC job
    bin/flink-cdc.sh mysql-to-iceberg.yaml 
    The data is ingested into Iceberg correctly, but the result is the same as hand-written Flink SQL streaming ingestion into Iceberg: the incremental changes cannot be captured when reading the table back.
    The DDL that Flink CDC generates when auto-creating the table is fairly simple:
    CREATE TABLE `iceberg_catalog`.`test`.`gjc_test_iceberg_streaming_delay` (
      `id` INT NOT NULL,
      `name` VARCHAR(2147483647),
      `age` BIGINT,
      `create_time` TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL,
      CONSTRAINT `c7fdfd2e-19ca-418f-9ab2-a9458a7f32f9` PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'write.parquet.compression-codec' = 'zstd'
    )
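    To check what actually landed, the table can be queried back from the Flink SQL client. A minimal sketch, assuming the same Hive Metastore settings as the pipeline YAML above (the catalog name iceberg_catalog is just a local alias):
    -- Register the Iceberg catalog; values copied from the sink config above
    CREATE CATALOG iceberg_catalog WITH (
      'type' = 'iceberg',
      'catalog-type' = 'hive',
      'uri' = 'thrift://172.16.2.204:9083',
      'hive-conf-dir' = '/root/gujc/flink-1.17.2/conf'
    );
    -- Batch query to confirm the snapshot data is present
    SET 'execution.runtime-mode' = 'batch';
    SELECT * FROM iceberg_catalog.test.gjc_test_iceberg_streaming_delay;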
    With the following DDL, the Iceberg table can be read in streaming mode, but only the insert records come through; there is no way to tell whether a row is an insert, an update, or a delete:
    CREATE TABLE iceberg_catalog.iceberg.gjc_test_iceberg_streaming_delay (
      id int NOT NULL,
      name STRING ,
      age int ,
      create_time TIMESTAMP ,
      update_time TIMESTAMP,
      PRIMARY KEY(`id`) NOT ENFORCED
    ) WITH (
      'format-version' = '2',
      'write.format.default' = 'parquet',
      'write.parquet.compression-codec' = 'uncompressed',
      'write.delete.parquet.compression-codec' = 'uncompressed',
      'write.delete.mode'='merge-on-read',
      'write.merge.mode'='merge-on-read',
      'write.update.mode'='merge-on-read',
      'table.retention.max-snapshots' = '5',
      'write.metadata.delete-after-commit.enabled'='true',
      'write.metadata.previous-versions-max'='5'
    );
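    For reference, the streaming reads in these tests rely on Iceberg's Flink read options; a minimal sketch of such a query (the monitor-interval value is an arbitrary choice):
    SET 'execution.runtime-mode' = 'streaming';
    SELECT * FROM iceberg_catalog.iceberg.gjc_test_iceberg_streaming_delay
      /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s') */;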
    With the following DDL (the only change from the previous one is enabling upsert mode), the Iceberg table cannot be read in streaming mode at all:
    CREATE TABLE iceberg_catalog.iceberg.gjc_test_iceberg_streaming_delay (
      id int NOT NULL,
      name STRING ,
      age int ,
      create_time TIMESTAMP ,
      update_time TIMESTAMP,
      PRIMARY KEY(`id`) NOT ENFORCED
    ) WITH (
      'format-version' = '2',
      'write.upsert.enabled' = 'true',
      'write.format.default' = 'parquet',
      'write.parquet.compression-codec' = 'uncompressed',
      'write.delete.parquet.compression-codec' = 'uncompressed',
      'write.delete.mode'='merge-on-read',
      'write.merge.mode'='merge-on-read',
      'write.update.mode'='merge-on-read',
      'table.retention.max-snapshots' = '5',
      'write.metadata.delete-after-commit.enabled'='true',
      'write.metadata.previous-versions-max'='5'
    );
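    Even without a working streaming read, Iceberg's incremental batch scan can still pull the rows committed between two snapshots. A minimal sketch, assuming the scanned commits are append-only; the two snapshot ids are hypothetical placeholders that would come from the table's actual history:
    -- Incremental scan between two snapshots (ids are placeholders)
    SELECT * FROM iceberg_catalog.iceberg.gjc_test_iceberg_streaming_delay
      /*+ OPTIONS('start-snapshot-id'='1111111111111111111',
                  'end-snapshot-id'='2222222222222222222') */;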