Capturing PostgreSQL Data Changes with Debezium
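I had a data synchronization requirement: changes to PostgreSQL tables had to be propagated to other business systems. While researching, I came across CDC (Change Data Capture). The idea of CDC is to emit a changelog of every insert, update, delete, and schema change executed on a database. Debezium is an open-source platform for change data capture that provides low-latency data streaming and is deployed on Apache Kafka Connect.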
Installation and Deployment
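This test runs in Docker. Debezium needs three separate services: ZooKeeper, Kafka, and the Debezium connector service (Kafka Connect). The version used is Debezium 2.1.2.Final, which had been released shortly before this post was written.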
Start ZooKeeper:
```shell
docker run -itd --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:2.1.2.Final
```
Start Kafka:
```shell
docker run -itd --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:2.1.2.Final
```
For the database, use Debezium's example-postgres:2.1.2.Final image, which is based on PostgreSQL 15.2. Start PostgreSQL:
```shell
docker run -itd --name postgres -p 25432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=TZmLcQlSyaeqkk7N debezium/example-postgres:2.1.2.Final
```
Start the Debezium Connect service:
```shell
docker run -itd --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3 -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link kafka:kafka debezium/connect:2.1.2.Final
```
Connect to PostgreSQL, create the sync database, and create the emp table:
```sql
CREATE DATABASE SYNC;
```
Use \d+ to view the table definition:
```
sync=# \d+ emp
```
For background on PostgreSQL's public schema, see the documentation:
https://www.postgresql.org/docs/current/ddl-schemas.html#DDL-SCHEMAS-PUBLIC
It states:
In the previous sections we created tables without specifying any schema names. By default such tables (and other objects) are automatically put into a schema named “public”. Every new database contains such a schema.
Set the replica identity:
```sql
ALTER TABLE emp REPLICA IDENTITY FULL;
```
For details on replica identity, see:
- https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-replica-identity
- https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-update-events
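REPLICA IDENTITY FULL makes PostgreSQL write every old column value to the WAL. Without it, the `before` field of update and delete events carries at most the key columns. You can read the current setting with `SELECT relreplident FROM pg_class WHERE oid = 'public.emp'::regclass;`, which returns a one-letter code. The minimal Python sketch below only decodes that letter. The helper names are hypothetical, and the mapping follows the PostgreSQL pg_class documentation.

```python
# Codes stored in pg_class.relreplident (per the PostgreSQL pg_class docs).
REPLICA_IDENTITY = {
    "d": "DEFAULT (primary key columns, if any)",
    "n": "NOTHING",
    "f": "FULL (all columns)",
    "i": "INDEX (columns of the chosen unique index)",
}


def describe_replica_identity(code: str) -> str:
    """Turn the one-letter relreplident code into a readable label."""
    if code not in REPLICA_IDENTITY:
        raise ValueError(f"unknown relreplident code: {code!r}")
    return REPLICA_IDENTITY[code]


def before_has_full_row(code: str) -> bool:
    """Only FULL logs every old column value, so only FULL gives Debezium
    a complete `before` image for update and delete events."""
    return code == "f"


print(describe_replica_identity("f"))  # FULL (all columns)
```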
Connector Configuration
Once the environment is up, call the REST API exposed by Debezium (Kafka Connect) to create and register the connector:
```shell
# Get the connect IP
```
For the Debezium PostgreSQL connector's configuration properties, see:
https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-properties
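The registration step can also be scripted. Below is a hedged Python sketch that uses the standard Kafka Connect endpoint `POST /connectors`. The property names are standard Debezium 2.x PostgreSQL connector options. The following values come from this post: the slot name (`dbz_demo_slot`), the topic prefix and database (`sync`, matching the topic `sync.public.emp`), and the password. The connector name, both IP addresses, and the choice of plugin are hypothetical placeholders.

```python
import json
import urllib.request

# Hypothetical addresses: replace with the IPs of your own containers.
CONNECT_URL = "http://172.17.0.6:8083"
PG_HOST = "172.17.0.4"


def build_connector_payload(name: str, plugin: str) -> dict:
    """Request body for POST /connectors on the Kafka Connect REST API."""
    return {
        "name": name,  # hypothetical connector name
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": PG_HOST,
            "database.port": "5432",  # port inside the Docker network, not 25432
            "database.user": "postgres",
            "database.password": "TZmLcQlSyaeqkk7N",
            "database.dbname": "sync",
            "topic.prefix": "sync",  # topics become sync.<schema>.<table>
            "table.include.list": "public.emp",
            "plugin.name": plugin,  # "decoderbufs" or "pgoutput"
            "slot.name": "dbz_demo_slot",
        },
    }


def register_connector(payload: dict) -> dict:
    """Send the payload to Kafka Connect and return its JSON response."""
    req = urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Against a running Connect worker, `register_connector(build_connector_payload("sync-emp-connector", "pgoutput"))` should return the created connector's name and configuration if the request is accepted.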
After registration, a replication slot named dbz_demo_slot is created in PostgreSQL:
```
postgres=# select * from pg_replication_slots;
```
For more on PostgreSQL replication slots, see:
- https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS
- https://www.postgresql.org/docs/current/logicaldecoding.html
- https://mp.weixin.qq.com/s/kbcx-q82_X_mFl_1NVxEyA
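pg_replication_slots reports positions such as restart_lsn and confirmed_flush_lsn in pg_lsn text form: two hexadecimal halves separated by '/'. Debezium's `source.lsn` field, by contrast, is a plain 64-bit integer (43136160 in the insert event later in this post). Here is a small sketch for converting between the two forms. The function names are hypothetical.

```python
def lsn_to_int(text: str) -> int:
    """Convert pg_lsn text such as '0/29234A0' to a 64-bit integer.

    The part before '/' is the high 32 bits, the part after it the low
    32 bits, both in hexadecimal.
    """
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def int_to_lsn(value: int) -> str:
    """Format a 64-bit integer LSN the way PostgreSQL prints pg_lsn."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


print(int_to_lsn(43136160))  # 0/29234A0
```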
Some Connector REST API usage:
```shell
# List connectors
```
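Kafka Connect also exposes `GET /connectors/{name}/status`, which reports the state of the connector and of each of its tasks. Below is a hedged sketch of a health check built on that endpoint. The URL and connector name are hypothetical, and `is_healthy` makes a simplifying assumption: any state other than RUNNING counts as unhealthy.

```python
import json
import urllib.parse
import urllib.request


def fetch_status(connect_url: str, name: str) -> dict:
    """GET /connectors/{name}/status from the Kafka Connect REST API."""
    url = f"{connect_url}/connectors/{urllib.parse.quote(name)}/status"
    with urllib.request.urlopen(url) as resp:
        return json.loads(resp.read().decode("utf-8"))


def is_healthy(status: dict) -> bool:
    """True only if the connector and all of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)


# Shape of a typical status response (values are illustrative).
status_sample = {
    "name": "sync-emp-connector",
    "connector": {"state": "RUNNING", "worker_id": "172.17.0.6:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "172.17.0.6:8083"}],
    "type": "source",
}
print(is_healthy(status_sample))  # True
```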
Connector Testing
With the connector configured, enter the Kafka container to check what Debezium captures. Use kafka-console-consumer.sh, which ships with Kafka, to view the data arriving on the topic:
```shell
[kafka@480d0be036aa ~]$ bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.5:9092 --topic sync.public.emp --from-beginning
```
The change messages produced earlier are also shown. Next, test insert, update, and delete operations in turn.
insert
```sql
INSERT INTO EMP VALUES (7956, 'AAAAJ', 'BBBBJ', 7782, '1982-1-23', 1300, NULL, 10);
```
The Kafka consumer receives this message:
```json
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"empno"},{"type":"string","optional":true,"field":"ename"},{"type":"string","optional":true,"field":"job"},{"type":"int32","optional":true,"field":"mgr"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"hiredate"},{"type":"int32","optional":true,"field":"sal"},{"type":"int32","optional":true,"field":"comm"},{"type":"int32","optional":true,"field":"deptno"}],"optional":true,"name":"sync.public.emp.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"empno"},{"type":"string","optional":true,"field":"ename"},{"type":"string","optional":true,"field":"job"},{"type":"int32","optional":true,"field":"mgr"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"hiredate"},{"type":"int32","optional":true,"field":"sal"},{"type":"int32","optional":true,"field":"comm"},{"type":"int32","optional":true,"field":"deptno"}],"optional":true,"name":"sync.public.emp.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"sync.public.emp.Envelope","version":1},"payload":{"before":null,"after":{"empno":7956,"ename":"AAAAJ","job":"BBBBJ","mgr":7782,"hiredate":4405,"sal":1300,"comm":null,"deptno":10},"source":{"version":"2.1.2.Final","connector":"postgresql","name":"sync","ts_ms":1678009752835,"snapshot":"false","db":"sync","sequence":"[null,\"43136160\"]","schema":"public","table":"emp","txId":799,"lsn":43136160,"xmin":null},"op":"c","ts_ms":1678009753094,"transaction":null}}
```
Looking at the payload part of the formatted message: for an insert, op is c (create), before is null, and after holds the inserted row:
```json
{
```
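As the message above shows, each message carries a `schema` block and a `payload` block, and only the payload matters when applying changes. Below is a minimal Python sketch that extracts `op`, `before`, and `after` and decodes `hiredate`. Fields with the `io.debezium.time.Date` schema count days since 1970-01-01, so 4405 is 1982-01-23, the date that was inserted. Debezium uses the op codes c, u, and d, plus r for rows read during the initial snapshot. The helper names are hypothetical.

```python
import json
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def extract_change(message: str) -> dict:
    """Return op/before/after from a Debezium JSON message with schema+payload."""
    payload = json.loads(message)["payload"]
    return {"op": payload["op"], "before": payload["before"], "after": payload["after"]}


def decode_debezium_date(days: int) -> date:
    """io.debezium.time.Date values are days since the Unix epoch."""
    return EPOCH + timedelta(days=days)


# Trimmed version of the insert message (schema omitted for brevity).
message = '{"schema": {}, "payload": {"before": null, "after": {"empno": 7956, "hiredate": 4405}, "op": "c"}}'
change = extract_change(message)
print(change["op"], decode_debezium_date(change["after"]["hiredate"]))  # c 1982-01-23
```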
update
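```sql
UPDATE EMP SET mgr=7786 WHERE empno=7956;
```
Looking at the payload part of the formatted message: for an update, op is u (update), before holds the row as previously inserted (complete, because the table's replica identity is FULL), and after holds the updated row:
```json
{
```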
delete
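```sql
DELETE FROM EMP WHERE empno=7956;
```
Looking at the payload part of the formatted message: for a delete, op is d (delete), before holds the row as it was after the preceding update, and after is null:
```json
{
```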
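The goal was to sync these changes into other systems. Here is a minimal sketch of how a downstream consumer might apply the three event types to an in-memory copy of emp keyed by empno. It assumes each event has already been parsed into its payload dict. It skips the null-valued tombstone record that Debezium emits after a delete by default. A real sink would write to its own store and handle ordering and retries. The function name is hypothetical.

```python
from typing import Dict, Optional


def apply_change(table: Dict[int, dict], payload: Optional[dict]) -> None:
    """Apply one Debezium change payload to `table`, keyed by empno."""
    if payload is None:
        # Tombstone record after a delete: nothing to apply.
        return
    op = payload["op"]
    if op in ("r", "c", "u"):
        # Snapshot read, insert, or update: store the new row image.
        row = payload["after"]
        table[row["empno"]] = dict(row)
    elif op == "d":
        # Delete: the old row image identifies the key to remove.
        table.pop(payload["before"]["empno"], None)
    else:
        raise ValueError(f"unsupported op: {op!r}")


emp: Dict[int, dict] = {}
apply_change(emp, {"op": "c", "before": None, "after": {"empno": 7956, "mgr": 7782}})
apply_change(emp, {"op": "u", "before": {"empno": 7956, "mgr": 7782}, "after": {"empno": 7956, "mgr": 7786}})
print(emp)  # {7956: {'empno': 7956, 'mgr': 7786}}
apply_change(emp, {"op": "d", "before": {"empno": 7956, "mgr": 7786}, "after": None})
apply_change(emp, None)
print(emp)  # {}
```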
PostgreSQL Logical Decoding Plugins
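When registering the connector, the plugin.name parameter names the PostgreSQL logical decoding plugin installed on the PostgreSQL server. The current version supports two values, decoderbufs and pgoutput, and the default is decoderbufs. pgoutput is built into PostgreSQL 10 and later.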
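The version constraints described here can be captured in a small helper: pgoutput is built in from PostgreSQL 10, while decoderbufs requires 9.6+ and a separately installed library. The sketch below assumes two things. First, the version comes from `SHOW server_version_num;` (for example 150002 for 15.2 and 90624 for 9.6.24). Second, you already know whether decoderbufs is installed. The function name is hypothetical.

```python
def usable_plugins(server_version_num: int, decoderbufs_installed: bool) -> list:
    """Return the Debezium-supported decoding plugins usable on this server."""
    plugins = []
    if server_version_num >= 100000:
        plugins.append("pgoutput")  # built into PostgreSQL 10 and later
    if decoderbufs_installed and server_version_num >= 90600:
        plugins.append("decoderbufs")  # separate library, needs 9.6+
    return plugins


print(usable_plugins(150002, True))   # ['pgoutput', 'decoderbufs']
print(usable_plugins(90624, False))   # []
```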
In Debezium's example-postgres:2.1.2.Final image, postgresql.conf contains the following configuration:
```shell
pwd
```
and the library directory contains:
```shell
pwd
```
These settings and decoderbufs.so are added by Debezium's image build. A standard PostgreSQL installation does not include them.
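Whatever image is used, logical decoding requires `wal_level = logical` in postgresql.conf. On PostgreSQL 9.6, `max_replication_slots` and `max_wal_senders` also default to 0 and must be raised. Below is a rough sketch for checking a postgresql.conf file. It only understands simple `name = value` lines and strips everything after `#`, so it would mis-read a `#` inside a quoted value. The sample text and helper names are hypothetical.

```python
def parse_conf(text: str) -> dict:
    """Parse simple `name = value` lines of postgresql.conf, ignoring comments."""
    settings = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if "=" not in line:
            continue
        name, value = line.split("=", 1)
        settings[name.strip().lower()] = value.strip().strip("'")
    return settings


def logical_decoding_problems(settings: dict) -> list:
    """List settings that would block logical replication slots."""
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be logical")
    # Defaults differ by version, so only check values that are set explicitly.
    for name in ("max_replication_slots", "max_wal_senders"):
        if name in settings and int(settings[name]) <= 0:
            problems.append(f"{name} must be greater than 0")
    return problems


conf_text = """
wal_level = logical            # required for logical decoding
max_replication_slots = 4
max_wal_senders = 0
"""
print(logical_decoding_problems(parse_conf(conf_text)))  # ['max_wal_senders must be greater than 0']
```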
decoderbufs itself is maintained by Debezium:
https://github.com/debezium/postgres-decoderbufs
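It requires PostgreSQL 9.6+, so I next tested with PostgreSQL 9.6.24 using the Docker image postgres:9.6.24-bullseye. bullseye is Debian 11. The plain postgres:9.6.24 image is based on Debian 9, and installing postgresql-server-dev-9.6 on it runs into a libpq dependency problem.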
Debezium builds its container images from the project below, which you can use to build an image yourself:
https://github.com/debezium/container-images
```shell
git clone https://github.com/debezium/container-images
```
Run the build:
```shell
docker build -t debezium:postgres-9.6.24-bullseye .
```
Start the container:
```shell
docker run -itd --name postgres_logical -p 15432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=TZmLcQlSyaeqkk7N debezium:postgres-9.6.24-bullseye
```
Verify decoderbufs:
```sql
postgres=# select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
```
To see which versions Debezium supports, check the release documentation and build a matching environment for testing:
https://debezium.io/releases/
References
1. https://www.freebuf.com/articles/web/345122.html
2. https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS
3. https://www.postgresql.org/docs/current/logicaldecoding.html
4. https://segmentfault.com/a/1190000040364198
5. https://stackoverflow.com/questions/59799503/postgres-debezium-does-not-publish-the-previous-state-of-a-record
6. https://diabloneo.github.io/2021/06/27/postgres-logical-replication-and-cdc-part-1/
7. https://doc.zhangeamon.top/postgres/debezium/
8. https://debezium.io/documentation/reference/stable/connectors/postgresql.html
9. https://debezium.io/documentation/reference/stable/postgres-plugins.html