Debezium 是一组分布式服务,用于捕获数据库中的更改(通过读取数据库日志的方式来完成数据增删改的记录),以便您的应用程序可以看到这些更改并做出响应。Debezium 将每个数据库表中的所有行级更改记录在更改事件流中,应用程序只需读取这些流,即可按更改事件发生的顺序查看更改事件。
目前debezium有三种部署方式:
Kafka Connect 模式
Debezium 作为一个 Kafka Connect 的 Source Connector 运行,将数据库的变更事件发送到 Kafka 中。
适用场景:适用于需要将数据库变更事件持久化到 Kafka,并且需要 Kafka Connect 提供的分布式、可扩展和容错能力的场景。
主要特点:可以利用 Kafka 的可靠性和容错性,支持高吞吐量和低延迟的数据传输。
Debezium Server 模式
Debezium Server 是一个独立的应用程序,它可以将数据库的变更事件流式传输到各种消息传递基础设施,如 Amazon Kinesis、Google Cloud Pub/Sub 或 Apache Pulsar。
适用场景:适用于需要将数据库变更事件发送到非 Kafka 的消息队列或流处理系统的场景。
主要特点:提供了更多的灵活性,可以支持多种不同的消息传递基础设施。
Embedded Engine 模式
在这种模式下,Debezium 不通过 Kafka Connect 运行,而是作为一个库嵌入到自定义的 Java 应用程序中。
适用场景:适用于需要在应用程序内部直接消费数据库变更事件,而不希望通过 Kafka 进行中转的场景。
主要特点:减少了对外部系统的依赖,适合于轻量级的应用程序或微服务架构。
Debezium特点:
简单易上手
快速稳定,可以扩展,可以通过kafka构建
能够监控多种数据库 mysql pgsql等等
下面介绍基于kafka connector部署 Debeziunm:
Strimzi简化了Kafka在Kubernetes上的部署和管理:
安装Strimzi Operator
1 | # 添加Strimzi Helm仓库 |
这个是添加的最新的仓库,我需要安装的是历史版本(0.33.0)的,所以把strimzi-kafka-operator-helm-3-chart-0.33.0.tgz下载到本地安装:
1 | helm install strimzi-kafka ./strimzi-kafka-operator-helm-3-chart-0.33.0.tgz -n qifu-develop |
安装完成后会有这个pod:

ps:卸载命令:
1 | helm uninstall strimzi-kafka -n qifu-develop |
部署kafka集群:
debezium-cluster.yaml文件:
1 | apiVersion: kafka.strimzi.io/v1beta2 |
部署:
1 | kubectl apply -f debezium-cluster.yaml |
部署完成后会有3个pod:

构建包含Debezium插件的kafka Connect镜像
创建Dockerfile,将Debezium插件添加到Kafka Connect:
1 | FROM quay.io/strimzi/kafka:0.33.0-kafka-3.2.0 |
构建并推送镜像到容器仓库:
1 | docker build -t harbor.qifu.com/qifu-develop/debezium-connect:2.3.4.Final . |
部署Kafka Connect
创建KafkaConnect资源(kafka-connect.yaml
):
1 | apiVersion: kafka.strimzi.io/v1beta2 |
部署:
1 | kubectl apply -f kafka-connect.yaml |
部署完成后会有以下pod

创建Debezium连接器
创建连接器配置(mysql-connector-test.yaml
):
1 | apiVersion: kafka.strimzi.io/v1beta2 |
注:快照模式默认是initial,全量同步历史数据到kafka,如果希望不同步历史数据的话可以加上以下配置:
1 | snapshot.mode: schema_only |
部署:
1 | kubectl apply -f mysql-connector-test.yaml |
验证:
可以到debezium-connect-cluster-connect容器里执行以下命令查看连接器:
1 | curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/mysql-connector-test |

或者使用以下命令查看:
1 | kubectl describe KafkaConnector mysql-connector-test -n qifu-develop |

查看Kafka主题中的变更事件:
1 | kubectl -n qifu-develop run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.33.0-kafka-3.2.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --topic test.qifu_saas_oms.tables |
登录kafka容器查看主题列表:
1 | bin/kafka-topics.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --list |
启动一个消费者,消费主题:
1 | bin/kafka-console-consumer.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --topic test.qifu_saas_oms.tables |
查看消费群组详情:
1 | bin/kafka-consumer-groups.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --list |

