合 使用OGG微服务将PG同步到kafka(全量+增量)
Tags: OGG数据迁移实时同步OGG微服务Kafka全量增量
类似可以参考:使用OGG传统模式将Oracle同步到kafka(全量+增量)
环境准备
PG环境
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | -- 创建专用网络 docker network create --subnet=172.72.6.0/24 pg-network -- PG docker rm -f lhrpg docker run -d --name lhrpg -h lhrpg \ -p 64320:5432 --net=pg-network --ip 172.72.6.34 \ -e POSTGRES_PASSWORD=lhr \ -e TZ=Asia/Shanghai \ postgres:14.2 psql -U postgres -h 192.168.1.35 -p 64320 create database lhrdb; \c lhrdb create table t1(id int primary key); create table t2(id int primary key); create schema ogg; -- 需要重启库 alter system set wal_level='logical'; select pg_reload_conf(); docker restart lhrpg sysbench /usr/share/sysbench/oltp_common.lua --db-driver=pgsql \ --pgsql-host=172.72.6.34 --pgsql-port=5432 \ --pgsql-user=postgres --pgsql-password=lhr --pgsql-db=lhrdb \ --table-size=100 --tables=10 --threads=10 \ --events=999999999 --time=60 prepare psql -U postgres -h 192.168.1.35 -p 64320 -d lhrdb lhrdb=# \dt List of relations Schema | Name | Type | Owner --------+----------+-------+---------- public | sbtest1 | table | postgres public | sbtest10 | table | postgres public | sbtest2 | table | postgres public | sbtest3 | table | postgres public | sbtest4 | table | postgres public | sbtest5 | table | postgres public | sbtest6 | table | postgres public | sbtest7 | table | postgres public | sbtest8 | table | postgres public | sbtest9 | table | postgres public | t1 | table | postgres public | t2 | table | postgres (12 rows) |
目标端kafka环境
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | docker pull lhrbest/kafka:3.2.0 docker rm -f lhrkafka docker run -itd --name lhrkafka -h lhrkafka \ --net=pg-network --ip 172.72.6.44 \ -p 9092:9092 -p 2181:2181 \ -v /sys/fs/cgroup:/sys/fs/cgroup \ --privileged=true lhrbest/kafka:3.2.0 \ /usr/sbin/init docker exec -it lhrkafka bash -- 启动(默认已启动) /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & [root@lhrkafka /]# jps 161 QuorumPeerMain 162 Kafka 1127 Jps [root@lhrkafka /]# ps -ef|grep java root 161 1 7 14:20 ? 00:00:03 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp /usr/local/kafka/bin/../libs/activation-1.1.1.jar:/usr/local/kafka/bin/../libs/aopalliance-repackaged-2.6.1.jar:/usr/local/kafka/bin/../libs/argparse4j-0.7.0.jar:/usr/local/kafka/bin/../libs/audience-annotations-0.5.0.jar:/usr/local/kafka/bin/../libs/commons-cli-1.4.jar:/usr/local/kafka/bin/../libs/commons-lang3-3.8.1.jar:/usr/local/kafka/bin/../libs/connect-api-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-basic-auth-extension-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-json-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-client-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-runtime-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-transforms-3.2.0.jar:/usr/local/kafka/bin/../libs/hk2-api-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-locator-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-utils-2.6.1.jar:/usr/local/kafka/bin/../libs/jackson-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-core-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-databind-2.12.6.1.jar:/usr/local/kafka/bin/../libs/jackson-dataformat-csv-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-datatype-jdk8-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-base-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-json-provider-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-jaxb-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-scala_2.13-2.12.6.jar:/usr/local/kafka/bin/../libs/jakarta.activation-api-1.2.1.jar:/usr/local/kafka/bin/../libs/jakarta.annotation-api-1.3.5.jar:/usr/local/kafka/bin/../libs/jakarta.inject-2.6.1.jar:/usr/local/kafka/bin/../libs/jakarta.validation-api-2.0.2.jar:/usr/local/kafka/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/usr/local/kafka/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/usr/local/kafka/bin/../libs/javassist-3.27.0-GA.jar:/usr/local/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/local/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/usr/local/kafka/bin/../libs/jaxb-api-2.3.0.jar:/usr/local/kafka/bin/../libs/jersey-client-2.34.jar:/usr/local/kafka/bin/../libs/jersey-common-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-core-2.34.jar:/usr/local/kafka/bin/../libs/jersey-hk2-2.34.jar:/usr/local/kafka/bin/../libs/jersey-server-2.34.jar:/usr/local/kafka/bin/../libs/jetty-client-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-continuation-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-http-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-io-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-security-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-server-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlet-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlets-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-ajax-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jline-3.21.0.jar:/usr/local/kafka/bin/../libs/jopt-simple-5.0.4.jar:/usr/local/kafka/bin/../libs/jose4j-0.7.9.jar:/usr/local/kafka/bin/../libs/kafka_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-clients-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-log4j-appender-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-metadata-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-raft-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-server-common-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-shell-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-api-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-examples-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-scala_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-test-utils-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-tools-3.2.0.jar:/usr/local/kafka/bin/../libs/lz4-java-1.8.0.jar:/usr/local/kafka/bin/../libs/maven-artifact-3.8.4.jar:/usr/local/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/local/kafka/bin/../libs/metrics-core-4.1.12.1.jar:/usr/local/kafka/bin/../libs/netty-buffer-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-codec-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-handler-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-resolver-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-tcnative-classes-2.0.46.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-classes-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-unix-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/osgi-resource-locator-1.0.3.jar:/usr/local/kafka/bin/../libs/paranamer-2.8.jar:/usr/local/kafka/bin/../libs/plexus-utils-3.3.0.jar:/usr/local/kafka/bin/../libs/reflections-0.9.12.jar:/usr/local/kafka/bin/../libs/reload4j-1.2.19.jar:/usr/local/kafka/bin/../libs/rocksdbjni-6.29.4.1.jar:/usr/local/kafka/bin/../libs/scala-collection-compat_2.13-2.6.0.jar:/usr/local/kafka/bin/../libs/scala-java8-compat_2.13-1.0.2.jar:/usr/local/kafka/bin/../libs/scala-library-2.13.8.jar:/usr/local/kafka/bin/../libs/scala-logging_2.13-3.9.4.jar:/usr/local/kafka/bin/../libs/scala-reflect-2.13.8.jar:/usr/local/kafka/bin/../libs/slf4j-api-1.7.36.jar:/usr/local/kafka/bin/../libs/slf4j-reload4j-1.7.36.jar:/usr/local/kafka/bin/../libs/snappy-java-1.1.8.4.jar:/usr/local/kafka/bin/../libs/trogdor-3.2.0.jar:/usr/local/kafka/bin/../libs/zookeeper-3.6.3.jar:/usr/local/kafka/bin/../libs/zookeeper-jute-3.6.3.jar:/usr/local/kafka/bin/../libs/zstd-jni-1.5.2-1.jar org.apache.zookeeper.server.quorum.QuorumPeerMain /usr/local/kafka/config/zookeeper.properties root 162 1 30 14:20 ? 00:00:14 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999 -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp /usr/local/kafka/bin/../libs/activation-1.1.1.jar:/usr/local/kafka/bin/../libs/aopalliance-repackaged-2.6.1.jar:/usr/local/kafka/bin/../libs/argparse4j-0.7.0.jar:/usr/local/kafka/bin/../libs/audience-annotations-0.5.0.jar:/usr/local/kafka/bin/../libs/commons-cli-1.4.jar:/usr/local/kafka/bin/../libs/commons-lang3-3.8.1.jar:/usr/local/kafka/bin/../libs/connect-api-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-basic-auth-extension-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-json-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-client-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-runtime-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-transforms-3.2.0.jar:/usr/local/kafka/bin/../libs/hk2-api-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-locator-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-utils-2.6.1.jar:/usr/local/kafka/bin/../libs/jackson-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-core-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-databind-2.12.6.1.jar:/usr/local/kafka/bin/../libs/jackson-dataformat-csv-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-datatype-jdk8-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-base-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-json-provider-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-jaxb-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-scala_2.13-2.12.6.jar:/usr/local/kafka/bin/../libs/jakarta.activation-api-1.2.1.jar:/usr/local/kafka/bin/../libs/jakarta.annotation-api-1.3.5.jar:/usr/local/kafka/bin/../libs/jakarta.inject-2.6.1.jar:/usr/local/kafka/bin/../libs/jakarta.validation-api-2.0.2.jar:/usr/local/kafka/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/usr/local/kafka/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/usr/local/kafka/bin/../libs/javassist-3.27.0-GA.jar:/usr/local/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/local/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/usr/local/kafka/bin/../libs/jaxb-api-2.3.0.jar:/usr/local/kafka/bin/../libs/jersey-client-2.34.jar:/usr/local/kafka/bin/../libs/jersey-common-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-core-2.34.jar:/usr/local/kafka/bin/../libs/jersey-hk2-2.34.jar:/usr/local/kafka/bin/../libs/jersey-server-2.34.jar:/usr/local/kafka/bin/../libs/jetty-client-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-continuation-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-http-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-io-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-security-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-server-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlet-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlets-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-ajax-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jline-3.21.0.jar:/usr/local/kafka/bin/../libs/jopt-simple-5.0.4.jar:/usr/local/kafka/bin/../libs/jose4j-0.7.9.jar:/usr/local/kafka/bin/../libs/kafka_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-clients-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-log4j-appender-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-metadata-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-raft-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-server-common-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-shell-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-api-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-examples-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-scala_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-test-utils-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-tools-3.2.0.jar:/usr/local/kafka/bin/../libs/lz4-java-1.8.0.jar:/usr/local/kafka/bin/../libs/maven-artifact-3.8.4.jar:/usr/local/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/local/kafka/bin/../libs/metrics-core-4.1.12.1.jar:/usr/local/kafka/bin/../libs/netty-buffer-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-codec-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-handler-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-resolver-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-tcnative-classes-2.0.46.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-classes-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-unix-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/osgi-resource-locator-1.0.3.jar:/usr/local/kafka/bin/../libs/paranamer-2.8.jar:/usr/local/kafka/bin/../libs/plexus-utils-3.3.0.jar:/usr/local/kafka/bin/../libs/reflections-0.9.12.jar:/usr/local/kafka/bin/../libs/reload4j-1.2.19.jar:/usr/local/kafka/bin/../libs/rocksdbjni-6.29.4.1.jar:/usr/local/kafka/bin/../libs/scala-collection-compat_2.13-2.6.0.jar:/usr/local/kafka/bin/../libs/scala-java8-compat_2.13-1.0.2.jar:/usr/local/kafka/bin/../libs/scala-library-2.13.8.jar:/usr/local/kafka/bin/../libs/scala-logging_2.13-3.9.4.jar:/usr/local/kafka/bin/../libs/scala-reflect-2.13.8.jar:/usr/local/kafka/bin/../libs/slf4j-api-1.7.36.jar:/usr/local/kafka/bin/../libs/slf4j-reload4j-1.7.36.jar:/usr/local/kafka/bin/../libs/snappy-java-1.1.8.4.jar:/usr/local/kafka/bin/../libs/trogdor-3.2.0.jar:/usr/local/kafka/bin/../libs/zookeeper-3.6.3.jar:/usr/local/kafka/bin/../libs/zookeeper-jute-3.6.3.jar:/usr/local/kafka/bin/../libs/zstd-jni-1.5.2-1.jar kafka.Kafka /usr/local/kafka/config/server.properties root 1167 961 0 14:20 pts/1 00:00:00 grep --color=auto java [root@lhrkafka /]# netstat -tulnp | grep java tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 161/java tcp 0 0 0.0.0.0:9999 0.0.0.0:* LISTEN 162/java tcp 0 0 0.0.0.0:37691 0.0.0.0:* LISTEN 161/java tcp 0 0 0.0.0.0:40831 0.0.0.0:* LISTEN 162/java tcp 0 0 0.0.0.0:38977 0.0.0.0:* LISTEN 162/java tcp 0 0 0.0.0.0:9092 0.0.0.0:* LISTEN 162/java |
kafka默认占用9092端口,ZK默认占用2181端口。
kafka日志:
1 | tailf /usr/local/kafka/logs/server.log |
测试一下,在服务器上创建一个topic为test,然后生产几条信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 | -- 生产者 /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test >hello >world -- 在另一台机器上,开启消费者控制台,监听test的topic,发现可以收到数据 /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning hello word -- 查看当前服务器中的所有 topic /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092 |
源端OGG for PG微服务环境
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | -- OGG机器 docker pull lhrbest/ogg213mapg:v1.0 docker rm -f lhrogg213mapg docker run -d --name lhrogg213mapg -h lhrogg213mapg \ --net=pg-network --ip 172.72.6.100 \ -p 9391:3389 -p 29000-29005:9000-9005 \ -v /sys/fs/cgroup:/sys/fs/cgroup \ --privileged=true lhrbest/ogg213mapg:v1.0 \ /usr/sbin/init docker exec -it lhrogg213mapg bash -- OGGMA cat > /ogg213c/ogg_ma/odbc.ini <<"EOF" [ODBC Data Sources] PGDSN=DataDirect 13 PostgreSQL Wire Protocol [ODBC] IANAAppCodePage=106 InstallDir=/ogg213c/ogg_ma [PGDSN] Driver=/ogg213c/ogg_ma/lib/GGpsql25.so #Driver=/usr/lib64/psqlodbcw.so Description=DataDirect 13 PostgreSQL Wire Protocol Database=lhrdb HostName=172.72.6.34 PortNumber=5432 LogonID=postgres Password=lhr EOF su - pg adminclient CONNECT http://127.0.0.1:9000 deployment deploy213 as oggadmin password lhr |
访问:http://192.168.1.35:29001 ,用户名:oggadmin,密码:lhr
创建身份证明、添加trandata
目标端OGG for bigdata微服务环境
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | docker pull lhrbest/ogg214mabigdata:v1.0 docker rm -f lhrogg214mabigdata docker run -d --name lhrogg214mabigdata -h lhrogg214mabigdata \ --net=pg-network --ip 172.72.6.101 \ -p 9191:3389 -p 9000-9005:9000-9005 \ -v /sys/fs/cgroup:/sys/fs/cgroup \ --privileged=true lhrbest/ogg214mabigdata:v1.0 \ /usr/sbin/init docker exec -it lhrogg214mabigdata bash -- 配置kafka参数 vi /ogg214c/ogg_deploy/etc/conf/ogg/kafka.props gg.handler.kafkahandler.schemaTopicName=LHR_OGG vi /ogg214c/ogg_deploy/etc/conf/ogg/custom_kafka_producer.properties bootstrap.servers=172.72.6.44:9092 |
访问:http://192.168.1.35:9001 ,用户名:oggadmin,密码:lhr
全量同步
注意:在此阶段,源端需要停业务,不能产生新数据。
源端创建初始化加载
1 2 3 4 5 6 | EXTRACT ext0 SETENV(PGCLIENTENCODING = "UTF8") SETENV(ODBCINI="/ogg213c/ogg_ma/odbc.ini") SOURCEDB PGDSN USERIDALIAS PG1, DOMAIN OGGMA EXTFILE ./dirdat/e0 , PURGE TABLE public.*; |
查询报告,说明数据已经传输到目标端了,如下:
进入OS查询:
1 2 3 4 5 6 7 8 | [root@lhrogg213mapg /]# cd /ogg213c/ogg_deploy/var/lib/data/dirdat [root@lhrogg213mapg dirdat]# ll total 272 -rw-r----- 1 oracle oinstall 278395 Jul 25 09:42 e0000000 [root@lhrogg213mapg dirdat]# ll -h total 272K -rw-r----- 1 oracle oinstall 272K Jul 25 09:42 e0000000 [root@lhrogg213mapg dirdat]# |
使用scp将文件传递到目标端
1 2 3 4 | scp /ogg213c/ogg_deploy/var/lib/data/dirdat/e0000000 root@172.72.6.101:/ogg214c/ogg_deploy/var/lib/data/dirdat/ cd /ogg214c/ogg_deploy/var/lib/data/dirdat/ chown oracle.oinstall -R ./dirdat/ |
目标端kafka数据全量初始化
1 2 3 4 | REPLICAT rep0 targetdb libfile libggjava.so set property=/ogg214c/ogg_deploy/etc/conf/ogg/kafka.props end runtime map public.*, target public.*; |
运行完后,自动停止:
全量同步结果检查
1 2 3 4 5 6 7 8 9 10 | -- 查看所有历史数据 /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic LHR_OGG --from-beginning -- 查看当前服务器中的所有 topic /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092 -- topic详情 /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic LHR_OGG |
一张表一个主题,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | [root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092 __consumer_offsets test [root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092 LHR_OGG __consumer_offsets sbtest1 sbtest10 sbtest2 sbtest3 sbtest4 sbtest5 sbtest6 sbtest7 sbtest8 sbtest9 [root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic sbtest1 --from-beginning | wc -l ^CProcessed a total of 100 messages |
数据已全量同步完成。
增量同步
配置复制槽
在配置PostgreSQL实时同步之前,需要先配置复制槽。
1 2 3 4 5 | su - oracle adminclient CONNECT http://127.0.0.1:9001 deployment deploy213 as oggadmin password lhr dblogin useridalias PG1 DOMAIN OGGMA REGISTER EXTRACT ext1 |
若不配置复制槽,会报错:OGG-25374
Oracle GoldenGate Capture for PostgreSQL, EXT1.prm: The replication slot 'ext1_eaa1c3d574a94c47' for group 'EXT1' does not exist in the database 'lhrdb'.