
Debezium Study Notes

This article covers:

  • Introduction to Debezium
  • Starting Debezium Connect with Docker
  • Testing mysql-connector

Introduction to Debezium

Debezium is an open-source distributed platform for change data capture (CDC). It reads a database's transaction log (for MySQL, the binlog) and publishes row-level change events to Kafka topics, typically running as a set of source connectors on Kafka Connect.

Starting Debezium Connect with Docker

# Pull the image
docker pull debezium/connect:2.4
# Start the container
docker run -it --rm --name connect -p 8083:8083 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  -e BOOTSTRAP_SERVERS=golden-02:9092 \
  debezium/connect:2.4

Testing mysql-connector

Register the connector with the following command:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "golden-02",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "Gjc123!@#",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "test",
    "table.include.list": "gjc_test_cdc",
    "topic.prefix": "dbz_test",
    "database.history.kafka.bootstrap.servers": "golden-02:9092"
  }
}'

This fails with the following error:

2023-11-26 08:43:40,144 ERROR  ||  The 'schema.history.internal.kafka.topic' value is invalid: A value is required   [io.debezium.storage.kafka.history.KafkaSchemaHistory]
2023-11-26 08:43:40,144 ERROR  ||  The 'schema.history.internal.kafka.bootstrap.servers' value is invalid: A value is required   [io.debezium.storage.kafka.history.KafkaSchemaHistory]
2023-11-26 08:43:40,144 ERROR  ||  WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Error configuring an instance of KafkaSchemaHistory; check the logs for details
        at io.debezium.storage.kafka.history.KafkaSchemaHistory.configure(KafkaSchemaHistory.java:208)
        at io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig.getSchemaHistory(HistorizedRelationalDatabaseConnectorConfig.java:137)
        at io.debezium.relational.HistorizedRelationalDatabaseSchema.<init>(HistorizedRelationalDatabaseSchema.java:49)
        at io.debezium.connector.mysql.MySqlDatabaseSchema.<init>(MySqlDatabaseSchema.java:82)
        at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:98)
        at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:142)
        at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:274)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
        at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
2023-11-26 08:43:40,144 INFO   ||  Stopping down connector   [io.debezium.connector.common.BaseSourceTask]
2023-11-26 08:43:40,147 INFO   ||  Connection gracefully closed   [io.debezium.jdbc.JdbcConnection]
2023-11-26 08:43:40,147 INFO   ||  [Producer clientId=connector-producer-inventory-connector-0] Closing the Kafka producer with timeoutMillis = 30000 ms.   [org.apache.kafka.clients.producer.KafkaProducer]
2023-11-26 08:43:40,149 INFO   ||  Metrics scheduler closed   [org.apache.kafka.common.metrics.Metrics]
2023-11-26 08:43:40,149 INFO   ||  Closing reporter org.apache.kafka.common.metrics.JmxReporter   [org.apache.kafka.common.metrics.Metrics]
2023-11-26 08:43:40,149 INFO   ||  Metrics reporters closed   [org.apache.kafka.common.metrics.Metrics]
2023-11-26 08:43:40,149 INFO   ||  App info kafka.producer for connector-producer-inventory-connector-0 unregistered   [org.apache.kafka.common.utils.AppInfoParser]

The error is obvious: the required properties schema.history.internal.kafka.topic and schema.history.internal.kafka.bootstrap.servers are missing (since Debezium 2.x, the old database.history.* properties have been renamed with the schema.history.internal. prefix). Fix the configuration and submit it again.

Oddly, even though the request failed, the connector still shows up when you query the connector list:

# List the connectors in the cluster
curl -i http://localhost:8083/connectors
# Delete a connector
curl -X DELETE http://localhost:8083/connectors/inventory-connector
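When testing repeatedly, the REST calls above get tedious to retype. A minimal sketch of a Python helper around the Kafka Connect REST API (the base URL is the host/port assumed throughout this post):

```python
import json
import urllib.request

# Base URL of the Kafka Connect REST API (assumed from this post's setup)
CONNECT_URL = "http://localhost:8083"

def connector_url(name=None):
    """Build the /connectors endpoint URL, optionally for one connector."""
    url = CONNECT_URL + "/connectors"
    return url if name is None else url + "/" + name

def list_connectors():
    """GET /connectors returns a JSON array of connector names."""
    with urllib.request.urlopen(connector_url()) as resp:
        return json.loads(resp.read())

def delete_connector(name):
    """DELETE /connectors/<name>; mirrors the curl command above."""
    req = urllib.request.Request(connector_url(name), method="DELETE")
    with urllib.request.urlopen(req) as resp:
        return resp.status  # Connect returns 204 No Content on success
```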

Problem 1: table.include.list must use the databaseName.tableName format

I initially did not follow this format and specified only the table name, i.e.:

{
	"name": "inventory-connector",
	"config": {
		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
		"tasks.max": "1",
		"database.hostname": "golden-02",
		"database.port": "3306",
		"database.user": "root",
		"database.password": "Gjc123!@#",
		"database.server.id": "184054",
		"database.server.name": "dbserver1",
		"database.include.list": "test",
		"table.include.list": "gjc_test_cdc",
		"topic.prefix": "dbz_test",
		"schema.history.internal.kafka.topic": "dbz_schema_history",
		"schema.history.internal.kafka.bootstrap.servers": "golden-02:9092"
	}
}
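This pitfall can be caught before submitting the config. A rough sanity check that each table.include.list entry is fully qualified as databaseName.tableName (note: Debezium actually interprets the entries as regular expressions, so this sketch only covers the common literal case):

```python
import re

# A fully qualified literal entry looks like "databaseName.tableName".
# Debezium treats entries as regexes, so this is only a rough check.
QUALIFIED = re.compile(r"^[^.,\s]+\.[^.,\s]+$")

def unqualified_tables(config):
    """Return table.include.list entries that lack a database prefix."""
    entries = config.get("table.include.list", "").split(",")
    return [e for e in entries if e and not QUALIFIED.match(e)]

# The broken config above names only the table; the fix adds the prefix:
bad = {"table.include.list": "gjc_test_cdc"}        # caught
good = {"table.include.list": "test.gjc_test_cdc"}  # passes
```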

At this point, Kafka only has the following topics:

bin/kafka-topics.sh --bootstrap-server golden-02:9092 --list
__consumer_offsets
dbz_schema_history
dbz_test
my_connect_configs
my_connect_offsets
my_connect_statuses

As you can see, there is no topic for the table we want to capture. More puzzlingly, after deleting this connector and recreating it with the correct format, the table's topic is still not created:

# Delete inventory-connector
curl -X DELETE http://localhost:8083/connectors/inventory-connector
# Recreate it with the corrected table.include.list
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "golden-02",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "Gjc123!@#",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "test",
    "table.include.list": "test.gjc_test_cdc",
    "topic.prefix": "dbz_test",
    "schema.history.internal.kafka.topic": "dbz_schema_history",
    "schema.history.internal.kafka.bootstrap.servers": "golden-02:9092"
  }
}'

This is because although the connector itself was deleted, its Kafka offsets are still stored in the connect-offsets topic. When we create a new connector with the same name, it finds the old offsets in that topic and resumes reading from where the old one left off.
This is actually part of how Kafka Connect achieves high availability: because Kafka topics are themselves highly available, even if a connector fails we can immediately start a new one with the same name and configuration and it will pick up automatically.
During development and testing, however, we often need to re-test a connector from scratch, and this mechanism becomes a nuisance. There are two ways around it:

  • Create the connector with a different name. Kafka Connect will treat it as a brand-new connector and run the whole pipeline from the beginning.
  • Write a record to the connect-offsets topic whose key is the connector's name and whose value is null. Because Kafka is a log, newer records override older ones: once null overrides the stored offset, the connector loses its offset and starts reading from the beginning.
    For details, see the article "Debezium 特性深入介绍".
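For the tombstone approach, the record's key has to match exactly what Kafka Connect wrote. In my understanding the key is not the bare connector name but a JSON array of the name plus a connector-specific source-partition map, so inspect the real key in connect-offsets with a console consumer first. A sketch of building such a key (the "server" partition field below is an assumption based on this post's config):

```python
import json

def tombstone_key(connector_name, partition):
    """Kafka Connect keys source offsets as ["<connector>", {<partition map>}].

    The partition map is connector-specific -- verify it against the actual
    keys in the connect-offsets topic before producing the tombstone.
    """
    return json.dumps([connector_name, partition])

key = tombstone_key("inventory-connector", {"server": "dbserver1"})
# Produce this key with a null value, e.g. with kcat (empty value + -Z => null):
#   echo "$KEY#" | kcat -b golden-02:9092 -t my_connect_offsets -K '#' -Z
```

Because the offsets topic is partitioned by key, producing the tombstone with the identical key also lands it in the correct partition.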