
Flink CDC Compared with Commonly Used CDC Tools

This article mainly covers:

  • CDC overview
  • Types of CDC
  • Flink CDC usage example
  • Maxwell configuration file
  • Canal configuration file

I. CDC Overview

Broadly speaking, any technique that captures changes to data (Change Data Capture) can be called CDC.
CDC is commonly used for:

  • Data synchronization, for backup and disaster recovery
  • Data distribution, fanning one data source out to multiple downstream systems
  • Data collection, ETL ingestion into data warehouses / data lakes, where it is a very important data source

II. Types of CDC

The mainstream implementations fall into two categories:
1. Query-based CDC

  • Offline, scheduled query jobs (batch processing): a table is synchronized to another system by periodically querying it and shipping the result (a polling sketch follows below)
  • Data consistency cannot be guaranteed: the data may have changed several times while the query was running
  • Real-time delivery cannot be guaranteed: offline scheduling introduces query latency

2. Log-based CDC

  • Consumes the log in real time (stream processing): for example, MySQL's binlog records every change in the database in full, so the binlog can be used as a streaming source
  • Data consistency is guaranteed, because the binlog preserves the complete history of changes
  • Real-time delivery is guaranteed, because the data is consumed as a stream
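To make the contrast concrete, a query-based pipeline typically runs a scheduled extraction like the sketch below (the sensor table matches the demo later in this article; the update_time watermark column is an illustrative assumption). Any row that changes several times between two runs is only ever observed in its latest state, which is exactly the consistency gap described above:

-- scheduled batch extraction, e.g. once per hour
SELECT id, ts, vc
FROM sensor
WHERE update_time >= '2021-10-26 00:00:00'   -- watermark recorded by the previous run
  AND update_time <  '2021-10-26 01:00:00';

The log-based approach is demonstrated with Flink CDC in section III below.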

Common open-source CDC tools

Tool                CDC mechanism   Incremental sync   Resumable transfer   Full sync   Architecture   Ecosystem
DataX               query           ×                  ×                    ✓           standalone     ☆☆☆
Sqoop               query           ×                  ×                    ✓           distributed    ☆☆
Kettle              query           ×                  ×                    ✓           distributed    ☆☆
Canal               log             ✓                  ✓                    ×           standalone     ☆☆☆
Maxwell             log             ✓                  ✓                    ✓           standalone     ☆☆☆☆☆
Flink CDC           log             ✓                  ✓                    ✓           distributed    ☆☆☆
Debezium            log             ✓                  ✓                    ✓           standalone     ☆☆☆
Oracle GoldenGate   log             ✓                  ✓                    ✓           distributed

The sections below walk through a hands-on comparison of Canal, Maxwell and Flink CDC.

III. Flink CDC Usage Example

Prerequisites

1. Enable the MySQL binlog with binlog_format set to ROW (a sample my.cnf sketch follows the dependency snippet below)
2. Add the CDC dependency

<!-- Flink CDC: the groupId/version must match the com.ververica.cdc imports used in the code below
     (the 1.x releases shipped under groupId com.alibaba.ververica with different package names) -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.0.0</version>
</dependency>
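
For prerequisite 1, a minimal my.cnf sketch is shown below, assuming a self-managed MySQL instance; the server-id and database name are illustrative and should be adapted to your environment (restart MySQL afterwards):

[mysqld]
# a unique id among the MySQL servers / replication clients
server-id=1
# turn on binary logging; the file name prefix is arbitrary
log-bin=mysql-bin
# Flink CDC, Maxwell and Canal all require row-level binlog
binlog_format=ROW
# optional: only log the database used in the demo
binlog-do-db=test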

Usage demo

1. DataStream API

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author mt
 * @create 2021-10-26 20:13
 */
public class FlinkTestCDC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // MySQL CDC source: snapshots test.sensor first, then follows the binlog
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .databaseList("test")
                .tableList("test.sensor")                 // must be qualified as database.table
                .username("root")
                .password("root")
                .deserializer(new StringDebeziumDeserializationSchema())  // prints the raw Debezium SourceRecord
                .startupOptions(StartupOptions.initial()) // initial(): full snapshot + incremental binlog
                .build();

        DataStreamSource<String> stream = env.addSource(sourceFunction);
        stream.print();

        env.execute();
    }
}
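
The StringDebeziumDeserializationSchema above simply emits SourceRecord.toString(), which is verbose and awkward to parse downstream. As a minimal sketch (not part of the original demo; the class name and output format are illustrative), a custom DebeziumDeserializationSchema can turn every change record into a compact string:

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

// Illustrative deserializer: emits "database.table : operation : after-image" per change record
public class SimpleStringDeserializationSchema implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) {
        // the record topic has the form "mysql_binlog_source.<database>.<table>"
        String[] parts = record.topic().split("\\.");
        String dbTable = parts.length >= 3 ? parts[1] + "." + parts[2] : record.topic();

        Struct value = (Struct) record.value();
        Envelope.Operation op = Envelope.operationFor(record);  // READ (snapshot) / CREATE / UPDATE / DELETE
        Struct after = value.getStruct("after");                // null for deletes

        out.collect(dbTable + " : " + op + " : " + after);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

To use it, swap .deserializer(new SimpleStringDeserializationSchema()) into the builder in place of StringDebeziumDeserializationSchema.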

2. Table API

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author mt
 * @create 2021-09-30 11:07
 */
public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        // Register a dynamic table backed by the mysql-cdc connector
        tableEnvironment.executeSql(
                "CREATE TABLE sensor_test ( " +
                        " id STRING NOT NULL, " +
                        " ts BIGINT, " +
                        " vc INTEGER, " +
                        // declaring the primary key is required by the 2.x connector (incremental snapshot) and harmless on 1.x
                        " PRIMARY KEY (id) NOT ENFORCED " +
                        ") WITH ( " +
                        " 'connector' = 'mysql-cdc', " +
                        " 'hostname' = 'hadoop102', " +
                        " 'port' = '3306', " +
                        " 'username' = 'root', " +
                        " 'password' = 'root', " +
                        " 'database-name' = 'test', " +
                        " 'table-name' = 'sensor' " +
                        ") ");

        // Print the changelog as a retract stream: (true, row) = insert/update-after, (false, row) = delete/update-before
        Table table = tableEnvironment.sqlQuery("select * from sensor_test");
        tableEnvironment.toRetractStream(table, Row.class).print();

        env.execute();
    }
}

Connectors supported by the latest Flink CDC release (figure)
How DataX works (figure)

IV. Maxwell Configuration File

# tl;dr config
log_level=info
producer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka_topic=ggggg_cccc_aaaa
# mysql login info
host=hadoop102
user=maxwell
password=123456

# required when bootstrapping (syncing) historical data
client_id=maxwell_2
#     *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
#producer=kafka

# set the log level.  note that you can configure things further in log4j2.xml
#log_level=DEBUG # [DEBUG, INFO, WARN, ERROR]

# if set, maxwell will look up the scoped environment variables, strip off the prefix and inject the configs
#env_config_prefix=MAXWELL_

#     *** mysql ***
# mysql host to connect to
#host=hostname
# mysql port to connect to
#port=3306
# mysql user to connect as.  This user must have REPLICATION SLAVE permissions,
# as well as full access to the `maxwell` (or schema_database) database
#user=maxwell
# mysql password
#password=maxwell



# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello
# name of the mysql database where maxwell keeps its own state
#schema_database=maxwell
# whether to use GTID or not for positioning
#gtid_mode=true
# SSL/TLS options
# To use VERIFY_CA or VERIFY_IDENTITY, you must set the trust store with Java opts:
#   -Djavax.net.ssl.trustStore=<truststore> -Djavax.net.ssl.trustStorePassword=<password>
# or import the MySQL cert into the global Java cacerts.
# MODE must be one of DISABLED, PREFERRED, REQUIRED, VERIFY_CA, or VERIFY_IDENTITY
#

# turns on ssl for the maxwell-store connection, other connections inherit this setting unless specified
#ssl=DISABLED
# for binlog-connector
#replication_ssl=DISABLED
# for the schema-capture connection, if used
#schema_ssl=DISABLED


# maxwell can optionally replicate from a different server than where it stores
# schema and binlog position info.  Specify that different server here:

#replication_host=other
#replication_user=username
#replication_password=password
#replication_port=3306


# This may be useful when using MaxScale's binlog mirroring host.
# Specifies that Maxwell should capture schema from a different server than
# it replicates from:
#schema_host=other
#schema_user=username
#schema_password=password
#schema_port=3306
#       *** output format ***
# records include binlog position (default false)
#output_binlog_position=true
# records include a gtid string (default false)
#output_gtid_position=true
# records include fields with null values (default true).  If this is false,
# fields where the value is null will be omitted entirely from output.
#output_nulls=true
# records include server_id (default false)
#output_server_id=true
# records include thread_id (default false)
#output_thread_id=true
# records include schema_id (default false)
#output_schema_id=true
# records include row query, binlog option "binlog_rows_query_log_events" must be enabled (default false)
#output_row_query=true
# DML records include list of values that make up a row's primary key (default false)
#output_primary_keys=true
# DML records include list of columns that make up a row's primary key (default false)
#output_primary_key_columns=true
# records include commit and xid (default true)
#output_commit_info=true
# This controls whether maxwell will output JSON information containing
# DDL (ALTER/CREATE TABLE/ETC) information. (default: false)
# See also: ddl_kafka_topic
#output_ddl=true
#       *** kafka ***
# list of kafka brokers
#kafka.bootstrap.servers=hosta:9092,hostb:9092
# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed
#kafka_topic=maxwell
# alternative kafka topic to write DDL (alter/create/drop) to.  Defaults to kafka_topic
#ddl_kafka_topic=maxwell_ddl
# hash function to use.  "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]


# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]


# extra kafka options.  Anything prefixed "kafka." will get
# passed directly into the kafka-producer's config.

# a few defaults.
# These are 0.11-specific. They may or may not work with other versions.
kafka.compression.type=snappy
kafka.retries=0
kafka.acks=1
#kafka.batch.size=16384


# kafka+SSL example
# kafka.security.protocol=SSL
# kafka.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
# kafka.ssl.truststore.password=test1234
# kafka.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
# kafka.ssl.keystore.password=test1234
# kafka.ssl.key.password=test1234#

# controls a heuristic check that maxwell may use to detect messages that
# we never heard back from.  The heuristic check looks for "stuck" messages, and
# will timeout maxwell after this many milliseconds.
#
# See https://github.com/zendesk/maxwell/blob/master/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java
# if you really want to get into it.
#producer_ack_timeout=120000 # default 0


#           *** partitioning ***

# What part of the data do we partition by?
#producer_partition_by=database # [database, table, primary_key, transaction_id, column]


# specify what fields to partition by when using producer_partition_by=column
# comma-separated list.
#producer_partition_columns=id,foo,bar


# when using producer_partition_by=column, partition by this when
# the specified column(s) don't exist.
#producer_partition_by_fallback=database


#            *** kinesis ***

#kinesis_stream=maxwell
# AWS places a 256 unicode character limit on the max key length of a record
# http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
#
# Setting this option to true enables hashing the key with the md5 algorithm
# before we send it to kinesis so all the keys work within the key size limit.
# Values: true, false
# Default: false
#kinesis_md5_keys=true
#            *** sqs ***
#sqs_queue_uri=aws_sqs_queue_uri
# The sqs producer will need aws credentials configured in the default
# root folder and file format. Please check below link on how to do it.
# http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/setup-credentials.html
#            *** pub/sub ***
#pubsub_project_id=maxwell
#pubsub_topic=maxwell
#ddl_pubsub_topic=maxwell_ddl

#            *** rabbit-mq ***
#rabbitmq_host=rabbitmq_hostname
#rabbitmq_port=5672
#rabbitmq_user=guest
#rabbitmq_pass=guest
#rabbitmq_virtual_host=/
#rabbitmq_exchange=maxwell
#rabbitmq_exchange_type=fanout
#rabbitmq_exchange_durable=false
#rabbitmq_exchange_autodelete=false
#rabbitmq_routing_key_template=%db%.%table%
#rabbitmq_message_persistent=false
#rabbitmq_declare_exchange=true

#           *** redis ***
#redis_host=redis_host
#redis_port=6379
#redis_auth=redis_auth
#redis_database=0
# name of pubsub/list/whatever key to publish to
#redis_key=maxwell

# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_pub_channel=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_list_key=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# Valid values for redis_type = pubsub|lpush. Defaults to pubsub
#redis_type=pubsub

#           *** custom producer ***
# the fully qualified class name for custom ProducerFactory
# see the following link for more details.
# http://maxwells-daemon.io/producers/#custom-producer
#custom_producer.factory=
# custom producer properties can be configured using the custom_producer.* property namespace
#custom_producer.custom_prop=foo

#          *** filtering ***
# filter rows out of Maxwell's output.  Comma-separated list of filter-rules, evaluated in sequence.
# A filter rule is:
#  <type> ":" <db> "." <tbl> [ "." <col> "=" <col_val> ]
#  type    ::= [ "include" | "exclude" | "blacklist" ]
#  db      ::= [ "/regexp/" | "string" | "`string`" | "*" ]
#  tbl     ::= [ "/regexp/" | "string" | "`string`" | "*" ]
#  col_val ::= "column_name"
#  tbl     ::= [ "/regexp/" | "string" | "`string`" | "*" ]
#
# See http://maxwells-daemon.io/filtering for more details
#
#filter= exclude: *.*, include: foo.*, include: bar.baz, include: foo.bar.col_eg = "value_to_match"


# javascript filter
# maxwell can run a bit of javascript for each row if you need very custom filtering/data munging.
# See http://maxwells-daemon.io/filtering/#javascript_filters for more details
#
#javascript=/path/to/javascript_filter_file
#       *** encryption ***
# Encryption mode. Possible values are none, data, and all. (default none)
#encrypt=none

# Specify the secret key to be used
#secret_key=RandomInitVector
#       *** monitoring ***
# Maxwell collects metrics via dropwizard. These can be exposed through the
# base logging mechanism (slf4j), JMX, HTTP or pushed to Datadog.
# Options: [jmx, slf4j, http, datadog]
# Supplying multiple is allowed.
#metrics_type=jmx,slf4j
# The prefix maxwell will apply to all metrics
#metrics_prefix=MaxwellMetrics # default MaxwellMetrics
# Enable (dropwizard) JVM metrics, default false
#metrics_jvm=true
# When metrics_type includes slf4j this is the frequency metrics are emitted to the log, in seconds
#metrics_slf4j_interval=60
# When metrics_type includes http or diagnostic is enabled, this is the port the server will bind to.
#http_port=8080
# When metrics_type includes http or diagnostic is enabled, this is the http path prefix, default /.
#http_path_prefix=/some/path/
# ** The following are Datadog specific. **
# When metrics_type includes datadog this is the way metrics will be reported.
# Options: [udp, http]
# Supplying multiple is not allowed.
#metrics_datadog_type=udp
# datadog tags that should be supplied
#metrics_datadog_tags=tag1:value1,tag2:value2

# The frequency metrics are pushed to datadog, in seconds
#metrics_datadog_interval=60
# required if metrics_datadog_type = http
#metrics_datadog_apikey=API_KEY
# required if metrics_datadog_type = udp
#metrics_datadog_host=localhost # default localhost
#metrics_datadog_port=8125 # default 8125
# Maxwell exposes http diagnostic endpoint to check below in parallel:
# 1. binlog replication lag
# 2. producer (currently kafka) lag
# To enable Maxwell diagnostic
#http_diagnostic=true # default false
# Diagnostic check timeout in milliseconds, required if diagnostic = true
#http_diagnostic_timeout=10000 # default 10000
#    *** misc ***
# maxwell's bootstrapping functionality has a couple of modes.
#
# In "async" mode, maxwell will output the replication stream while it
# simultaneously outputs the database to the topic.  Note that it won't
# output replication data for any tables it is currently bootstrapping -- this
# data will be buffered and output after the bootstrap is complete.
#
# In "sync" mode, maxwell stops the replication stream while it
# outputs bootstrap data.
#
# async mode keeps ops live while bootstrapping, but carries the possibility of
# data loss (due to buffering transactions).  sync mode is safer but you
# have to stop replication.
#bootstrapper=async [sync, async, none]
# output filename when using the "file" producer
#output_file=/path/to/file
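
With the settings above saved as config.properties under the Maxwell installation directory, the daemon and a one-off bootstrap of historical data can be started roughly as follows; the database/table names are illustrative, and the client_id must match the value configured above:

# start the Maxwell daemon, producing change records to the Kafka topic configured above
bin/maxwell --config config.properties --daemon

# one-off full sync (bootstrap) of an existing table; relies on the client_id set above
bin/maxwell-bootstrap --config config.properties --database test --table sensor --client_id maxwell_2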

V. Canal Configuration File

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=hadoop102:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=


# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==


# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch


# mq config  
canal.mq.topic=kkkk_ffff_aaaaa
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
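
The file above is the per-instance configuration (typically conf/example/instance.properties); which MQ the server writes to is set in conf/canal.properties. A hedged sketch for a 1.1.x deployment follows; property names differ slightly across Canal versions (for example, 1.1.5+ uses kafka.bootstrap.servers instead of canal.mq.servers):

# conf/canal.properties (illustrative excerpt)
canal.serverMode = kafka
canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

# start / stop the Canal server from the deployer directory
bin/startup.sh
bin/stop.sh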