This article covers:
- Introduction to Debezium
- Starting Debezium Connect with Docker
- Testing the MySQL connector
Introduction to Debezium
Starting Debezium Connect with Docker
# Pull the image
docker pull debezium/connect:2.4
# Start the container
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses -e BOOTSTRAP_SERVERS=golden-02:9092 debezium/connect:2.4
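Before registering a connector, it can help to confirm that the worker's REST API is reachable and that the MySQL connector plugin is installed. The following is a minimal sketch, assuming the 8083 port mapping from the docker run command above. The helper name has_mysql_plugin is made up for illustration; GET /connector-plugins is a standard Kafka Connect REST endpoint.

```shell
# Hypothetical helper: reads the JSON returned by GET /connector-plugins
# on stdin and succeeds if the Debezium MySQL connector class is listed.
has_mysql_plugin() {
  grep -qF 'io.debezium.connector.mysql.MySqlConnector'
}

# Usage (needs the Connect container from above to be running):
#   curl -s http://localhost:8083/connector-plugins | has_mysql_plugin \
#     && echo "MySQL connector plugin available"
```

The check is a plain substring match on the class name, which is enough for a quick smoke test but is not a real JSON parse.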
Testing the MySQL connector
Run the following command:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "golden-02", "database.port": "3306", "database.user": "root", "database.password": "Gjc123!@#", "database.server.id": "184054", "database.server.name": "dbserver1","database.include.list":"test","table.include.list":"gjc_test_cdc","topic.prefix": "dbz_test", "database.history.kafka.bootstrap.servers": "golden-02:9092"} }'
It fails with the following error:
2023-11-26 08:43:40,144 ERROR || The 'schema.history.internal.kafka.topic' value is invalid: A value is required [io.debezium.storage.kafka.history.KafkaSchemaHistory]
2023-11-26 08:43:40,144 ERROR || The 'schema.history.internal.kafka.bootstrap.servers' value is invalid: A value is required [io.debezium.storage.kafka.history.KafkaSchemaHistory]
2023-11-26 08:43:40,144 ERROR || WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Error configuring an instance of KafkaSchemaHistory; check the logs for details
at io.debezium.storage.kafka.history.KafkaSchemaHistory.configure(KafkaSchemaHistory.java:208)
at io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig.getSchemaHistory(HistorizedRelationalDatabaseConnectorConfig.java:137)
at io.debezium.relational.HistorizedRelationalDatabaseSchema.<init>(HistorizedRelationalDatabaseSchema.java:49)
at io.debezium.connector.mysql.MySqlDatabaseSchema.<init>(MySqlDatabaseSchema.java:82)
at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:98)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:142)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:274)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
2023-11-26 08:43:40,144 INFO || Stopping down connector [io.debezium.connector.common.BaseSourceTask]
2023-11-26 08:43:40,147 INFO || Connection gracefully closed [io.debezium.jdbc.JdbcConnection]
2023-11-26 08:43:40,147 INFO || [Producer clientId=connector-producer-inventory-connector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. [org.apache.kafka.clients.producer.KafkaProducer]
2023-11-26 08:43:40,149 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics]
2023-11-26 08:43:40,149 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics]
2023-11-26 08:43:40,149 INFO || Metrics reporters closed [org.apache.kafka.common.metrics.Metrics]
2023-11-26 08:43:40,149 INFO || App info kafka.producer for connector-producer-inventory-connector-0 unregistered [org.apache.kafka.common.utils.AppInfoParser]
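The error is clear: schema.history.internal.kafka.topic and schema.history.internal.kafka.bootstrap.servers are missing. The request above still uses database.history.kafka.bootstrap.servers, the name this setting had before Debezium 2.0. Fix the configuration and submit it again.
One catch: although the task failed, the connector itself was still registered, as the connector list shows, so it has to be deleted before a connector with the same name can be created again: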
# List the connectors in the cluster
curl -i http://localhost:8083/connectors
# Delete a connector:
curl -X DELETE http://localhost:8083/connectors/inventory-connector
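The connector list only shows that a connector is registered, not whether its task is running. A minimal sketch of a status check follows, assuming the same worker address as above. The helper name has_failed_state and the CONNECT_URL variable are made up for illustration; GET /connectors/<name>/status is a standard Kafka Connect REST endpoint that reports a state for the connector and for each task.

```shell
# Assumed worker address; override by exporting CONNECT_URL.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Hypothetical helper: reads the JSON from GET /connectors/<name>/status
# on stdin and succeeds if the connector or any task reports state FAILED.
has_failed_state() {
  grep -Eq '"state"[[:space:]]*:[[:space:]]*"FAILED"'
}

# Usage (needs a running Connect worker):
#   curl -s "$CONNECT_URL/connectors/inventory-connector/status" | has_failed_state \
#     && echo "connector is registered, but it or one of its tasks has failed"
```

In the failure described above, the connector would be expected to show up in the list while its task reports FAILED, which is why listing connectors alone is misleading.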
Problem 1: the format of table.include.list is databaseName.tableName. At first I did not follow this format and specified only the table name, i.e.:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "golden-02",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "Gjc123!@#",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "test",
    "table.include.list": "gjc_test_cdc",
    "topic.prefix": "dbz_test",
    "schema.history.internal.kafka.topic": "dbz_schema_history",
    "schema.history.internal.kafka.bootstrap.servers": "golden-02:9092"
  }
}
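Since an unqualified table name is accepted without any error, a quick check before submitting the configuration can catch this mistake. The sketch below is only a heuristic: the helper name is_qualified_table_list is made up for illustration, and it merely tests that every comma-separated entry contains a dot. Entries are regular expressions in Debezium, so a dot does not strictly prove the entry is database-qualified.

```shell
# Hypothetical helper: succeeds only if every comma-separated entry
# contains a dot, i.e. looks like databaseName.tableName.
# The function body is a subshell, so the IFS and globbing changes stay local.
is_qualified_table_list() (
  set -f        # do not expand characters such as * in the entries
  IFS=,         # split the argument on commas
  for entry in $1; do
    case $entry in
      *.*) ;;           # looks qualified, keep checking
      *) exit 1 ;;      # bare table name found
    esac
  done
  exit 0
)

# Usage:
#   is_qualified_table_list "gjc_test_cdc"      || echo "missing database name"
#   is_qualified_table_list "test.gjc_test_cdc" && echo "looks qualified"
```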
With table.include.list set to gjc_test_cdc, Kafka has only these topics:
bin/kafka-topics.sh --bootstrap-server golden-02:9092 --list
__consumer_offsets
dbz_schema_history
dbz_test
my_connect_configs
my_connect_offsets
my_connect_statuses
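As the list shows, there is no topic for the table we want to capture (it would be named dbz_test.test.gjc_test_cdc). Puzzlingly, even after deleting the connector and re-creating it with the correct format, the table's topic is still not created: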
# Delete inventory-connector
curl -X DELETE http://localhost:8083/connectors/inventory-connector
# Re-create it
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "golden-02", "database.port": "3306", "database.user": "root", "database.password": "Gjc123!@#", "database.server.id": "184054", "database.server.name": "dbserver1","database.include.list":"test","table.include.list":"test.gjc_test_cdc","topic.prefix": "dbz_test","schema.history.internal.kafka.topic":"dbz_schema_history","schema.history.internal.kafka.bootstrap.servers":"golden-02:9092"} }'
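This is because, although the connector was deleted, its offsets are still stored in Kafka, in the connect-offsets topic (my_connect_offsets in this setup, as set by OFFSET_STORAGE_TOPIC). When a new connector is created under the same name, it finds the old offsets in that topic and resumes reading from there instead of starting over.
This is in fact how Kafka Connect achieves high availability: because Kafka topics themselves are highly available, a failed connector can be replaced right away by a new one with the same name and configuration, which picks up where the old one left off.
During development and testing, however, we often need to test a brand-new connector from scratch, and then this mechanism becomes a nuisance. There are two ways around it.
- Use a different name when creating the connector. Kafka Connect then treats it as a brand-new connector and runs the whole logic from the beginning.
- Add a record to the connect-offsets topic whose key is the connector's offset key (it contains the connector name) and whose value is null. Because the topic is a log in which a newer record for a key supersedes the older one, the null value overrides the stored offset; the connector loses its offset and starts reading from the beginning.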
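The second option can be sketched with kcat (formerly kafkacat), a separate command-line tool that is not part of the setup above. Everything specific here is an assumption: the helper name offset_tombstone_line is made up, the offsets topic is taken to be my_connect_offsets from the docker run command, and the key shown is only the typical shape for this connector. Read the real key and its partition from the topic first, and stop or delete the connector before writing the tombstone.

```shell
# Hypothetical helper: prints one "key|" line, i.e. a key with an empty value.
# With kcat's -K '|' (key delimiter) and -Z (send empty values as NULL),
# such a line is produced as a tombstone for that key.
offset_tombstone_line() {
  printf '%s|\n' "$1"
}

# 1. Inspect the stored offsets to find the exact key and its partition:
#   kcat -b golden-02:9092 -C -e -t my_connect_offsets -f 'Partition(%p) %k %s\n'
#
# 2. Send the tombstone to that same partition. The key below is an assumed
#    example, and <partition> must be replaced with the value found in step 1:
#   offset_tombstone_line '["inventory-connector",{"server":"dbz_test"}]' \
#     | kcat -P -Z -b golden-02:9092 -t my_connect_offsets -K '|' -p <partition>
```

Writing to the same partition matters because the offsets topic is compacted per partition; a tombstone in a different partition would not supersede the stored offset.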
For details, see "Debezium 特性深入介绍" (an in-depth introduction to Debezium's features).