查看消费群组消息积压:
1 | bin/kafka-consumer-groups.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --describe --group consumer-oms |

修改连接器配置
之后需要修改mysql连接器配置的话,可以到debezium-connect-cluster-connect容器里执行以下命令修改:
1 | curl -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/mysql-connector-test/config \ |
删除连接器:
1 | curl -i -X DELETE localhost:8083/connectors/mysql-connector-test/ |
重启连接器:
1 | curl -X POST http://localhost:8083/connectors/mysql-connector-test/restart |
检查连接器状态:
1 | curl http://localhost:8083/connectors/mysql-connector-test/status |
或者修改mysql-connector-test.yaml文件之后再部署:
1 | kubectl apply -f mysql-connector-test.yaml |
删除连接器:
1 | kubectl delete -f mysql-connector-test.yaml |
遇到的问题
问题1
遇到开发环境部署mysql连接器之后同步完数据后报错’performance_schema.session_status’ doesn’t exist,然后重复一直同步的问题。到数据库里查看performance_schema库,确实没有session_status表,其他环境的数据库都有。
解决办法:
在开发环境数据库执行以下命令:
1 | mysql_upgrade -u root -p --force |

然后就会重新生成performance_schema.session_status,需要重启数据库生效。
问题2
相同配置,UAT环境没问题,上生产的时候报错:
1 | 2025-06-09 10:04:43,710 ERROR [mysql-connector-to-etm|task-0] WorkerSourceTask{id=mysql-connector-to-etm-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-mysql-connector-to-etm-0] |
生产者发数据默认限制1M,超过了限制,需要修改一下debezium连接器和kafka的限制大小:
kafka-connect.yaml:
1 | producer.max.request.size: 3145728 |

debezium-cluster.yaml:
1 | message.max.bytes: 3145728 |

修改完之后依次重启kafka,debezium连接器和mysql连接器使配置生效。
问题3
mysql连接器配置里加了include.query: true,并且mysql开启binlog_rows_query_log_events之后,会记录具体的sql语句,其他字段的中文正常显示,但是query字段的中文会乱码:

问了Debezium社区的人,他们说3.0的版本没有这个问题,于是用docker临时启动了服务来测试:
zookeeper:
1 | docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 --security-opt seccomp=unconfined quay.io/debezium/zookeeper:3.0 |
kafka:
1 | docker run -it --rm --name kafka -p 9092:9092 --security-opt seccomp=unconfined --link zookeeper:zookeeper quay.io/debezium/kafka:3.0 |
mysql:
1 | docker run -it --rm --name mysql -p 3306:3306 -v /root/my.cnf:/etc/my.cnf --security-opt seccomp=unconfined -e LANG=en_US.UTF-8 -e LC_ALL=en_US.UTF-8 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:3.0 |
connect:
1 | docker run -it --rm --name connect -p 8083:8083 --security-opt seccomp=unconfined -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:3.0 |
注册mysql连接器:
1 | curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "dev-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "10.168.2.187", "database.port": "30336", "database.user": "root", "database.password": "******", "database.server.id": "184059", "topic.prefix": "dev", "database.include.list": "dev_qifu_saas_aggregation", "table.include.list": "dev_qifu_saas_aggregation.base_warehouse_area", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schemahistory.inventory-dev", "include.schema.changes": "true", "include.query":"true", "provide.transaction.metadata": "true" } }' |
查看消费内容:

发现确实可以正常显示query字段的中文,研发说这个不影响,就没在k8s上面升级版本,在此记录下docker部署的3.0版本各组件的版本情况,以便以后k8s部署的Debezium服务如果需要升级,可直接安装对应版本:
zookeeper:3.8.4
kafka:3.9.0
Debezium:3.0.8.final
更新
由于有新的需求,研发告知中文乱码的问题需要修复,按照之前排查的思路,在k8s把对应组件容器版本修改为对应3.0的版本,发现还是乱码,我们k8s进入容器的命令一般是使用
1 | kubectl exec -it debezium-connect-cluster-connect-8d787f9c7-v25h7 -n qifu-develop -- /bin/bash |
查看编码是utf8:

