PG数据同步到kafka

0    106    1

👉 本文共约2370个字,系统预计阅读时间或需9分钟。

简介

目前CDC(Change Data Capture)工具还是比较多的 ,现在有部分增量数据同步工具是基于触发器实现的,把数据库的变化记录到某个系统表中,然后在客户端建立缓冲,并定期将变化push到接收端,接收端和发送端建立通道,并定期pull新的数据变化,并在目标数据库回放 。

但因为使用了触发器会影响生产环境的性能,这点可能无法接受 ,其他的增量数据同步工具中占比比较大的一类,是基于kafka来实现的。例如RedHat开源的 Debezium。通过抽取数据库日志来获取变更,实现增量数据同步。

其它工具也可以将PG同步到kafka,搜索了一下,列举如下:

下面是一个PostgreSQL数据同步至kafka 的例子。

环境申请

安装kafka环境

安装java

本人提供Oracle、MySQL、PG等数据库的培训和考证业务,私聊QQ646634621或微信db_bao,谢谢!

安装kafka

从kafka 2.8.0开始,自带zookeeper,无需额外安装ZooKeeper。

下载:https://kafka.apache.org/downloads.html , 大约100MB

查看java进程:

kafka默认占用9092端口,ZK默认占用2181端口。

kafka日志:

测试一下,在服务器上创建一个topic为test,然后生产几条信息:

数据库端操作

postgres 逻辑解码, 可以用jsonc或wal2json。

程序 jsoncdc(https://pgxn.org/dist/jsoncdc/)依赖于rust, 需要先安装 rust。

这里使用wal2json插件。

数据库安装wal2json

github:https://github.com/eulerto/wal2json

wal2json is an output plugin for logical decoding. It means that the plugin have access to tuples produced by INSERT and UPDATE. Also, UPDATE/DELETE old row versions can be accessed depending on the configured replica identity. Changes can be consumed using the streaming protocol (logical replication slots) or by a special SQL API.

wal2json可以提供基于PG库级别的DML日志挖掘工作。

postgresql.conf里添加shared_preload_libraries = 'wal2json.so’并修改wal_level = logical,之后重启数据库。

如果是rpm安装的pg,最好使用rpm安装wal2json。如果是源码编译安装的pg,最好源码安装wal2json。

In Red Hat/CentOS:

In Debian/Ubuntu:

编译安装:

数据库配置pg_hba.conf

让客户端可以无需密码就能登录到PG数据库,或者配置客户端密码文件都可以。

创建复制槽

我这里为了方便使用本地postgres用户。

创建测试表

测试数据是否可以抓取成json格式:

测试:

安装kafka

在PG数据库端安装kafka命令,也可以使用kcat,参考:https://www.xmmup.com/kafkaminglingxinggongju-kcathuokafkacat.html

但是,我的环境里,使用kcat执行,生产者不能接收到数据,所以,这里直接使用kafka的命令行:

将kafka的地址和主机名解析加入/etc/hosts文件

否则会报类似错误:Couldn't resolve server lhrkafka:9092 from bootstrap.servers as DNS resolution failed for lhrkafka

测试同步

在PG环境,开启一个终端,生产数据到kafka服务端:

PG查询并做DML操作数据:

在kafka环境,消费testdb_topic:

结果如下,可见数据已经同步到kafka了:

增量同步

1、停止/usr/pgsql-13/bin/pg_recvlogical -h 172.17.0.4 -p 5432 -U postgres -d postgres -S slot1 --start -f - | /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.17.0.3:9092 --topic testdb_topic

2、PG数据库做DML操作

3、开启/usr/pgsql-13/bin/pg_recvlogical -h 172.17.0.4 -p 5432 -U postgres -d postgres -S slot1 --start -f - | /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.17.0.3:9092 --topic testdb_topic

4、消费端查询数据/usr/local/kafka/bin/kafka-console-consumer.sh --topic testdb_topic --bootstrap-server 127.0.0.1:9092 --from-beginning

可以看到,最新的数据也同步过来了。

参考

https://pgfans.cn/a/1766

    头像

    小麦苗

    学习或考证,均可联系麦老师,请加微信db_bao或QQ646634621

    您可能还喜欢...

    发表评论

    您的电子邮箱地址不会被公开。

    16 − 7 =

     

    嘿,我是小麦,需要帮助随时找我哦
    • 18509239930
    • 个人微信

    • 麦老师QQ聊天
    • 个人邮箱
    • 点击加入QQ群
    • 个人微店

    • 回到顶部
    返回顶部