Capturing PostgreSQL Data Changes with Debezium

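I had a data synchronization requirement: changes to PostgreSQL tables needed to be propagated to other business systems. While researching it I came across CDC (Change Data Capture). The idea behind CDC is to emit a changelog of every insert, update, delete and schema change executed on a database. Debezium is an open-source platform for change data capture that provides low-latency data streaming, and it is deployed on Apache Kafka Connect.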

Installation and Deployment

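This test runs in Docker. Debezium needs three separate services: ZooKeeper, Kafka and the Debezium connector service (Kafka Connect). All of them use 2.1.2.Final, the Debezium release that was current at the time of writing.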

Start ZooKeeper:

docker run -itd --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:2.1.2.Final
docker logs zookeeper

Start Kafka:

docker run -itd --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:2.1.2.Final

For the database, use Debezium's example-postgres:2.1.2.Final image, which is based on PostgreSQL 15.2. Start PostgreSQL:

docker run -itd --name postgres -p 25432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=TZmLcQlSyaeqkk7N debezium/example-postgres:2.1.2.Final

Start Debezium Connect:

docker run -itd --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3  -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link kafka:kafka debezium/connect:2.1.2.Final

Connect to PostgreSQL, create the sync database, and create the emp table:

CREATE DATABASE SYNC;

\c sync

CREATE TABLE EMP
(EMPNO integer NOT NULL,
ENAME VARCHAR(10),
JOB VARCHAR(9),
MGR integer,
HIREDATE DATE,
SAL integer,
COMM integer,
DEPTNO integer);

INSERT INTO EMP values (7369, 'SMITH', 'CLERK', 7902, '1980-12-17', 800, NULL, 20);
INSERT INTO EMP values (7499, 'ALLEN', 'SALESMAN', 7698, '1981-2-20', 1600, 300, 30);
INSERT INTO EMP VALUES (7521, 'WARD', 'SALESMAN', 7698, '1981-2-22', 1250, 500, 30);
INSERT INTO EMP VALUES (7566, 'JONES', 'MANAGER', 7839, '1981-4-2', 2975, NULL, 20);
INSERT INTO EMP VALUES (7654, 'MARTIN', 'SALESMAN', 7698, '1981-9-28', 1250, 1400, 30);
INSERT INTO EMP VALUES (7698, 'BLAKE', 'MANAGER', 7839, '1981-5-1', 2850, NULL, 30);
INSERT INTO EMP VALUES (7782, 'CLARK', 'MANAGER', 7839, '1981-6-9', 2450, NULL, 10);
INSERT INTO EMP VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1982-12-9', 3000, NULL, 20);
INSERT INTO EMP VALUES (7839, 'KING', 'PRESIDENT', NULL, '1981-11-17', 5000, NULL, 10);
INSERT INTO EMP VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-9-8', 1500, 0, 30);
INSERT INTO EMP VALUES (7876, 'ADAMS', 'CLERK', 7788, '1983-1-12', 1100, NULL, 20);
INSERT INTO EMP VALUES (7900, 'JAMES', 'CLERK', 7698, '1981-12-3', 950, NULL, 30);
INSERT INTO EMP VALUES (7902, 'FORD', 'ANALYST', 7566, '1981-12-3', 3000, NULL, 20);
INSERT INTO EMP VALUES (7934, 'MILLER', 'CLERK', 7782, '1982-1-23', 1300, NULL, 10);

Inspect the table with \d+:

sync=# \d+ emp
                                                   Table "public.emp"
  Column  |         Type          | Collation | Nullable | Default | Storage  | Compression | Stats target | Description
----------+-----------------------+-----------+----------+---------+----------+-------------+--------------+-------------
 empno    | integer               |           | not null |         | plain    |             |              |
 ename    | character varying(10) |           |          |         | extended |             |              |
 job      | character varying(9)  |           |          |         | extended |             |              |
 mgr      | integer               |           |          |         | plain    |             |              |
 hiredate | date                  |           |          |         | plain    |             |              |
 sal      | integer               |           |          |         | plain    |             |              |
 comm     | integer               |           |          |         | plain    |             |              |
 deptno   | integer               |           |          |         | plain    |             |              |
Access method: heap

For details on PostgreSQL's public schema, see the documentation:
https://www.postgresql.org/docs/current/ddl-schemas.html#DDL-SCHEMAS-PUBLIC

It states:

In the previous sections we created tables without specifying any schema names. By default such tables (and other objects) are automatically put into a schema named “public”. Every new database contains such a schema.

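Set the replica identity to FULL. This makes update and delete events carry the complete previous row (the before image):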

ALTER TABLE emp REPLICA IDENTITY FULL;

For details on replica identity, see the PostgreSQL documentation for ALTER TABLE ... REPLICA IDENTITY.

Connector Configuration

Once the environment above is running, register a connector by calling the REST API exposed by the Debezium Connect container:

# Get the IP address of the connect container
$ docker inspect --format='{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' connect
172.17.0.7

$ curl -XPOST "http://172.17.0.7:8083/connectors/" -H 'Content-Type: application/json' -d '
{
"name": "pg_sync_demo_connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "172.17.0.6",
"database.port": "5432",
"database.user": "postgres",
"database.password": "TZmLcQlSyaeqkk7N",
"database.dbname" : "sync",
"database.server.name": "pg_sync_demo",
"slot.name": "dbz_demo_slot",
"topic.prefix": "sync",
"table.include.list": "public.emp",
"publication.name": "dbz_demo",
"publication.autocreate.mode": "disabled",
"plugin.name": "decoderbufs",
"snapshot.mode": "always"
}
}
'

{"name":"pg_sync_demo_connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"172.17.0.6","database.port":"5432","database.user":"postgres","database.password":"TZmLcQlSyaeqkk7N","database.dbname":"sync","database.server.name":"pg_sync_demo","slot.name":"dbz_demo_slot","topic.prefix":"sync","table.include.list":"public.emp","publication.name":"dbz_demo","publication.autocreate.mode":"disabled","plugin.name":"decoderbufs","snapshot.mode":"always","name":"pg_sync_demo_connector"},"tasks":[],"type":"source"}

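Here database.hostname is the PostgreSQL container's address on the Docker bridge network. That is why the container port 5432 is used rather than the host-mapped 25432. In Debezium 2.x, topic.prefix has replaced database.server.name and determines the topic names, so changes to public.emp are published to the sync.public.emp topic.

For the full list of Debezium PostgreSQL connector properties, see the documentation: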
https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-properties
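The curl call above can also be scripted. The following is a minimal Python sketch that uses only the standard library and mirrors the configuration above. build_connector_request and send are hypothetical helper names. The addresses and password are this article's example values and will differ in other environments. The sketch relies only on the standard Kafka Connect POST /connectors endpoint.

```python
import json
import urllib.request

# Example values from this article (assumptions for any other environment).
CONNECT_URL = "http://172.17.0.7:8083"
DB_HOST = "172.17.0.6"
DB_PASSWORD = "TZmLcQlSyaeqkk7N"


def build_connector_request(connect_url: str, db_host: str, db_password: str) -> urllib.request.Request:
    """Build, but do not send, the POST /connectors request shown in the curl example."""
    payload = {
        "name": "pg_sync_demo_connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "database.hostname": db_host,
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": db_password,
            "database.dbname": "sync",
            "database.server.name": "pg_sync_demo",
            "slot.name": "dbz_demo_slot",
            "topic.prefix": "sync",
            "table.include.list": "public.emp",
            "publication.name": "dbz_demo",
            "publication.autocreate.mode": "disabled",
            "plugin.name": "decoderbufs",
            "snapshot.mode": "always",
        },
    }
    return urllib.request.Request(
        url=f"{connect_url}/connectors/",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request) -> dict:
    """Send the request to a running Kafka Connect worker and decode its JSON reply."""
    with urllib.request.urlopen(request) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, send(build_connector_request(CONNECT_URL, DB_HOST, DB_PASSWORD)) registers the connector. If a connector with the same name already exists, Connect answers 409 Conflict and urlopen raises urllib.error.HTTPError.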

After registration, a replication slot named dbz_demo_slot is created in PostgreSQL:

postgres=# select * from pg_replication_slots;
-[ RECORD 1 ]-------+--------------
slot_name | dbz_demo_slot
plugin | decoderbufs
slot_type | logical
datoid | 25682
database | sync
temporary | f
active | t
active_pid | 2804
xmin |
catalog_xmin | 798
restart_lsn | 0/2923320
confirmed_flush_lsn | 0/2923358
wal_status | reserved
safe_wal_size |
two_phase | f

For more on PostgreSQL replication slots, see the references at the end of this article.
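restart_lsn and confirmed_flush_lsn are pg_lsn values printed as two hexadecimal 32-bit halves (high/low). The lsn field in the source block of Debezium change events carries the same WAL position as a plain 64-bit integer. Below is a small Python sketch for converting between the two forms, for example to estimate how far a slot lags. The helper names are hypothetical.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a pg_lsn string such as '0/2923358' into its 64-bit integer value."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def int_to_lsn(value: int) -> str:
    """Format a 64-bit WAL position (e.g. Debezium's source.lsn) the way PostgreSQL prints pg_lsn."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


def lag_bytes(ahead: str, behind: str) -> int:
    """Number of WAL bytes between two pg_lsn positions."""
    return lsn_to_int(ahead) - lsn_to_int(behind)
```

Using the values in this article: lag_bytes("0/2923358", "0/2923320") is 56 bytes. The insert event's lsn 43136160 formats as 0/29234A0, which is a later WAL position than the slot's confirmed_flush_lsn shown above.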

Some useful Connector REST API calls:

# List connectors
$ curl -H 'Content-Type: application/json' "http://172.17.0.7:8083/connectors"
["pg_sync_demo_connector"]

# Delete a specific connector
$ curl -i -X DELETE "http://172.17.0.7:8083/connectors/pg_sync_demo_connector"
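The same REST calls can be made from Python. Besides listing and deleting, Kafka Connect also exposes GET /connectors/{name}/status, which reports the state of the connector and of each of its tasks. The following is a standard-library sketch. The helper names are hypothetical, and the address is the example connect IP from above.

```python
import json
import urllib.parse
import urllib.request

CONNECT_URL = "http://172.17.0.7:8083"  # connect container address used in this article


def get_json(url: str):
    """GET a Kafka Connect REST endpoint and decode the JSON body."""
    with urllib.request.urlopen(url) as resp:
        return json.loads(resp.read().decode("utf-8"))


def status_url(connect_url: str, name: str) -> str:
    """URL of the GET /connectors/{name}/status endpoint (name is percent-encoded)."""
    return f"{connect_url}/connectors/{urllib.parse.quote(name, safe='')}/status"


def is_running(status: dict) -> bool:
    """True only if the connector and every task report RUNNING.

    An empty task list (as in the registration response above) counts as not running yet.
    """
    tasks = status.get("tasks", [])
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    return connector_ok and bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)
```

For example, iterate over get_json(f"{CONNECT_URL}/connectors") and call is_running(get_json(status_url(CONNECT_URL, name))) for each connector name.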

Connector Testing

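With the connector configured, open a shell in the kafka container to check what Debezium captures. Use Kafka's kafka-console-consumer.sh to read the topic. The topic name is built from topic.prefix, schema and table (sync.public.emp):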

[kafka@480d0be036aa ~]$ bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.5:9092 --topic sync.public.emp --from-beginning

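The consumer shows messages for the rows that already existed, produced by the connector's initial snapshot (snapshot.mode is always). Next, test insert, update and delete in turn.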

insert

INSERT INTO EMP VALUES (7956, 'AAAAJ', 'BBBBJ', 7782, '1982-1-23', 1300, NULL, 10);

The Kafka consumer receives the following message:

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"empno"},{"type":"string","optional":true,"field":"ename"},{"type":"string","optional":true,"field":"job"},{"type":"int32","optional":true,"field":"mgr"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"hiredate"},{"type":"int32","optional":true,"field":"sal"},{"type":"int32","optional":true,"field":"comm"},{"type":"int32","optional":true,"field":"deptno"}],"optional":true,"name":"sync.public.emp.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"empno"},{"type":"string","optional":true,"field":"ename"},{"type":"string","optional":true,"field":"job"},{"type":"int32","optional":true,"field":"mgr"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"hiredate"},{"type":"int32","optional":true,"field":"sal"},{"type":"int32","optional":true,"field":"comm"},{"type":"int32","optional":true,"field":"deptno"}],"optional":true,"name":"sync.public.emp.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"sync.public.emp.Envelope","version":1},"payload":{"before":null,"after":{"empno":7956,"ename":"AAAAJ","job":"BBBBJ","mgr":7782,"hiredate":4405,"sal":1300,"comm":null,"deptno":10},"source":{"version":"2.1.2.Final","connector":"postgresql","name":"sync","ts_ms":1678009752835,"snapshot":"false","db":"sync","sequence":"[null,\"43136160\"]","schema":"public","table":"emp","txId":799,"lsn":43136160,"xmin":null},"op":"c","ts_ms":1678009753094,"transaction":null}}

In the payload of the formatted message, an insert has op c (create), before is null, and after holds the inserted row:

{
"schema": {...},
"payload": {
"before": null,
"after": {
"empno": 7956,
"ename": "AAAAJ",
"job": "BBBBJ",
"mgr": 7782,
"hiredate": 4405,
"sal": 1300,
"comm": null,
"deptno": 10
},
"source": {
"version": "2.1.2.Final",
"connector": "postgresql",
"name": "sync",
"ts_ms": 1678009752835,
"snapshot": "false",
"db": "sync",
"sequence": "[null,\"43136160\"]",
"schema": "public",
"table": "emp",
"txId": 799,
"lsn": 43136160,
"xmin": null
},
"op": "c",
"ts_ms": 1678009753094,
"transaction": null
}
}
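A consumer of these messages mostly needs op, before and after. The Python sketch below uses only the standard library to pull them out of a JSON-serialized event and to decode hiredate. The schema marks hiredate as io.debezium.time.Date, which is the number of days since 1970-01-01. The function names are hypothetical. The op map also includes r, which Debezium uses for rows read during a snapshot.

```python
import json
from datetime import date, timedelta

OPS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def parse_change_event(raw: str):
    """Return op, table and row images from one Debezium JSON message, or None for a tombstone."""
    event = json.loads(raw)
    if event is None:  # Kafka tombstone records have a null value
        return None
    payload = event.get("payload", event)  # tolerate messages serialized without a schema envelope
    source = payload["source"]
    return {
        "op": OPS.get(payload["op"], payload["op"]),
        "table": f"{source['schema']}.{source['table']}",
        "before": payload["before"],
        "after": payload["after"],
    }


def decode_debezium_date(days):
    """io.debezium.time.Date encodes a DATE as the number of days since 1970-01-01."""
    return None if days is None else date(1970, 1, 1) + timedelta(days=days)
```

For the insert above, hiredate 4405 decodes to 1982-01-23, the value that was inserted.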

update

UPDATE EMP SET mgr=7786 WHERE empno=7956;

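For an update, op is u (update), before holds the row as inserted above, and after holds the updated row. The full before image is available because the table's replica identity was set to FULL: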

{
"schema": {...},
"payload": {
"before": {
"empno": 7956,
"ename": "AAAAJ",
"job": "BBBBJ",
"mgr": 7782,
"hiredate": 4405,
"sal": 1300,
"comm": null,
"deptno": 10
},
"after": {
"empno": 7956,
"ename": "AAAAJ",
"job": "BBBBJ",
"mgr": 7786,
"hiredate": 4405,
"sal": 1300,
"comm": null,
"deptno": 10
},
"source": {
"version": "2.1.2.Final",
"connector": "postgresql",
"name": "sync",
"ts_ms": 1678011551501,
"snapshot": "false",
"db": "sync",
"sequence": "[\"43146016\",\"43146072\"]",
"schema": "public",
"table": "emp",
"txId": 806,
"lsn": 43146072,
"xmin": null
},
"op": "u",
"ts_ms": 1678011551652,
"transaction": null
}
}
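Because both row images are present, a downstream system can work out which columns an update touched. Below is a minimal Python sketch; changed_columns is a hypothetical helper.

```python
def changed_columns(before, after):
    """Return {column: (old, new)} for every column whose value differs between the row images.

    Returns None when an image is missing, for example when no before image is available
    on a table without a primary key whose replica identity is not FULL.
    """
    if before is None or after is None:
        return None
    return {
        col: (before.get(col), after.get(col))
        for col in sorted(before.keys() | after.keys())
        if before.get(col) != after.get(col)
    }
```

Applied to the before and after images above, it returns {'mgr': (7782, 7786)}.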

delete

DELETE FROM EMP WHERE empno=7956;

For a delete, op is d (delete), before holds the row as it was after the previous update, and after is null:

{
"schema": {...},
"payload": {
"before": {
"empno": 7956,
"ename": "AAAAJ",
"job": "BBBBJ",
"mgr": 7786,
"hiredate": 4405,
"sal": 1300,
"comm": null,
"deptno": 10
},
"after": null,
"source": {
"version": "2.1.2.Final",
"connector": "postgresql",
"name": "sync",
"ts_ms": 1678012060276,
"snapshot": "false",
"db": "sync",
"sequence": "[\"43148072\",\"43148304\"]",
"schema": "public",
"table": "emp",
"txId": 807,
"lsn": 43148304,
"xmin": null
},
"op": "d",
"ts_ms": 1678012060296,
"transaction": null
}
}
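Together, the three operations are enough to keep a copy of the table in sync. The Python sketch below applies payloads to an in-memory dict. apply_event is hypothetical. It treats empno as the row key, although the demo table declares no primary key. It also assumes events arrive in order, and it ignores transactions, schema changes and tombstones, all of which a real sink would have to handle.

```python
def apply_event(replica: dict, payload: dict, key: str = "empno") -> None:
    """Apply one Debezium payload (op/before/after) to replica, a dict of rows keyed by key."""
    op = payload["op"]
    before, after = payload.get("before"), payload.get("after")
    if op in ("c", "r"):  # insert or snapshot read: store the new row
        replica[after[key]] = after
    elif op == "u":
        if before is not None and before[key] != after[key]:
            replica.pop(before[key], None)  # key column changed: drop the row under its old key
        replica[after[key]] = after
    elif op == "d":  # delete: the before image identifies the row
        replica.pop(before[key], None)
    else:
        raise ValueError(f"unexpected op: {op!r}")
```

Feeding it the insert, update and delete payloads above in order leaves the replica empty again, as in the source table.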

PostgreSQL Logical Decoding Plugins

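When the connector is registered, plugin.name names the PostgreSQL logical decoding plugin installed on the server. The current version supports decoderbufs and pgoutput, and the default is decoderbufs. pgoutput is built into PostgreSQL 10 and later.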

In Debezium's example-postgres:2.1.2.Final image, postgresql.conf contains the following settings:

$ pwd
/var/lib/postgresql/data

$ cat postgresql.conf
# MODULES
shared_preload_libraries = 'decoderbufs'

# REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 4 # max number of walsender processes (change requires restart)
#wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 4 # max number of replication slots (change requires restart)

and the library directory contains:

$ pwd
/usr/lib/postgresql/15/lib
$ ls -lrt decoderbufs.so
-rwxr-xr-x 1 root root 158064 Mar 4 01:34 decoderbufs.so

Debezium adds these settings and decoderbufs.so when it builds the image. A standard PostgreSQL installation does not include them.
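On a PostgreSQL installation other than the example image, one quick way to spot missing settings is to check postgresql.conf for the values shown above. Below is a rough Python sketch. parse_conf and missing_prerequisites are hypothetical. The parser is simplified: it assumes no # inside quoted values, skips lines without =, and ignores include directives and ALTER SYSTEM overrides. The decoderbufs check mirrors what the example image configures.

```python
def parse_conf(text: str) -> dict:
    """Parse simple 'name = value' lines; later entries win, as in postgresql.conf."""
    settings = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()  # simplification: no '#' inside quoted values
        if "=" not in line:  # comments, blank lines and 'name value' forms are skipped
            continue
        name, value = line.split("=", 1)
        settings[name.strip().lower()] = value.strip().strip("'")
    return settings


def missing_prerequisites(text: str) -> list:
    """List settings that differ from what the decoderbufs-based example setup relies on."""
    s = parse_conf(text)
    problems = []
    if s.get("wal_level") != "logical":
        problems.append("wal_level must be logical")
    libraries = [lib.strip() for lib in s.get("shared_preload_libraries", "").split(",")]
    if "decoderbufs" not in libraries:
        problems.append("shared_preload_libraries must include decoderbufs")
    for name in ("max_wal_senders", "max_replication_slots"):
        if name in s and int(s[name]) < 1:  # unset means the server default applies
            problems.append(f"{name} must be at least 1")
    return problems
```

Run against the example image's postgresql.conf above, it reports no problems.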

Debezium maintains the decoderbufs plugin here:
https://github.com/debezium/postgres-decoderbufs

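decoderbufs requires PostgreSQL 9.6 or later. Next I tested with PostgreSQL 9.6.24, using the Docker image postgres:9.6.24-bullseye. bullseye is Debian 11. The plain postgres:9.6.24 image is based on Debian 9, and installing postgresql-server-dev-9.6 on it runs into a libpq dependency problem.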

The project Debezium uses to build its container images is linked below, and you can use it to build an image yourself:
https://github.com/debezium/container-images

$ git clone https://github.com/debezium/container-images
$ cd container-images/postgres/9.6

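# Edit the Dockerfile:
# in each of the two build stages, add the following line before the first apt-get
# to switch the Debian package mirror to USTC (mirrors.ustc.edu.cn)
RUN sed -i "s@http://\(deb\|security\).debian.org@http://mirrors.ustc.edu.cn@g" /etc/apt/sources.list

# Also replace
#   deb http://ftp.debian.org/debian testing main contrib
# with
#   deb http://mirrors.ustc.edu.cn/debian testing main contrib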

Build the image:

$ docker build -t debezium:postgres-9.6.24-bullseye .

Start the container:

$ docker run -itd --name postgres_logical -p 15432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=TZmLcQlSyaeqkk7N debezium:postgres-9.6.24-bullseye

Verify decoderbufs:

postgres=# select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
INFO: Exiting startup callback
slot_name | xlog_position
------------------+---------------
decoderbufs_demo | 0/1682FE0

To check which versions Debezium supports, see the release notes and build a matching environment for testing:
https://debezium.io/releases/

References

1. https://www.freebuf.com/articles/web/345122.html
2. https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS
3. https://www.postgresql.org/docs/current/logicaldecoding.html
4. https://segmentfault.com/a/1190000040364198
5. https://stackoverflow.com/questions/59799503/postgres-debezium-does-not-publish-the-previous-state-of-a-record
6. https://diabloneo.github.io/2021/06/27/postgres-logical-replication-and-cdc-part-1/
7. https://doc.zhangeamon.top/postgres/debezium/
8. https://debezium.io/documentation/reference/stable/connectors/postgresql.html
9. https://debezium.io/documentation/reference/stable/postgres-plugins.html