1 | java -XshowSettings:properties -version | grep encoding |

之前排查编码问题也一直没看出啥问题,直到一次偶然,使用以下命令进入容器:
1 | kubectl exec -it debezium-connect-cluster-connect-8d787f9c7-v25h7 -n qifu-develop sh |
发现编码竟然不是utf8

1 | java -XshowSettings:properties -version | grep encoding |

至此,发现确实是编码问题,重新构建一个镜像,以新镜像启动容器即可
修改Dockerfile文件:
1 | FROM quay.io/strimzi/kafka:0.33.0-kafka-3.2.0 |
构建并推送镜像到容器仓库:
1 | docker build -t harbor.qifu.com/qifu-develop/debezium-connect:2.3.4.Final_bug_fix . |
问题4
添加新表后,如果新表修改了数据触发同步到kafka,由于历史主题没有新表的元数据,连接器就会报错:
1 | 2025-06-24 10:42:42,079 ERROR [mysql-connector-to-etm|task-0] Encountered change event 'Event{header=EventHeaderV4{timestamp=1750728336000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=162, nextPosition=998903090, flags=0}, data=TableMapEventData{tableId=10742, database='qifu_saas_owms', table='outbound_order', columnTypes=8, 8, 8, 8, 8, 15, 15, 15, 3, -10, 8, 15, 15, 3, 1, 15, 8, 15, 15, 8, 15, 15, 15, 3, 15, 15, 15, 1, 3, 8, 8, 8, -10, 8, 8, 8, -10, 15, 15, 15, 8, 3, 8, 3, 3, 15, 3, 8, 8, 3, 3, 15, 8, 15, 8, 8, 15, 8, 8, 3, columnMetadata=0, 0, 0, 0, 0, 384, 512, 512, 0, 1556, 0, 384, 512, 0, 0, 512, 0, 512, 512, 0, 512, 512, 800, 0, 200, 800, 800, 0, 0, 0, 0, 0, 788, 0, 0, 0, 532, 900, 1020, 1020, 0, 0, 0, 0, 0, 3072, 0, 0, 0, 0, 0, 1020, 0, 128, 0, 0, 128, 0, 0, 0, columnNullability={2, 3, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 55, 56, 57, 59}, eventMetadata=null}}' at offset {transaction_id=6f54a8d9-71a4-11ef-bfb2-00163e105ad2:85818558, file=mysql-bin.000113, pos=998902025, gtids=6f54a8d9-71a4-11ef-bfb2-00163e105ad2:1-85818557, server_id=1, event=2} for table qifu_saas_owms.outbound_order whose schema isn't known to this connector. One possible cause is an incomplete database schema history topic. Take a new snapshot in this case. |
解决办法:
方法1(需要新版本的Debezium,旧版本执行这个命令会报错):
此方法相当于初始化,会再把所有数据同步一遍,数据量大的话耗时很长:
停止连接器:
1 | curl -X PUT http://127.0.0.1:8083/connectors/mysql-connector-to-etm/stop |
重置连接器偏移量:
1 | curl -X DELETE http://127.0.0.1:8083/connectors/mysql-connector-to-etm/offsets |

方法2:
1.删除历史主题或重命名历史主题(schema.history.internal.kafka.topic)
2.配置文件修改模式为恢复模式:
1 | snapshot.mode: schema_only_recovery |
3.更新连接器配置:
1 | kubectl apply -f mysql-connector-to-etm.yaml |
问题5
连接器报错:
1 | io.debezium.DebeziumException: Error reading MySQL variables: Communications link failure |
解决:
mysql连接器配置添加以下配置:
1 | connect.timeout.ms: 30000 |

更新mysql连接器配置:
1 | kubectl apply -f mysql-connector-test.yaml |
问题6
连接器报错:
1 | java.lang.IllegalStateException: The database schema history couldn't be recovered. Consider to increase the value for schema.history.internal.kafka.recovery.poll.interval.ms |
解决办法:
添加超时时间字段:
1 | schema.history.internal.kafka.recovery.poll.interval.ms: 5000 |
