Deploying Debezium and Kafka on Kubernetes

Debezium is a set of distributed services for capturing changes in databases (it records inserts, updates, and deletes by reading the database's log) so that your applications can see those changes and react to them. Debezium records every row-level change in each database table in a change event stream; applications simply read these streams to see the change events in the order they occurred.

Debezium currently supports three deployment modes:

Kafka Connect mode
Debezium runs as a Kafka Connect source connector, sending database change events to Kafka.
When to use: when change events need to be persisted to Kafka and you want the distributed, scalable, fault-tolerant runtime that Kafka Connect provides.
Key traits: leverages Kafka's reliability and fault tolerance; supports high-throughput, low-latency data transfer.
Debezium Server mode
Debezium Server is a standalone application that streams database change events to various messaging infrastructures such as Amazon Kinesis, Google Cloud Pub/Sub, or Apache Pulsar.
When to use: when change events must be sent to a non-Kafka message queue or stream-processing system.
Key traits: more flexible; supports many different messaging infrastructures.
Embedded Engine mode
In this mode Debezium does not run under Kafka Connect but is embedded as a library in a custom Java application.
When to use: when the application should consume change events directly, without routing them through Kafka.
Key traits: fewer external dependencies; suits lightweight applications and microservice architectures.

Debezium's strengths:

  • Simple and easy to get started with

  • Fast and stable, scalable, and can be built on Kafka

  • Can monitor many databases, e.g. MySQL and PostgreSQL

The following walks through deploying Debezium on Kafka Connect.

Strimzi simplifies deploying and managing Kafka on Kubernetes.

Install the Strimzi Operator

# Add the Strimzi Helm repository
helm repo add strimzi https://strimzi.io/charts/

# Install the Strimzi Operator
helm install strimzi-kafka strimzi/strimzi-kafka-operator -n kafka --create-namespace

That adds the latest chart, but I needed a historical version (0.33.0), so I downloaded strimzi-kafka-operator-helm-3-chart-0.33.0.tgz and installed it locally:

helm install strimzi-kafka ./strimzi-kafka-operator-helm-3-chart-0.33.0.tgz -n qifu-develop

After installation completes, the Strimzi operator pod appears.
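A quick way to check (the pod name suffix will differ in your cluster):

kubectl get pods -n qifu-develop | grep strimzi-cluster-operator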

P.S. To uninstall:

helm uninstall strimzi-kafka -n qifu-develop

Deploy the Kafka cluster:

debezium-cluster.yaml:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: debezium-cluster
  namespace: qifu-develop
spec:
  kafka:
    version: 3.2.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
          class: nfs-storage-node04
    config: # development settings; remove for production
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 20Gi
      deleteClaim: false
      class: nfs-storage-node04
  entityOperator:
    topicOperator: {}
    userOperator: {}

Deploy:

kubectl apply -f debezium-cluster.yaml

After the deployment finishes there are three pods: the ZooKeeper node, the Kafka broker, and the entity operator.
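Strimzi labels every pod it creates with the cluster name, so they can all be listed with one selector:

kubectl get pods -n qifu-develop -l strimzi.io/cluster=debezium-cluster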

Build a Kafka Connect image containing the Debezium plugin

Create a Dockerfile that adds the Debezium plugin to Kafka Connect:

FROM quay.io/strimzi/kafka:0.33.0-kafka-3.2.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
RUN curl -L https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.3.4.Final/debezium-connector-mysql-2.3.4.Final-plugin.tar.gz | tar xz -C /opt/kafka/plugins/debezium/
USER 1001

Build the image and push it to the container registry:

docker build -t harbor.qifu.com/qifu-develop/debezium-connect:2.3.4.Final .
docker push harbor.qifu.com/qifu-develop/debezium-connect:2.3.4.Final

Deploy Kafka Connect

Create the KafkaConnect resource (kafka-connect.yaml):

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  namespace: qifu-develop
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.2.0
  image: harbor.qifu.com/qifu-develop/debezium-connect:2.3.4.Final
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-offsets
    config.storage.topic: connect-configs
    status.storage.topic: connect-status
    config.storage.replication.factor: -1 # development setting; remove for production
    offset.storage.replication.factor: -1 # development setting; remove for production
    status.storage.replication.factor: -1 # development setting; remove for production
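The config.providers lines above enable Strimzi's KubernetesSecretConfigProvider, which lets connector configs reference Kubernetes Secrets instead of embedding plaintext values. A minimal sketch (the Secret name mysql-credentials is hypothetical, and the Connect service account additionally needs RBAC permission to read the Secret):

# create a Secret holding the database password
kubectl create secret generic mysql-credentials -n qifu-develop --from-literal=password='******'

# then, in the KafkaConnector config, reference it instead of a literal:
#   database.password: ${secrets:qifu-develop/mysql-credentials:password}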

Deploy:

kubectl apply -f kafka-connect.yaml

After deployment the Connect worker pod (name prefix debezium-connect-cluster-connect) is created.
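A quick check:

kubectl get pods -n qifu-develop | grep debezium-connect-cluster-connect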

Create the Debezium connector

Create the connector configuration (mysql-connector-test.yaml):

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mysql-connector-test
  namespace: qifu-develop
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql.qifu.svc.cluster.local # address of the monitored database
    database.port: 3306
    database.user: root
    database.password: ******
    database.server.id: 184055 # must be unique per connector
    # database.server.name: test
    topic.prefix: test # topic prefix
    # databases to monitor:
    database.include.list: qifu_saas_oms,qifu_saas_owms,qifu_saas_inventory,qifu_saas_aggregation
    # tables to monitor:
    table.include.list: qifu_saas_oms.o_order,qifu_saas_oms.o_order_task,qifu_saas_oms.o_order_company,qifu_saas_oms.o_outbound_order,qifu_saas_oms.o_outbound_order_detail,qifu_saas_oms.o_outbound_order_extra_info,qifu_saas_oms.o_outbound_order_pack_cargo,qifu_saas_oms.o_entry_order,qifu_saas_oms.o_entry_pack_cargo,qifu_saas_oms.o_entry_pack_cargo_detail,qifu_saas_owms.entry_plan,qifu_saas_owms.outbound_order,qifu_saas_owms.outbound_order_box_info,qifu_saas_owms.outbound_order_detail,qifu_saas_owms.outbound_order_extension,qifu_saas_owms.putaway_order,qifu_saas_owms.putaway_order_detail,qifu_saas_owms.putaway_order_detail_batch,qifu_saas_inventory.item_loc_inventory_flow,qifu_saas_inventory.inventory_batch,qifu_saas_aggregation.base_warehouse,qifu_saas_aggregation.logistics_package
    schema.history.internal.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092 # Kafka bootstrap address
    schema.history.internal.kafka.topic: schema-changes.inventory-test
    include.schema.changes: true
    include.query: true # record the SQL statement in Kafka messages; requires binlog_rows_query_log_events in MySQL
    provide.transaction.metadata: true
    # The following routes each database to a single topic; without it, each table gets its own topic by default
    transforms: Reroute
    transforms.Reroute.type: io.debezium.transforms.ByLogicalTableRouter
    transforms.Reroute.topic.regex: (test.qifu_saas_oms|test.qifu_saas_owms|test.qifu_saas_inventory|test.qifu_saas_aggregation)\..*
    transforms.Reroute.topic.replacement: $1.tables
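Because include.query: true only works when MySQL writes the original statement into the binlog, it is worth confirming binlog_rows_query_log_events on the source database first (a sketch; setting it globally needs SUPER or equivalent privileges, and it should also go into my.cnf to survive restarts):

mysql -u root -p -e "SET GLOBAL binlog_rows_query_log_events = ON; SHOW VARIABLES LIKE 'binlog_rows_query_log_events';"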

Note: the default snapshot mode is initial, which performs a full sync of historical data to Kafka. If you don't want historical data synced, add the following:

snapshot.mode: schema_only

Deploy:

kubectl apply -f mysql-connector-test.yaml

Verify:

You can exec into the debezium-connect-cluster-connect container and inspect the connector with:

curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/mysql-connector-test

Or check it with:

kubectl describe KafkaConnector mysql-connector-test -n qifu-develop
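To confirm that the Debezium plugin baked into the custom image was actually loaded, the standard Kafka Connect REST API can also list the installed connector plugins (run inside the Connect container):

curl -s localhost:8083/connector-plugins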

View the change events in the Kafka topic:

kubectl -n qifu-develop run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.33.0-kafka-3.2.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --topic test.qifu_saas_oms.tables

Log into the Kafka container and list the topics:

bin/kafka-topics.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --list

Start a consumer on a topic:

bin/kafka-console-consumer.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --topic test.qifu_saas_oms.tables

List the consumer groups:

bin/kafka-consumer-groups.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --list

Check a consumer group's lag:

bin/kafka-consumer-groups.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --describe --group consumer-oms

Modify the connector configuration

If you later need to change the MySQL connector configuration, you can exec into the debezium-connect-cluster-connect container and modify it with:

curl -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/mysql-connector-test/config \
-d '{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "database.hostname": "mysql.qifu.svc.cluster.local",
  "database.port": "3306",
  "database.user": "root",
  "database.password": "*****",
  "database.server.id": "184055",
  "database.include.list": "qifu_saas_oms,qifu_saas_owms,qifu_saas_inventory,qifu_saas_aggregation",
  "table.include.list": "qifu_saas_oms.o_order,qifu_saas_oms.o_order_task,qifu_saas_oms.o_order_company,qifu_saas_oms.o_outbound_order,qifu_saas_oms.o_outbound_order_detail,qifu_saas_oms.o_outbound_order_extra_info,qifu_saas_oms.o_outbound_order_pack_cargo,qifu_saas_oms.o_entry_order,qifu_saas_oms.o_entry_pack_cargo,qifu_saas_oms.o_entry_pack_cargo_detail,qifu_saas_owms.entry_plan,qifu_saas_owms.outbound_order,qifu_saas_owms.outbound_order_box_info,qifu_saas_owms.outbound_order_detail,qifu_saas_owms.outbound_order_extension,qifu_saas_owms.putaway_order,qifu_saas_owms.putaway_order_detail,qifu_saas_owms.putaway_order_detail_batch,qifu_saas_inventory.item_loc_inventory_flow,qifu_saas_inventory.inventory_batch,qifu_saas_aggregation.base_warehouse,qifu_saas_aggregation.logistics_package",
  "schema.history.internal.kafka.topic": "schema-changes.inventory-test",
  "schema.history.internal.kafka.bootstrap.servers": "debezium-cluster-kafka-bootstrap:9092",
  "topic.prefix": "test",
  "include.schema.changes": "true",
  "include.query": "true",
  "provide.transaction.metadata": "true",
  "transforms": "Reroute",
  "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
  "transforms.Reroute.topic.regex": "(test.qifu_saas_oms|test.qifu_saas_owms|test.qifu_saas_inventory|test.qifu_saas_aggregation)\\..*",
  "transforms.Reroute.topic.replacement": "$1.tables"
}'

Delete the connector:

curl -i -X DELETE localhost:8083/connectors/mysql-connector-test/

Restart the connector:

curl -X POST http://localhost:8083/connectors/mysql-connector-test/restart

Check the connector status:

curl http://localhost:8083/connectors/mysql-connector-test/status

Alternatively, edit mysql-connector-test.yaml and re-apply it:

kubectl apply -f mysql-connector-test.yaml

Delete the connector:

kubectl delete -f mysql-connector-test.yaml

Problems encountered

Problem 1

After the MySQL connector was deployed in the development environment and finished syncing data, it failed with 'performance_schema.session_status' doesn't exist and then kept re-syncing in a loop. Checking the performance_schema database confirmed the session_status table really was missing, while the databases in every other environment had it.

Solution:

Run the following against the development database:

mysql_upgrade -u root -p --force

This regenerates performance_schema.session_status; the database must be restarted for it to take effect.
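A simple way to verify the table is back after the restart:

mysql -u root -p -e "SHOW TABLES FROM performance_schema LIKE 'session_status';"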

Problem 2

With the same configuration, UAT was fine, but production failed with:

2025-06-09 10:04:43,710 ERROR [mysql-connector-to-etm|task-0] WorkerSourceTask{id=mysql-connector-to-etm-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-mysql-connector-to-etm-0]
org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:343)
at org.apache.kafka.connect.runtime.WorkerSourceTask.prepareToSendRecord(WorkerSourceTask.java:135)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:408)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:364)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:79)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2238388 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

The producer's default limit on message size is 1 MB, which was exceeded, so the size limits on both the Debezium connector and Kafka need to be raised:

kafka-connect.yaml:

producer.max.request.size: 3145728

debezium-cluster.yaml:

message.max.bytes: 3145728
replica.fetch.max.bytes: 6291456
socket.send.buffer.bytes: 3145728
socket.receive.buffer.bytes: 3145728
socket.request.max.bytes: 3145728
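For clarity, in the Strimzi CRs these keys go under spec.kafka.config in the Kafka resource and under spec.config in the KafkaConnect resource; a sketch of the relevant fragments:

# debezium-cluster.yaml
spec:
  kafka:
    config:
      message.max.bytes: 3145728
      replica.fetch.max.bytes: 6291456
      socket.send.buffer.bytes: 3145728
      socket.receive.buffer.bytes: 3145728
      socket.request.max.bytes: 3145728

# kafka-connect.yaml
spec:
  config:
    producer.max.request.size: 3145728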

After the changes, restart Kafka, the Debezium Connect cluster, and the MySQL connector, in that order, for the configuration to take effect.

Problem 3

With include.query: true in the connector config and binlog_rows_query_log_events enabled in MySQL, the concrete SQL statement gets recorded. Chinese characters in the other fields display fine, but the Chinese in the query field comes out garbled.

People in the Debezium community said version 3.0 doesn't have this problem, so I spun the services up temporarily with Docker to test:

zookeeper:

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 --security-opt seccomp=unconfined quay.io/debezium/zookeeper:3.0

kafka:

docker run -it --rm --name kafka -p 9092:9092 --security-opt seccomp=unconfined --link zookeeper:zookeeper quay.io/debezium/kafka:3.0

mysql:

docker run -it --rm --name mysql -p 3306:3306  -v /root/my.cnf:/etc/my.cnf --security-opt seccomp=unconfined -e LANG=en_US.UTF-8 -e LC_ALL=en_US.UTF-8 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:3.0

connect:

docker run -it --rm --name connect -p 8083:8083 --security-opt seccomp=unconfined -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:3.0

Register the MySQL connector:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ \
-d '{
  "name": "dev-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "10.168.2.187",
    "database.port": "30336",
    "database.user": "root",
    "database.password": "******",
    "database.server.id": "184059",
    "topic.prefix": "dev",
    "database.include.list": "dev_qifu_saas_aggregation",
    "table.include.list": "dev_qifu_saas_aggregation.base_warehouse_area",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schemahistory.inventory-dev",
    "include.schema.changes": "true",
    "include.query": "true",
    "provide.transaction.metadata": "true"
  }
}'

Inspect the consumed messages:

The query field's Chinese did indeed display correctly. The developers said the issue wasn't blocking, so we didn't upgrade the version on k8s. For the record, here are the component versions of the Docker-deployed 3.0 stack, so that if the Debezium service on k8s ever needs upgrading, the matching versions can be installed directly:

  • zookeeper:3.8.4

  • kafka:3.9.0

  • Debezium:3.0.8.Final

Update

A new requirement meant the garbled-Chinese problem had to be fixed. Following the earlier line of investigation, I changed the corresponding component containers on k8s to the 3.0 versions, but the output was still garbled. We usually enter containers on k8s with:

kubectl exec -it debezium-connect-cluster-connect-8d787f9c7-v25h7 -n qifu-develop -- /bin/bash

Checking the encoding this way showed utf8:

java -XshowSettings:properties -version | grep encoding

Earlier investigations into the encoding had never turned anything up, until by chance I entered the container with:

kubectl exec -it debezium-connect-cluster-connect-8d787f9c7-v25h7 -n qifu-develop sh

and found the encoding was not utf8:

java -XshowSettings:properties -version | grep encoding

So it really was an encoding problem: rebuild the image and start the container from the new image.

Modify the Dockerfile:

FROM quay.io/strimzi/kafka:0.33.0-kafka-3.2.0
USER root:root
ENV LANG=C.utf8 \
    LC_ALL=C.utf8 \
    LANGUAGE=en_US
RUN echo 'export LANG=C.utf8 LC_ALL=C.utf8' >> /etc/profile
RUN mkdir -p /opt/kafka/plugins/debezium
RUN curl -L https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/3.0.8.Final/debezium-connector-mysql-3.0.8.Final-plugin.tar.gz | tar xz -C /opt/kafka/plugins/debezium/
USER 1001

Build the image and push it to the container registry:

docker build -t harbor.qifu.com/qifu-develop/debezium-connect:2.3.4.Final_bug_fix .
docker push harbor.qifu.com/qifu-develop/debezium-connect:2.3.4.Final_bug_fix
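Then point the KafkaConnect resource at the new image and re-apply it, so the worker restarts with the fixed locale (fragment of kafka-connect.yaml; only the image field changes):

spec:
  image: harbor.qifu.com/qifu-develop/debezium-connect:2.3.4.Final_bug_fix

kubectl apply -f kafka-connect.yaml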

Problem 4

After new tables are added, if one of them has a data change that triggers a sync to Kafka, the connector errors out because the schema history topic holds no metadata for the new table:

2025-06-24 10:42:42,079 ERROR [mysql-connector-to-etm|task-0] Encountered change event 'Event{header=EventHeaderV4{timestamp=1750728336000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=162, nextPosition=998903090, flags=0}, data=TableMapEventData{tableId=10742, database='qifu_saas_owms', table='outbound_order', columnTypes=8, 8, 8, 8, 8, 15, 15, 15, 3, -10, 8, 15, 15, 3, 1, 15, 8, 15, 15, 8, 15, 15, 15, 3, 15, 15, 15, 1, 3, 8, 8, 8, -10, 8, 8, 8, -10, 15, 15, 15, 8, 3, 8, 3, 3, 15, 3, 8, 8, 3, 3, 15, 8, 15, 8, 8, 15, 8, 8, 3, columnMetadata=0, 0, 0, 0, 0, 384, 512, 512, 0, 1556, 0, 384, 512, 0, 0, 512, 0, 512, 512, 0, 512, 512, 800, 0, 200, 800, 800, 0, 0, 0, 0, 0, 788, 0, 0, 0, 532, 900, 1020, 1020, 0, 0, 0, 0, 0, 3072, 0, 0, 0, 0, 0, 1020, 0, 128, 0, 0, 128, 0, 0, 0, columnNullability={2, 3, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 55, 56, 57, 59}, eventMetadata=null}}' at offset {transaction_id=6f54a8d9-71a4-11ef-bfb2-00163e105ad2:85818558, file=mysql-bin.000113, pos=998902025, gtids=6f54a8d9-71a4-11ef-bfb2-00163e105ad2:1-85818557, server_id=1, event=2} for table qifu_saas_owms.outbound_order whose schema isn't known to this connector. One possible cause is an incomplete database schema history topic. Take a new snapshot in this case.
Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position=998902909 --stop-position=998903090 --verbose mysql-bin.000113 (io.debezium.connector.binlog.BinlogStreamingChangeEventSource) [blc-mysql.qifu.svc.cluster.local:3306]
2025-06-24 10:42:42,081 ERROR [mysql-connector-to-etm|task-0] Error during binlog processing. Last offset stored = {transaction_id=6f54a8d9-71a4-11ef-bfb2-00163e105ad2:85818558, file=mysql-bin.000113, pos=998902025, gtids=6f54a8d9-71a4-11ef-bfb2-00163e105ad2:1-85818557, server_id=1, event=2}, binlog reader near position = mysql-bin.000113/998902909 (io.debezium.connector.binlog.BinlogStreamingChangeEventSource) [blc-mysql.qifu.svc.cluster.local:3306]
2025-06-24 10:42:42,082 ERROR [mysql-connector-to-etm|task-0] Producer failure (io.debezium.pipeline.ErrorHandler) [blc-mysql.qifu.svc.cluster.local:3306]
io.debezium.DebeziumException: Error processing binlog event
at io.debezium.connector.binlog.BinlogStreamingChangeEventSource.handleEvent(BinlogStreamingChangeEventSource.java:591)
at io.debezium.connector.binlog.BinlogStreamingChangeEventSource.lambda$execute$17(BinlogStreamingChangeEventSource.java:209)
at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1281)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1103)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:657)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:959)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.debezium.DebeziumException: Encountered change event for table qifu_saas_owms.outbound_order whose schema isn't known to this connector
at io.debezium.connector.binlog.BinlogStreamingChangeEventSource.informAboutUnknownTableIfRequired(BinlogStreamingChangeEventSource.java:996)
at io.debezium.connector.binlog.BinlogStreamingChangeEventSource.informAboutUnknownTableIfRequired(BinlogStreamingChangeEventSource.java:1048)
at io.debezium.connector.binlog.BinlogStreamingChangeEventSource.handleUpdateTableMetadata(BinlogStreamingChangeEventSource.java:797)
at io.debezium.connector.binlog.BinlogStreamingChangeEventSource.lambda$execute$4(BinlogStreamingChangeEventSource.java:178)
at io.debezium.connector.binlog.BinlogStreamingChangeEventSource.handleEvent(BinlogStreamingChangeEventSource.java:571)
... 6 more

Solutions:

Method 1 (requires a newer Debezium; on older versions this command fails):

This amounts to re-initializing: all data is synced again, which takes a long time for large datasets.

Stop the connector:

curl -X PUT http://127.0.0.1:8083/connectors/mysql-connector-to-etm/stop

Reset the connector offsets:

curl -X DELETE http://127.0.0.1:8083/connectors/mysql-connector-to-etm/offsets
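Once the offsets are gone, resume the connector so it starts over with a fresh snapshot (stop and resume are standard Kafka Connect REST endpoints):

curl -X PUT http://127.0.0.1:8083/connectors/mysql-connector-to-etm/resume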

Method 2:

1. Delete or rename the schema history topic (schema.history.internal.kafka.topic)

2. Switch the snapshot mode in the configuration to recovery mode:

snapshot.mode: schema_only_recovery

3. Update the connector configuration:

kubectl apply -f mysql-connector-to-etm.yaml

Problem 5

The connector reported:

io.debezium.DebeziumException: Error reading MySQL variables: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at io.debezium.connector.binlog.jdbc.BinlogConnectorConnection.querySystemVariables(BinlogConnectorConnection.java:514)
at io.debezium.connector.binlog.jdbc.BinlogConnectorConnection.readSystemVariables(BinlogConnectorConnection.java:496)
at io.debezium.connector.binlog.jdbc.BinlogConnectorConnection.isTableIdCaseSensitive(BinlogConnectorConnection.java:332)
at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:98)
at io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:403)
at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:313)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:305)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:249)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:165)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:55)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:837)
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:420)
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:238)
at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:180)
at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:246)
at io.debezium.jdbc.JdbcConnection$ConnectionFactoryDecorator.connect(JdbcConnection.java:132)
at io.debezium.jdbc.JdbcConnection.establishConnection(JdbcConnection.java:920)
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:904)
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:898)
at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:553)
at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:501)
at io.debezium.connector.binlog.jdbc.BinlogConnectorConnection.querySystemVariables(BinlogConnectorConnection.java:502)
... 14 more
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:52)
at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:95)
at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:140)
at com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:156)
at com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:79)
at com.mysql.cj.NativeSession.connect(NativeSession.java:142)
at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:961)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:825)
... 25 more
Caused by: java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.Net.pollConnect(Native Method)
at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:549)
at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:597)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:327)
at java.base/java.net.Socket.connect(Socket.java:633)
at com.mysql.cj.protocol.StandardSocketFactory.connect(StandardSocketFactory.java:144)
at com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:53)
... 28 more

Solution:

Add the following to the MySQL connector configuration:

connect.timeout.ms: 30000
connect.retry.interval.ms: 5000
connect.max.attempts: 10
database.ssl.mode: DISABLED
database.connection.pool.size: 5
database.connection.pool.min.idle: 2

Update the MySQL connector configuration:

kubectl apply -f mysql-connector-test.yaml

Problem 6

The connector reported:

java.lang.IllegalStateException: The database schema history couldn't be recovered. Consider to increase the value for schema.history.internal.kafka.recovery.poll.interval.ms
at io.debezium.storage.kafka.history.KafkaSchemaHistory.recoverRecords(KafkaSchemaHistory.java:312)
at io.debezium.relational.history.AbstractSchemaHistory.recover(AbstractSchemaHistory.java:100)
at io.debezium.relational.history.SchemaHistory.recover(SchemaHistory.java:192)
at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:72)
at io.debezium.schema.HistorizedDatabaseSchema.recover(HistorizedDatabaseSchema.java:40)
at io.debezium.connector.common.BaseSourceTask.validateAndLoadSchemaHistory(BaseSourceTask.java:151)
at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:134)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:253)
at org.apache.kafka.connect.runtime.WorkerSourceTask.initializeAndStart(WorkerSourceTask.java:226)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)

Solution:

Add the recovery poll-interval setting:

schema.history.internal.kafka.recovery.poll.interval.ms: 5000