Deploying Debezium and Kafka on Kubernetes

Debezium is a set of distributed services that captures changes in databases (it records inserts, updates, and deletes by reading the database's transaction log) so that your applications can see those changes and react to them. Debezium records all row-level changes in each database table as a change-event stream; an application simply reads these streams to see the change events in the order in which they occurred.

Debezium currently supports three deployment modes:

Kafka Connect mode
Debezium runs as a Kafka Connect source connector and publishes database change events to Kafka.
When to use: you want change events persisted to Kafka, and you need the distributed, scalable, fault-tolerant runtime that Kafka Connect provides.
Key characteristics: leverages Kafka's reliability and fault tolerance; supports high-throughput, low-latency data transfer.
Debezium Server mode
Debezium Server is a standalone application that streams database change events to a variety of messaging infrastructures, such as Amazon Kinesis, Google Cloud Pub/Sub, or Apache Pulsar.
When to use: change events need to go to a non-Kafka message queue or stream-processing system.
Key characteristics: more flexibility; supports many different messaging infrastructures.
Embedded Engine mode
In this mode Debezium does not run under Kafka Connect; instead it is embedded as a library inside a custom Java application.
When to use: the application needs to consume change events directly, without routing them through Kafka.
Key characteristics: fewer external dependencies; well suited to lightweight applications or microservice architectures.

Debezium highlights:

  • Simple to get started with

  • Fast and stable, scalable, and can be built on top of Kafka

  • Can monitor many databases: MySQL, PostgreSQL, and more

The rest of this post covers deploying Debezium in the Kafka Connect mode.

Strimzi simplifies deploying and managing Kafka on Kubernetes.

Install the Strimzi Operator

# Add the Strimzi Helm repository
helm repo add strimzi https://strimzi.io/charts/

# Install the Strimzi Operator
helm install strimzi-kafka strimzi/strimzi-kafka-operator -n kafka --create-namespace

The commands above install the latest chart. I needed a historical version (0.33.0), so I downloaded strimzi-kafka-operator-helm-3-chart-0.33.0.tgz and installed it from the local file:

helm install strimzi-kafka ./strimzi-kafka-operator-helm-3-chart-0.33.0.tgz -n qifu-develop

Once the installation completes, the operator pod is running:

Note: to uninstall:

helm uninstall strimzi-kafka -n qifu-develop

Deploy the Kafka cluster

The debezium-cluster.yaml file:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: debezium-cluster
  namespace: qifu-develop
spec:
  kafka:
    version: 3.2.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
          class: nfs-storage-node04
    config: # development settings; remove this block in production
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 20Gi
      deleteClaim: false
      class: nfs-storage-node04
  entityOperator:
    topicOperator: {}
    userOperator: {}
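Strimzi derives the names of the Services it creates from the Kafka resource's metadata.name; in particular, the plain-listener bootstrap Service is named "<cluster-name>-kafka-bootstrap". A quick sketch of how the in-cluster bootstrap address used throughout the rest of this post follows from the manifest above:

```shell
# Strimzi names the plain-listener bootstrap Service "<cluster-name>-kafka-bootstrap",
# so the in-cluster address follows directly from metadata.name above
cluster_name="debezium-cluster"
bootstrap="${cluster_name}-kafka-bootstrap:9092"
echo "$bootstrap"
```

This is the address referenced later in the KafkaConnect and KafkaConnector resources.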

Deploy it:

kubectl apply -f debezium-cluster.yaml

Once the deployment completes there will be three pods:

Build a Kafka Connect image containing the Debezium plugin

Create a Dockerfile that adds the Debezium plugin to Kafka Connect:

FROM quay.io/strimzi/kafka:0.33.0-kafka-3.2.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
RUN curl -L https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.3.4.Final/debezium-connector-mysql-2.3.4.Final-plugin.tar.gz | tar xz -C /opt/kafka/plugins/debezium/
USER 1001

Build the image and push it to the container registry:

docker build -t harbor.qifu.com/qifu-develop/debezium-connect:2.3.4.Final .
docker push harbor.qifu.com/qifu-develop/debezium-connect:2.3.4.Final

Deploy Kafka Connect

Create the KafkaConnect resource (kafka-connect.yaml):

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  namespace: qifu-develop
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.2.0
  image: harbor.qifu.com/qifu-develop/debezium-connect:2.3.4.Final
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-offsets
    config.storage.topic: connect-configs
    status.storage.topic: connect-status
    config.storage.replication.factor: -1 # development setting; remove in production
    offset.storage.replication.factor: -1 # development setting; remove in production
    status.storage.replication.factor: -1 # development setting; remove in production

Deploy it:

kubectl apply -f kafka-connect.yaml

After the deployment completes, the Connect pod appears:

Create the Debezium connector

Create the connector configuration (mysql-connector-test.yaml):

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mysql-connector-test
  namespace: qifu-develop
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql.qifu.svc.cluster.local # address of the monitored database
    database.port: 3306
    database.user: root
    database.password: ******
    database.server.id: 184055 # must be unique per connector
    # database.server.name: test
    topic.prefix: test # topic prefix
    # databases to capture:
    database.include.list: qifu_saas_oms,qifu_saas_owms,qifu_saas_inventory,qifu_saas_aggregation
    # tables to capture:
    table.include.list: qifu_saas_oms.o_order,qifu_saas_oms.o_order_task,qifu_saas_oms.o_order_company,qifu_saas_oms.o_outbound_order,qifu_saas_oms.o_outbound_order_detail,qifu_saas_oms.o_outbound_order_extra_info,qifu_saas_oms.o_outbound_order_pack_cargo,qifu_saas_oms.o_entry_order,qifu_saas_oms.o_entry_pack_cargo,qifu_saas_oms.o_entry_pack_cargo_detail,qifu_saas_owms.entry_plan,qifu_saas_owms.outbound_order,qifu_saas_owms.outbound_order_box_info,qifu_saas_owms.outbound_order_detail,qifu_saas_owms.outbound_order_extension,qifu_saas_owms.putaway_order,qifu_saas_owms.putaway_order_detail,qifu_saas_owms.putaway_order_detail_batch,qifu_saas_inventory.item_loc_inventory_flow,qifu_saas_inventory.inventory_batch,qifu_saas_aggregation.base_warehouse,qifu_saas_aggregation.logistics_package
    schema.history.internal.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092 # Kafka bootstrap address
    schema.history.internal.kafka.topic: schema-changes.inventory-test
    include.schema.changes: true
    include.query: true # record the original SQL statement in Kafka messages; requires binlog_rows_query_log_events enabled in MySQL
    provide.transaction.metadata: true
    # The transform below routes each database to a single topic;
    # without it the default is one topic per table
    transforms: Reroute
    transforms.Reroute.type: io.debezium.transforms.ByLogicalTableRouter
    transforms.Reroute.topic.regex: (test.qifu_saas_oms|test.qifu_saas_owms|test.qifu_saas_inventory|test.qifu_saas_aggregation)\..*
    transforms.Reroute.topic.replacement: $1.tables
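The ByLogicalTableRouter transform rewrites topic names with a regex capture group. The effect of the regex/replacement pair above can be sketched with sed, using a topic name taken from this connector's configuration:

```shell
# Default Debezium topic name: <topic.prefix>.<database>.<table>
topic="test.qifu_saas_oms.o_order"

# Apply the same regex/replacement pair the Reroute transform uses:
# capture "test.<database>" and collapse every table into one "<capture>.tables" topic
rerouted=$(echo "$topic" | sed -E 's/^(test.qifu_saas_oms|test.qifu_saas_owms|test.qifu_saas_inventory|test.qifu_saas_aggregation)\..*/\1.tables/')
echo "$rerouted"
```

So all captured tables in qifu_saas_oms end up in the single topic test.qifu_saas_oms.tables, and likewise for the other three databases.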

Deploy it:

kubectl apply -f mysql-connector-test.yaml

Verification:

To inspect the connector, exec into the debezium-connect-cluster-connect container and run:

curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/mysql-connector-test

Or inspect it with:

kubectl describe KafkaConnector mysql-connector-test -n qifu-develop

View the change events in a Kafka topic:

kubectl -n qifu-develop run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.33.0-kafka-3.2.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --topic test.qifu_saas_oms.tables
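Each message the consumer prints is a Debezium change-event envelope with before, after, op, and source fields. A minimal sketch of pulling the operation type out of a message; the sample payload below is illustrative and abbreviated, not captured output:

```shell
# Abbreviated, illustrative Debezium change-event envelope for an UPDATE
event='{"payload":{"before":{"id":1,"status":"NEW"},"after":{"id":1,"status":"SHIPPED"},"op":"u","ts_ms":1700000000000}}'

# Operation codes: c = create, u = update, d = delete, r = snapshot read
op=$(echo "$event" | grep -o '"op":"[cudr]"' | cut -d'"' -f4)
echo "$op"
```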

Alternatively, log in to the Kafka container and list the topics:

bin/kafka-topics.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --list

Start a consumer on a topic:

bin/kafka-console-consumer.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --topic test.qifu_saas_oms.tables

List the consumer groups:

bin/kafka-consumer-groups.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --list

Check a consumer group's message backlog:

bin/kafka-consumer-groups.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --describe --group consumer-oms
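The --describe output lists per-partition offsets; the LAG column (log-end offset minus current offset) is the backlog. A quick way to total it across partitions; the sample output below is illustrative, and the column position assumes the standard kafka-consumer-groups layout:

```shell
# Illustrative --describe output (columns abbreviated)
describe='GROUP        TOPIC                     PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
consumer-oms test.qifu_saas_oms.tables 0         120            150            30
consumer-oms test.qifu_saas_oms.tables 1         200            210            10'

# Sum the LAG column (6th field), skipping the header row
total=$(echo "$describe" | awk 'NR > 1 { sum += $6 } END { print sum }')
echo "$total"
```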

Modify the connector configuration

If you later need to change the MySQL connector configuration, you can exec into the debezium-connect-cluster-connect container and update it through the REST API:

curl -X PUT  -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/mysql-connector-test/config \
-d '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql.qifu.svc.cluster.local",
"database.port": "3306",
"database.user": "root",
"database.password": "*****",
"database.server.id": "184055",
"database.include.list": "qifu_saas_oms,qifu_saas_owms,qifu_saas_inventory,qifu_saas_aggregation",
"table.include.list": "qifu_saas_oms.o_order,qifu_saas_oms.o_order_task,qifu_saas_oms.o_order_company,qifu_saas_oms.o_outbound_order,qifu_sas_oms.o_outbound_order_extra_info,qifu_saas_oms.o_outbound_order_pack_cargo,qifu_saas_oms.o_entry_order,qifu_saas_oms.o_entry_pack_cargo,qifu_saas_oms.o_entry_pack_cargo_detail,qifu_saas_owms.entry_plan,qifu_saas_owms.outbound_order,qifu_saas_owms.outbound_order_box_info,qifu_saas_owms.outbound_owms.putaway_order,qifu_saas_owms.putaway_order_detail,qifu_saas_owms.putaway_order_detail_batch,qifu_saas_inventory.item_loc_inventory_flow,qifu_saas_inventory.inventory_batch,qifu_saas_aggregation.base_warehouse,qifu_saas_aggregation.logistics_package",
"schema.history.internal.kafka.topic": "schema-changes.inventory-test",
"schema.history.internal.kafka.bootstrap.servers": "debezium-cluster-kafka-bootstrap:9092",
"topic.prefix": "test",
"include.schema.changes": "true",
"include.query":"true",
"provide.transaction.metadata": "true",
"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "(test.qifu_saas_oms|test.qifu_saas_owms|test.qifu_saas_inventory|test.qifu_saas_aggregation)\\..*",
"transforms.Reroute.topic.replacement": "$1.tables"
}'

Delete the connector:

curl -i -X DELETE localhost:8083/connectors/mysql-connector-test/

Alternatively, edit mysql-connector-test.yaml and re-apply it:

kubectl apply -f mysql-connector-test.yaml

And to delete the connector:

kubectl delete -f mysql-connector-test.yaml

Problems encountered

Problem 1:

After deploying the MySQL connector in the development environment, it finished the initial sync and then failed with 'performance_schema.session_status' doesn't exist, after which it kept re-syncing in a loop. Checking the performance_schema database confirmed that the session_status table was indeed missing; the databases in all other environments have it.

Solution:

Run the following against the development database:

mysql_upgrade -u root -p --force

This regenerates performance_schema.session_status; restart the database for it to take effect.

Problem 2:

With include.query: true in the connector configuration and binlog_rows_query_log_events enabled in MySQL, the original SQL statement is recorded in each message. Chinese characters in the other fields display correctly, but Chinese text in the query field comes out garbled:

People in the Debezium community said version 3.0 does not have this problem, so I spun up the services temporarily with Docker to test:

zookeeper:

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 --security-opt seccomp=unconfined quay.io/debezium/zookeeper:3.0

kafka:

docker run -it --rm --name kafka -p 9092:9092 --security-opt seccomp=unconfined --link zookeeper:zookeeper quay.io/debezium/kafka:3.0

mysql:

docker run -it --rm --name mysql -p 3306:3306  -v /root/my.cnf:/etc/my.cnf --security-opt seccomp=unconfined -e LANG=en_US.UTF-8 -e LC_ALL=en_US.UTF-8 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:3.0

connect:

docker run -it --rm --name connect -p 8083:8083 --security-opt seccomp=unconfined -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:3.0

Register the MySQL connector:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "dev-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "10.168.2.187", "database.port": "30336", "database.user": "root", "database.password": "******", "database.server.id": "184059", "topic.prefix": "dev", "database.include.list": "dev_qifu_saas_aggregation", "table.include.list": "dev_qifu_saas_aggregation.base_warehouse_area", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schemahistory.inventory-dev", "include.schema.changes": "true", "include.query":"true", "provide.transaction.metadata": "true" } }'

Check the consumed messages:

The Chinese in the query field did display correctly on 3.0. The developers said the garbling does not affect them, so we did not upgrade the version on Kubernetes. For future reference, the component versions shipped in the Docker 3.0 images are recorded below, so that a later upgrade of the Debezium services on Kubernetes can install the matching versions directly:

  • zookeeper:3.8.4
  • kafka:3.9.0
  • Debezium:3.0.8.Final