PG逻辑复制插件之pglogical使用说明
简介
参考:
https://gitee.com/mirrors/pglogical
https://github.com/2ndQuadrant/pglogical
https://www.xmmup.com/pgluojifuzhichajianzhipglogicalguanfangshuoming.html
Logical Replication extension for PostgreSQL 14、13, 12, 11, 10, 9.6, 9.5, 9.4 (Postgres), providing much faster replication than Slony, Bucardo or Londiste, as well as cross-version upgrades.
pglogical 是 PostgreSQL 的拓展模块, 为 PostgreSQL 数据库提供了逻辑流复制发布和订阅的功能。pglogical 重用了 BDR 项目中的一部分相关技术。pglogical 是一个完全作为PostgreSQL 扩展实现的逻辑复制系统。完全集成,它不需要触发器或外部程序。这种物理复制的替代方法是使用发布/订阅模型复制数据以进行选择性复制的一种高效方法。支持 PG14、13、12、11、10、9.6、9.5、9.4 ,提供比 Slony、Bucardo 或 Londiste 更快的复制速度,以及跨版本升级。
我们使用的下列术语来描述节点和数据流之间的关系,重用了一些早期的Slony技术中的术语:
- 节点 - PostgreSQL数据库实例
- 发布者和订阅者 - 节点的角色名称
- 复制集 - 关系表的集合
pglogical是新技术组件,使用了最新的PostgreSQL 数据库中的一些核心功能,所以存在一些数据库版本限制:
- 数据源发布和订阅节点需要运行 PostgreSQL 9.4 +
- 复制源过滤和冲突检测需要 PostgreSQL 9.5 +
支持的使用场景:
- 主版本数据库之间的升级(存在上述的版本限制)
- 完整的数据库复制
- 利用复制集,选择性的筛选的关系表
- 可从多个上游服务器,做数据的聚集和合并
架构上的细节︰
- pglogical 工作在每个数据库层面上,而不是像物理流复制一样工作在整个数据库集簇实例级别
- 一个发布程序提供给多个订阅者不会引起额外的磁盘写开销
- 一个订阅服务器可以从几个起源的更改合并和检测与自动和可配置冲突决议 (一些,但并不是所有方面所需的多主机)的更改之间的冲突。
- 级联复制是在变更集转发的过程中实现的。
必要条件
- 要使用pglogical,提供发布和订阅服务器必须运行PostgreSQL 9.4或更高版本。
- pglogical扩展必须同时安装在提供发布和订阅服务器上。您必须同时创建扩展“CREATE EXTENSION pglogical;”。
- 提供发布和订阅服务器上的表必须具有相同的名称并位于相同的schema中。将来的修订版可能会添加映射功能。
- 提供发布和订阅服务器上的表必须具有相同的列,每列中的数据类型相同。订阅服务器上的CHECK约束、NOT NULL约束等必须与提供发布相同或较弱(更宽松)。
- 表必须具有相同的主键。不建议添加主键以外的其他唯一约束。
安装插件
安装包安装
yum安装
1、安装yum源:
- PostgreSQL 9.4:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/9.4/rpm | bash
- PostgreSQL 9.5:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/9.5/rpm | bash
- PostgreSQL 9.6:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/9.6/rpm | bash
- PostgreSQL 10:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/10/rpm | bash
- PostgreSQL 11:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/11/rpm | bash
- PostgreSQL 12:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/12/rpm | bash
- PostgreSQL 13:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/13/rpm | bash
- PostgreSQL 14:
curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/14/rpm | bash
2、安装插件
- PostgreSQL 9.4:
yum install postgresql94-pglogical
- PostgreSQL 9.5:
yum install postgresql95-pglogical
- PostgreSQL 9.6:
yum install postgresql96-pglogical
- PostgreSQL 10:
yum install postgresql10-pglogical
- PostgreSQL 11:
yum install postgresql11-pglogical
- PostgreSQL 12:
yum install postgresql12-pglogical
- PostgreSQL 13:
yum install postgresql13-pglogical
- PostgreSQL 14:
yum install postgresql14-pglogical
APT安装
1、安装源:
1 | curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/deb | bash |
2、安装插件
- PostgreSQL 9.4:
sudo apt-get install postgresql-9.4-pglogical
- PostgreSQL 9.5:
sudo apt-get install postgresql-9.5-pglogical
- PostgreSQL 9.6:
sudo apt-get install postgresql-9.6-pglogical
- PostgreSQL 10:
sudo apt-get install postgresql-10-pglogical
- PostgreSQL 11:
sudo apt-get install postgresql-11-pglogical
- PostgreSQL 12:
sudo apt-get install postgresql-12-pglogical
- PostgreSQL 13:
sudo apt-get install postgresql-13-pglogical
- PostgreSQL 14:
sudo apt-get install postgresql-14-pglogical
编译安装
https://github.com/2ndQuadrant/pglogical/releases
Source code installs are the same as for any other PostgreSQL extension built using PGXS.
Make sure the directory containing pg_config
from the PostgreSQL release is listed in your PATH
environment variable. You might have to install a -dev
or -devel
package for your PostgreSQL release from your package manager if you don't have pg_config
.
Then run make
to compile, and make install
to install. You might need to use sudo
for the install step.
1 2 3 4 5 6 7 8 9 | wget https://codeload.github.com/2ndQuadrant/pglogical/tar.gz/refs/tags/REL2_4_1 -O pglogical-REL2_4_1.tar.gz tar -zxvf pglogical-REL2_4_1.tar.gz cd pglogical-REL2_4_1 which pg_config USE_PGXS=1 make clean USE_PGXS=1 make USE_PGXS=1 make install |
安装完之后,查看:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | postgres=# select * from pg_available_extensions where name='pglogical'; name | default_version | installed_version | comment -----------+-----------------+-------------------+-------------------------------- pglogical | 2.4.1 | | PostgreSQL Logical Replication (1 row) postgres=# \dx List of installed extensions Name | Version | Schema | Description --------------------+---------+------------+------------------------------------------------------------------------ pageinspect | 1.8 | public | inspect the contents of database pages at a low level pg_recovery | 1.0 | public | recovery table data of update/delete/rollback rows and drop columns pg_stat_statements | 1.8 | public | track planning and execution statistics of all SQL statements executed plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language (4 rows) |
使用配置
配置参数
首先 PostgreSQL服务器必须正确配置才能够支持逻辑解码︰
1 2 3 4 5 6 7 8 | wal_level = 'logical' # one per database needed on (provider/subscriber)provider node max_worker_processes = 10 # one per node needed on provider node max_replication_slots = 10 # one per node needed on provider node max_wal_senders = 10 shared_preload_libraries = 'pglogical' |
配置:
1 2 3 4 5 6 | alter system set shared_preload_libraries = 'pglogical'; alter system set wal_level = 'logical'; select pg_reload_conf(); -- 由于都是postmaster类型的参数,所以需要重启库 |
操作过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | postgres=# select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries'); name | setting | unit | category | short_desc | extra_desc | context | vartype | source | min_val | max_val | enumvals | boot_val | reset_val | sourcefile | sourceline | pending_restart --------------------------+---------+------+--------------------------------------------------------+-------------------------------------------------------------------------+------------+------------+---------+---------+---------+---------+---------------------------+----------+-----------+------------+------------+----------------- max_replication_slots | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously defined replication slots. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_wal_senders | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously running WAL sender processes. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_worker_processes | 8 | | Resource Usage / Asynchronous Behavior | Maximum number of concurrent worker processes. | | postmaster | integer | default | 0 | 262143 | | 8 | 8 | | | f shared_preload_libraries | | | Client Connection Defaults / Shared Library Preloading | Lists shared libraries to preload into server. | | postmaster | string | default | | | | | | | | f wal_level | replica | | Write-Ahead Log / Settings | Set the level of information written to the WAL. | | postmaster | enum | default | | | {minimal,replica,logical} | replica | replica | | | f (5 rows) postgres=# alter system set shared_preload_libraries = 'pglogical'; ALTER SYSTEM postgres=# alter system set wal_level = 'logical'; ALTER SYSTEM postgres=# select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries'); name | setting | unit | category | short_desc | extra_desc | context | vartype | source | min_val | max_val | enumvals | boot_val | reset_val | sourcefile | sourceline | pending_restart --------------------------+---------+------+--------------------------------------------------------+-------------------------------------------------------------------------+------------+------------+---------+---------+---------+---------+---------------------------+----------+-----------+------------+------------+----------------- max_replication_slots | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously defined replication slots. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_wal_senders | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously running WAL sender processes. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_worker_processes | 8 | | Resource Usage / Asynchronous Behavior | Maximum number of concurrent worker processes. | | postmaster | integer | default | 0 | 262143 | | 8 | 8 | | | f shared_preload_libraries | | | Client Connection Defaults / Shared Library Preloading | Lists shared libraries to preload into server. | | postmaster | string | default | | | | | | | | f wal_level | replica | | Write-Ahead Log / Settings | Set the level of information written to the WAL. | | postmaster | enum | default | | | {minimal,replica,logical} | replica | replica | | | f (5 rows) postgres=# select pg_reload_conf(); pg_reload_conf ---------------- t (1 row) postgres=# select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries'); name | setting | unit | category | short_desc | extra_desc | context | vartype | source | min_val | max_val | enumvals | boot_val | reset_val | sourcefile | sourceline | pending_restart --------------------------+---------+------+--------------------------------------------------------+-------------------------------------------------------------------------+------------+------------+---------+---------+---------+---------+---------------------------+----------+-----------+------------+------------+----------------- max_replication_slots | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously defined replication slots. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_wal_senders | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously running WAL sender processes. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_worker_processes | 8 | | Resource Usage / Asynchronous Behavior | Maximum number of concurrent worker processes. | | postmaster | integer | default | 0 | 262143 | | 8 | 8 | | | f shared_preload_libraries | | | Client Connection Defaults / Shared Library Preloading | Lists shared libraries to preload into server. | | postmaster | string | default | | | | | | | | t wal_level | replica | | Write-Ahead Log / Settings | Set the level of information written to the WAL. | | postmaster | enum | default | | | {minimal,replica,logical} | replica | replica | | | t (5 rows) [pg13@lhrpgall ~]$ pg_ctl restart waiting for server to shut down.... done server stopped waiting for server to start....2022-01-14 11:06:07.103 CST [1272] LOG: redirecting log output to logging collector process 2022-01-14 11:06:07.103 CST [1272] HINT: Future log output will appear in directory "pg_log". done server started postgres=# select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries'); name | setting | unit | category | short_desc | extra_desc | context | vartype | source | min_val | max_val | enumvals | boot_val | reset_val | sourcefile | sourceline | pending_restart --------------------------+-----------+------+--------------------------------------------------------+-------------------------------------------------------------------------+------------+------------+---------+--------------------+---------+---------+---------------------------+----------+-----------+-----------------------------------+------------+----------------- max_replication_slots | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously defined replication slots. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_wal_senders | 10 | | Replication / Sending Servers | Sets the maximum number of simultaneously running WAL sender processes. | | postmaster | integer | default | 0 | 262143 | | 10 | 10 | | | f max_worker_processes | 8 | | Resource Usage / Asynchronous Behavior | Maximum number of concurrent worker processes. | | postmaster | integer | default | 0 | 262143 | | 8 | 8 | | | f shared_preload_libraries | pglogical | | Client Connection Defaults / Shared Library Preloading | Lists shared libraries to preload into server. | | postmaster | string | configuration file | | | | | pglogical | /pg13/pgdata/postgresql.auto.conf | 3 | f wal_level | logical | | Write-Ahead Log / Settings | Set the level of information written to the WAL. | | postmaster | enum | configuration file | | | {minimal,replica,logical} | replica | logical | /pg13/pgdata/postgresql.auto.conf | 4 | f (5 rows) |
如果你想要处理解决与上一次/第一次更新之间的冲突 wins(参阅冲突章节), 你的数据库版本需要为PostgreSQL 9.5+ (在9.4中无效) 您可以向 PostgreSQL.conf 添加此额外的选项:
1 2 | track_commit_timestamp = on # needed for last/first update wins conflict resolution # property available in PostgreSQL 9.5+ |
配置pg_hba.conf
pg_hba.conf 需要配置成允许从本地主机复制,用户拥有复制权限,连接权限。
1 | host replication postgres 网段ip/24 trust |
创建扩展
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 | postgres=# \dx List of installed extensions Name | Version | Schema | Description --------------------+---------+------------+------------------------------------------------------------------------ pageinspect | 1.8 | public | inspect the contents of database pages at a low level pg_recovery | 1.0 | public | recovery table data of update/delete/rollback rows and drop columns pg_stat_statements | 1.8 | public | track planning and execution statistics of all SQL statements executed plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language (4 rows) postgres=# create EXTENSION pglogical; CREATE EXTENSION postgres=# \dx List of installed extensions Name | Version | Schema | Description --------------------+---------+------------+------------------------------------------------------------------------ pageinspect | 1.8 | public | inspect the contents of database pages at a low level pg_recovery | 1.0 | public | recovery table data of update/delete/rollback rows and drop columns pg_stat_statements | 1.8 | public | track planning and execution statistics of all SQL statements executed pglogical | 2.4.1 | pglogical | PostgreSQL Logical Replication plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language (5 rows) postgres=# \dx+ pglogical Objects in extension "pglogical" Object description -------------------------------------------------------------------------------------------------- function pglogical.alter_node_add_interface(name,name,text) function pglogical.alter_node_drop_interface(name,name) function pglogical.alter_replication_set(name,boolean,boolean,boolean,boolean) function pglogical.alter_subscription_add_replication_set(name,name) function pglogical.alter_subscription_disable(name,boolean) function pglogical.alter_subscription_enable(name,boolean) function pglogical.alter_subscription_interface(name,name) function pglogical.alter_subscription_remove_replication_set(name,name) function pglogical.alter_subscription_resynchronize_table(name,regclass,boolean) function pglogical.alter_subscription_synchronize(name,boolean) function pglogical.create_node(name,text) function pglogical.create_replication_set(name,boolean,boolean,boolean,boolean) function pglogical.create_subscription(name,text,text[],boolean,boolean,text[],interval,boolean) function pglogical.drop_node(name,boolean) function pglogical.drop_replication_set(name,boolean) function pglogical.drop_subscription(name,boolean) function pglogical.pglogical_gen_slot_name(name,name,name) function pglogical.pglogical_max_proto_version() function pglogical.pglogical_min_proto_version() function pglogical.pglogical_node_info() function pglogical.pglogical_version() function pglogical.pglogical_version_num() function pglogical.queue_truncate() function pglogical.replicate_ddl_command(text,text[]) function pglogical.replication_set_add_all_sequences(name,text[],boolean) function pglogical.replication_set_add_all_tables(name,text[],boolean) function pglogical.replication_set_add_sequence(name,regclass,boolean) function pglogical.replication_set_add_table(name,regclass,boolean,text[],text) function pglogical.replication_set_remove_sequence(name,regclass) function pglogical.replication_set_remove_table(name,regclass) function pglogical.show_repset_table_info(regclass,text[]) function pglogical.show_subscription_status(name) function pglogical.show_subscription_table(name,regclass) function pglogical.synchronize_sequence(regclass) function pglogical.table_data_filtered(anyelement,regclass,text[]) function pglogical.wait_for_subscription_sync_complete(name) function pglogical.wait_for_table_sync_complete(name,regclass) function pglogical.wait_slot_confirm_lsn(name,pg_lsn) function pglogical.xact_commit_timestamp_origin(xid) table pglogical.depend table pglogical.local_node table pglogical.local_sync_status table pglogical.node table pglogical.node_interface table pglogical.queue table pglogical.replication_set table pglogical.replication_set_seq table pglogical.replication_set_table table pglogical.sequence_state table pglogical.subscription view pglogical.tables (51 rows) |
其它
数据字典
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | select * from pglogical.depend ; select * from pglogical.local_node ; select * from pglogical.local_sync_status ; select * from pglogical.node ; select * from pglogical.node_interface ; select * from pglogical.queue ; select * from pglogical.replication_set ; select * from pglogical.replication_set_seq ; select * from pglogical.replication_set_table ; select * from pglogical.sequence_state ; select * from pglogical.subscription ; select * from pglogical.tables ; select * from pglogical.show_subscription_status(); |
复制集
在复制集default中: update/delete/truncate 操作也是同步复制。
复制集 | INSERT | UPDATE | DELETE | TRUNCATE |
---|---|---|---|---|
default | √ | √ | √ | √ |
default_insert_only | √ | × | × | √ |
ddl_sql | √ | × | × | × |
1 2 3 4 5 6 7 | lhrdb=# select * from pglogical.replication_set ; set_id | set_nodeid | set_name | replicate_insert | replicate_update | replicate_delete | replicate_truncate ------------+------------+---------------------+------------------+------------------+------------------+-------------------- 3808633458 | 345938905 | default | t | t | t | t 1988720057 | 345938905 | default_insert_only | t | f | f | t 742713876 | 345938905 | ddl_sql | t | f | f | f (3 rows) |
复制特性扩展
延迟复制
1 | pglogical.create_subscription(subscription_name name, provider_dsn text, replication_sets text[], synchronize_structure boolean, synchronize_data boolean, forward_origins text[], apply_delay interval) |
参数:
- subscription_name - 订阅的名称,必须是唯一的
- provider_dsn - 提供者的连接字符串
- replication_sets - 要订阅的复制集数组,这些必须已存在,默认为“{default,default_insert_only,ddl_sql}”
- synchronize_structure - 指定是否将提供者与订阅者之间的结构同步,默认为false
- synchronize_data - 指定是否将数据从提供者同步到订阅者,默认为true
- forward_origins - 要转发的原始名称数组,当前只支持的值是空数组,意味着不转发任何不是源自提供者节点的更改,或“{all}”这意味着复制所有更改,无论它们的来源是什么,默认是全部}”
- apply_delay - 延迟复制多少,默认为0秒
示例:数据表结构同步;且延迟复制1分钟
1 2 3 4 5 6 | SELECT pglogical.create_subscription( subscription_name := 'subscription1', provider_dsn := 'host=192.168.1.221 port=5432 dbname=lhrdb', synchronize_structure := true, apply_delay := '00:01:00'::interval ); |
对源端进行 行/列 过滤
过滤机制需要 PostgreSQL 9.5 +
1 | pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean, columns text [],row_filter text) |
参数:
- set_name - 现有复制集的名称
- relation - 要添加到集合中的表的名称或OID
- synchronize_data - 如果为true,则表数据将在订阅给定复制集的所有订户上同步,默认为false
- columns - 要复制的列的列表。通常,当应复制所有列时,这将设置为NULL,这是默认值
- row_filter - 行过滤表达式,默认为NULL(无过滤),有关详细信息,请参阅(行过滤)。警告:在使用有效行筛选器同步数据时要小心。使用synchronize_data=true有效row_filter就像对表的一次性操作。使用修改后再次执行它将row_filter不会将数据同步到订户。订阅者可能需要pglogical.alter_subscription_resynchronize_table()来修复它。
示例:对表tbl_lottu02中字段{id, name, job} 字段列过滤;且对条件 ‘id > 10’ 进行行过滤
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | # provider 节点 创建表并插入测试数据 create table tbl_lottu02 (id int primary key, name text, job text, reg_time timestamp ); insert into tbl_lottu02 select generate_series(1,20) id,'lottu'||generate_series(1,20),'pg', now(); # subscriber节点创建表; 可以只创建复制的列的数据表 create table tbl_lottu02 (id int primary key, name text, job text, reg_time timestamp ); # or create table tbl_lottu02 (id int primary key, name text, job text); #provider 节点 将表加入复制集中;并同步记录 lottu=# select pglogical.replication_set_add_table(set_name := 'default', relation := 'tbl_lottu02', synchronize_data := true, columns := '{id, name, job}',row_filter := 'id < 10'); replication_set_add_table --------------------------- t (1 row) # subscriber节点查看表tbl_lottu02记录 lottu=# select * from tbl_lottu02; id | name | job ----+--------+----- 1 | lhrdb1 | pg 2 | lhrdb2 | pg 3 | lhrdb3 | pg 4 | lhrdb4 | pg 5 | lhrdb5 | pg 6 | lhrdb6 | pg 7 | lhrdb7 | pg 8 | lhrdb8 | pg 9 | lhrdb9 | pg (9 rows) |
为新表自动分配复制集
事件触发器工具可用于描述为新创建的表定义复制集的规则。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | CREATE OR REPLACE FUNCTION pglogical_assign_repset() RETURNS event_trigger AS $$ DECLARE obj record; BEGIN FOR obj IN SELECT * FROM pg_event_trigger_ddl_commands() LOOP IF obj.object_type = 'table' THEN IF obj.schema_name = 'config' THEN PERFORM pglogical.replication_set_add_table('configuration', obj.objid); ELSIF NOT obj.in_extension THEN PERFORM pglogical.replication_set_add_table('default', obj.objid); END IF; END IF; END LOOP; END; $$ LANGUAGE plpgsql; CREATE EVENT TRIGGER pglogical_assign_repset_trg ON ddl_command_end WHEN TAG IN ('CREATE TABLE', 'CREATE TABLE AS') EXECUTE PROCEDURE pglogical_assign_repset(); |
冲突检测
冲突检测需要 PostgreSQL 9.5 +
如果节点订阅多个提供程序,或当本地写入在订阅服务器上发生,可能会发生冲突,尤其是对传入的变化。这些都自动检测,并可以就此采取行动取决于配置。
解决冲突的办法是通过配置 pglogical.conflict_resolution 参数。
pglogical.conflict_resolution 支持的配置参数选项为︰
- error - 复制将停止上错误如果检测到冲突和手动操作需要解决
- apply_remote - 总是应用与本地数据有冲突的更改,这是默认值
- keep_local - 保留数据的本地版本,并忽略来自远程节点相互冲突的更改
- last_update_wins - 时间戳为提交最新的版本(newest commit timestamp)的数据将会被保存(这可以是本地或远程版本)
- first_update_wins - 时间戳为最旧的版本(oldest timestamp)的数据将会被保存(这可以是本地或远程版本)
当参数track_commit_timestamp被禁用时,唯一允许的配置值是 apply_remote。 PostgreSQL 9.4 不支持 track_commit_timestamp 配置参数只能配置参数apply_remote(该参数是默认值)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | # 在 订阅者 节点配置;我们保留最新的数据 track_commit_timestamp = on pglogical.conflict_resolution = 'last_update_wins' # 在 订阅者 节点创建测试表tbl_lhrdb03 lottu=# create table tbl_lhrdb03(id int primary key, name text); CREATE TABLE lottu=# insert into tbl_lhrdb03 values (1001,'subscriber'); INSERT 0 1 # 在 发布者 节点 创建测试表 create table tbl_lhrdb03(id int primary key, name text); select pglogical.replication_set_add_table( set_name := 'default', relation := 'tbl_lhrdb03',synchronize_data := true); insert into tbl_lhrdb03 values (1001,'provider'); # 在 订阅者 节点 查看数据 lottu=# select * from tbl_lottu03; id | name ------+---------- 1001 | provider |
后记: 在订阅者的表需要主键约束;不然检测不到冲突;是否需要主键约束当然这个也是根据需求而定。
示例一:1个发布端,2个订阅端
现有实验环境
数据库版本 | 操作系统 | IP | 数据库 | 角色 |
---|---|---|---|---|
PostgreSQL 13.4 | Debian GNU/Linux 11 | 172.72.6.30 | lhrdb | provider |
PostgreSQL 13.4 | Debian GNU/Linux 11 | 172.72.6.31 | lhrdb | subscriber |
PostgreSQL 12.8 | Debian GNU/Linux 11 | 172.72.6.32 | lhrdb | subscriber |
环境准备
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 | -- 创建专用网络 docker network create --subnet=172.72.6.0/24 pg-network -- PG13 docker rm -f lhrpg30 docker run -d --name lhrpg30 -h lhrpg30 \ -p 64330:5432 --net=pg-network --ip 172.72.6.30 \ -e POSTGRES_PASSWORD=lhr \ -e TZ=Asia/Shanghai \ postgres:13.4 docker rm -f lhrpg31 docker run -d --name lhrpg31 -h lhrpg31 \ -p 64331:5432 --net=pg-network --ip 172.72.6.31 \ -e POSTGRES_PASSWORD=lhr \ -e TZ=Asia/Shanghai \ postgres:13.4 -- PG12 docker rm -f lhrpg32 docker run -d --name lhrpg32 -h lhrpg32 \ -p 64332:5432 --net=pg-network --ip 172.72.6.32 \ -e POSTGRES_PASSWORD=lhr \ -e TZ=Asia/Shanghai \ postgres:12.8 -- 安装插件 -- PG12 apt update apt install -y curl curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/deb | bash apt-get install postgresql-12-pglogical -- PG13 apt update apt install -y curl curl https://techsupport.enterprisedb.com/api/repository/dl/default/release/deb | bash apt-get install postgresql-13-pglogical -- 常用包 cp /etc/apt/sources.list /etc/apt/sources.list_bk cat > /etc/apt/sources.list <<"EOF" deb http://mirrors.aliyun.com/debian/ buster main non-free contrib deb http://mirrors.aliyun.com/debian-security buster/updates main deb http://mirrors.aliyun.com/debian/ buster-updates main non-free contrib deb http://mirrors.aliyun.com/debian/ buster-backports main non-free contrib deb-src http://mirrors.aliyun.com/debian-security buster/updates main deb-src http://mirrors.aliyun.com/debian/ buster main non-free contrib deb-src http://mirrors.aliyun.com/debian/ buster-updates main non-free contrib deb-src http://mirrors.aliyun.com/debian/ buster-backports main non-free contrib EOF apt-get install -y aptitude aptitude install -y curl wget iputils-ping procps net-tools lsb-release build-essential sysstat telnet aptitude install -y vim bzip2 gnupg2 libtinfo5 -- 3个PG库需要配置 psql -U postgres -h 192.168.66.35 -p 64330 psql -U postgres -h 192.168.66.35 -p 64331 psql -U postgres -h 192.168.66.35 -p 64332 alter system set shared_preload_libraries = 'pglogical'; alter system set wal_level = 'logical'; select pg_reload_conf(); select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries'); docker restart lhrpg30 lhrpg31 lhrpg32 create database lhrdb; \c lhrdb create EXTENSION pglogical; cat > /var/lib/postgresql/data/pg_hba.conf <<"EOF" local all all trust host all all 127.0.0.1/32 trust host all all ::1/128 trust local replication all trust host replication all 127.0.0.1/32 trust host replication all ::1/128 trust host replication postgres 172.72.6.0/24 trust host all all all md5 EOF select * from pg_hba_file_rules; -- 查看日志 docker logs -n 10 -f lhrpg30 docker logs -n 10 -f lhrpg31 docker logs -n 10 -f lhrpg32 |
提供者节点(发布端)配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | -- 1、创建节点:在数据库PG13里创建提供者节点 -- SELECT pglogical.drop_node(node_name := 'provider30'); SELECT pglogical.create_node( node_name := 'provider30', dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb' ); -- 查看 select * from pglogical.node_interface ; -- 2、创建复制集:将public架构中的所有表添加到default复制集中,复制集default的表都必需要primary key SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']); -- 创建完订阅者后,才会有复制槽 select * from pg_replication_slots ; |
运行过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | lhrdb=# SELECT pglogical.create_node( lhrdb(# node_name := 'provider30', lhrdb(# dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb' lhrdb(# ); create_node ------------- 2019914661 (1 row) lhrdb=# select * from pglogical.local_node ; node_id | node_local_interface ------------+---------------------- 2019914661 | 1955427756 (1 row) lhrdb=# select * from pglogical.node_interface ; if_id | if_name | if_nodeid | if_dsn ------------+------------+------------+----------------------------------------- 1955427756 | provider30 | 2019914661 | host=172.72.6.30 port=5432 dbname=lhrdb (1 row) lhrdb=# select * from pg_replication_slots ; slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size -----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------------- (0 rows) lhrdb=# lhrdb=# SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']); replication_set_add_all_tables -------------------------------- t (1 row) |
订阅者PG13节点配置
pglogical 可以同步表/序列结构;在创建订阅者 'pglogical.create_subscription' ; 里面参数synchronize_structure - 指定是否将提供者与订阅者之间的结构同步,默认为false。可以同步表/序列/索引,该功能仅限于同版本,不同版本会报错,具体可以根据情况来测试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | -- 1、创建节点,在数据库PG13创建订阅者节点 SELECT pglogical.create_node( node_name := 'subnode31', dsn := 'host=172.72.6.31 port=5432 dbname=lhrdb user=postgres password=lhr' ); -- 查询 select * from pglogical.node_interface ; -- 2、订阅提供者节点,该订阅将在后台启动同步和复制过程 SELECT pglogical.create_subscription( subscription_name := 'sub31', provider_dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb user=postgres password=lhr', synchronize_structure := true, synchronize_data := true ); -- 查询 select * from pglogical.node_interface ; select * from pglogical.subscription ; select * from pglogical.show_subscription_status(); -- 创建订阅这个命令执行完成后,会在“发布端”创建一个复制槽,这个很重要,一个订阅者一个复制槽 select * from pg_replication_slots ; -- 删除: 若要删除,则先删除订阅者,再删除节点 SELECT pglogical.drop_subscription(subscription_name := 'sub31'); SELECT pglogical.drop_node(node_name := 'subnode31'); |
过程:
订阅端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | lhrdb=# SELECT pglogical.create_node( lhrdb(# node_name := 'subnode31', lhrdb(# dsn := 'host=172.72.6.31 port=5432 dbname=lhrdb user=postgres password=lhr' lhrdb(# ); create_node ------------- 3125886449 (1 row) lhrdb=# lhrdb=# SELECT pglogical.create_subscription( lhrdb(# subscription_name := 'sub31', lhrdb(# provider_dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb user=postgres password=lhr', lhrdb(# synchronize_structure := true, lhrdb(# synchronize_data := true lhrdb(# ); create_subscription --------------------- 1804764769 (1 row) lhrdb=# select * from pglogical.local_node ; node_id | node_local_interface ------------+---------------------- 3125886449 | 450827623 (1 row) lhrdb=# select * from pglogical.node ; node_id | node_name ------------+------------ 3125886449 | subnode31 2019914661 | provider30 (2 rows) lhrdb=# select * from pglogical.node_interface ; if_id | if_name | if_nodeid | if_dsn ------------+------------+------------+--------------------------------------------------------------------- 450827623 | subnode31 | 3125886449 | host=172.72.6.31 port=5432 dbname=lhrdb user=postgres password=lhr 1955427756 | provider30 | 2019914661 | host=172.72.6.30 port=5432 dbname=lhrdb user=postgres password=lhr (2 rows) lhrdb=# select * from pglogical.subscription ; sub_id | sub_name | sub_origin | sub_target | sub_origin_if | sub_target_if | sub_enabled | sub_slot_name | sub_replication_sets | sub_forward_origins | sub_apply_delay | sub_force_text_transfer ------------+----------+------------+------------+---------------+---------------+-------------+----------------------------+---------------------------------------+---------------------+-----------------+------------------------- 1804764769 | sub31 | 2019914661 | 3125886449 | 1955427756 | 450827623 | t | pgl_lhrdb_provider30_sub31 | {default,default_insert_only,ddl_sql} | {all} | 00:00:00 | f (1 row) lhrdb=# lhrdb=# select * from pglogical.show_subscription_status(); subscription_name | status | provider_node | provider_dsn | slot_name | replication_sets | forward_origins -------------------+-------------+---------------+---------------------------------------------------------------------+----------------------------+---------------------------------------+----------------- sub31 | replicating | provider30 | host=172.72.6.30 port=5432 dbname=lhrdb user=postgres password=lhr | pgl_lhrdb_provider30_sub31 | {default,default_insert_only,ddl_sql} | {all} (1 row) |
发布端查询:
1 2 3 4 5 | lhrdb=# select * from pg_replication_slots ; slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size ----------------------------+------------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------------- pgl_lhrdb_provider30_sub31 | pglogical_output | logical | 16384 | lhrdb | f | t | 80 | | 489 | 0/166D078 | 0/166D0B0 | reserved | (1 row) |
发布端告警日志:
1 2 3 4 5 6 7 8 9 10 11 | 2022-01-14 20:24:05.544 CST [77] LOG: logical decoding found consistent point at 0/166D078 2022-01-14 20:24:05.544 CST [77] DETAIL: There are no running transactions. 2022-01-14 20:24:05.544 CST [77] STATEMENT: CREATE_REPLICATION_SLOT "pgl_lhrdb_provider30_sub31" LOGICAL pglogical_output 2022-01-14 20:24:05.544 CST [77] LOG: exported logical decoding snapshot: "00000007-00000002-1" with 0 transaction IDs 2022-01-14 20:24:05.544 CST [77] STATEMENT: CREATE_REPLICATION_SLOT "pgl_lhrdb_provider30_sub31" LOGICAL pglogical_output 2022-01-14 20:24:06.352 CST [80] LOG: starting logical decoding for slot "pgl_lhrdb_provider30_sub31" 2022-01-14 20:24:06.352 CST [80] DETAIL: Streaming transactions committing after 0/166D0B0, reading WAL from 0/166D078. 2022-01-14 20:24:06.352 CST [80] STATEMENT: START_REPLICATION SLOT "pgl_lhrdb_provider30_sub31" LOGICAL 0/166D0B0 (expected_encoding 'UTF8', min_proto_version '1', max_proto_version '1', startup_params_format '1', "binary.want_internal_basetypes" '1', "binary.want_binary_basetypes" '1', "binary.basetypes_major_version" '1300', "binary.sizeof_datum" '8', "binary.sizeof_int" '4', "binary.sizeof_long" '8', "binary.bigendian" '0', "binary.float4_byval" '0', "binary.float8_byval" '1', "binary.integer_datetimes" '0', "hooks.setup_function" 'pglogical.pglogical_hooks_setup', "pglogical.forward_origins" '"all"', "pglogical.replication_set_names" '"default",default_insert_only,ddl_sql', "relmeta_cache_size" '-1', pg_version '130005', pglogical_version '2.4.1', pglogical_version_num '20401', pglogical_apply_pid '108') 2022-01-14 20:24:06.353 CST [80] LOG: logical decoding found consistent point at 0/166D078 2022-01-14 20:24:06.353 CST [80] DETAIL: There are no running transactions. 2022-01-14 20:24:06.353 CST [80] STATEMENT: START_REPLICATION SLOT "pgl_lhrdb_provider30_sub31" LOGICAL 0/166D0B0 (expected_encoding 'UTF8', min_proto_version '1', max_proto_version '1', startup_params_format '1', "binary.want_internal_basetypes" '1', "binary.want_binary_basetypes" '1', "binary.basetypes_major_version" '1300', "binary.sizeof_datum" '8', "binary.sizeof_int" '4', "binary.sizeof_long" '8', "binary.bigendian" '0', "binary.float4_byval" '0', "binary.float8_byval" '1', "binary.integer_datetimes" '0', "hooks.setup_function" 'pglogical.pglogical_hooks_setup', "pglogical.forward_origins" '"all"', "pglogical.replication_set_names" '"default",default_insert_only,ddl_sql', "relmeta_cache_size" '-1', pg_version '130005', pglogical_version '2.4.1', pglogical_version_num '20401', pglogical_apply_pid '108') |
订阅端告警日志:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | 2022-01-14 20:23:49.062 CST [74] LOG: apply worker [74] at slot 2 generation 1 detaching cleanly 2022-01-14 20:23:49.067 CST [97] LOG: manager worker [97] at slot 1 generation 4 detaching cleanly 2022-01-14 20:23:49.075 CST [98] LOG: manager worker [98] at slot 1 generation 5 detaching cleanly 2022-01-14 20:23:49.290 CST [67] LOG: manager worker [67] at slot 0 generation 4 detaching cleanly 2022-01-14 20:23:49.296 CST [99] LOG: manager worker [99] at slot 1 generation 6 detaching cleanly 2022-01-14 20:23:49.299 CST [100] LOG: starting pglogical database manager for database lhrdb 2022-01-14 20:23:49.299 CST [100] LOG: manager worker [100] at slot 0 generation 5 detaching cleanly 2022-01-14 20:23:49.304 CST [101] LOG: manager worker [101] at slot 1 generation 7 detaching cleanly 2022-01-14 20:23:55.379 CST [102] LOG: manager worker [102] at slot 0 generation 6 detaching cleanly 2022-01-14 20:23:55.387 CST [103] LOG: starting pglogical database manager for database lhrdb 2022-01-14 20:23:56.390 CST [104] LOG: manager worker [104] at slot 1 generation 8 detaching cleanly 2022-01-14 20:24:05.077 CST [107] LOG: manager worker [107] at slot 1 generation 9 detaching cleanly 2022-01-14 20:24:05.077 CST [108] LOG: starting apply for subscription sub31 2022-01-14 20:24:05.085 CST [109] LOG: manager worker [109] at slot 1 generation 10 detaching cleanly |
订阅者PG12节点配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | -- 1、创建节点,在数据库PG12创建订阅者节点 SELECT pglogical.create_node( node_name := 'subnode32', dsn := 'host=172.72.6.32 port=5432 dbname=lhrdb user=postgres password=lhr' ); -- 查询 select * from pglogical.node_interface ; -- 2、订阅提供者节点,该订阅将在后台启动同步和复制过程 SELECT pglogical.create_subscription( subscription_name := 'sub32', provider_dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb user=postgres password=lhr' ); -- 查询 select * from pglogical.node_interface ; select * from pglogical.subscription ; select * from pglogical.show_subscription_status(); -- 创建订阅这个命令执行完成后,会在发布端创建一个复制槽,这个很重要,一个订阅者一个复制槽 select * from pg_replication_slots ; -- 删除: 若要删除,则先删除订阅者,再删除节点 SELECT pglogical.drop_subscription(subscription_name := 'sub32'); SELECT pglogical.drop_node(node_name := 'subnode32'); |
订阅端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | lhrdb=# SELECT pglogical.create_node( lhrdb(# node_name := 'subnode32', lhrdb(# dsn := 'host=172.72.6.32 port=5432 dbname=lhrdb user=postgres password=lhr' lhrdb(# ); create_node ------------- 235272207 (1 row) lhrdb=# SELECT pglogical.create_subscription( lhrdb(# subscription_name := 'sub32', lhrdb(# provider_dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb user=postgres password=lhr' lhrdb(# ); create_subscription --------------------- 239145690 (1 row) lhrdb=# select * from pglogical.local_node ; node_id | node_local_interface -----------+---------------------- 235272207 | 3519183068 (1 row) lhrdb=# select * from pglogical.local_sync_status ; sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn -----------+------------+--------------+--------------+-------------+---------------- d | 239145690 | | | r | 0/0 (1 row) lhrdb=# select * from pglogical.node ; node_id | node_name ------------+------------ 235272207 | subnode32 2019914661 | provider30 (2 rows) lhrdb=# select * from pglogical.node_interface ; if_id | if_name | if_nodeid | if_dsn ------------+------------+------------+--------------------------------------------------------------------- 3519183068 | subnode32 | 235272207 | host=172.72.6.32 port=5432 dbname=lhrdb user=postgres password=lhr 1955427756 | provider30 | 2019914661 | host=172.72.6.30 port=5432 dbname=lhrdb user=postgres password=lhr (2 rows) lhrdb=# select * from pglogical.subscription ; sub_id | sub_name | sub_origin | sub_target | sub_origin_if | sub_target_if | sub_enabled | sub_slot_name | sub_replication_sets | sub_forward_origins | sub_apply_delay | sub_force_text_transfer -----------+----------+------------+------------+---------------+---------------+-------------+----------------------------+---------------------------------------+---------------------+-----------------+------------------------- 239145690 | sub32 | 2019914661 | 235272207 | 1955427756 | 3519183068 | t | pgl_lhrdb_provider30_sub32 | {default,default_insert_only,ddl_sql} | {all} | 00:00:00 | f (1 row) lhrdb=# select * from pglogical.show_subscription_status(); subscription_name | status | provider_node | provider_dsn | slot_name | replication_sets | forward_origins -------------------+-------------+---------------+---------------------------------------------------------------------+----------------------------+---------------------------------------+----------------- sub32 | replicating | provider30 | host=172.72.6.30 port=5432 dbname=lhrdb user=postgres password=lhr | pgl_lhrdb_provider30_sub32 | {default,default_insert_only,ddl_sql} | {all} (1 row) |
发布端:
1 2 3 4 5 6 | lhrdb=# select * from pg_replication_slots ; slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size ----------------------------+------------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------------- pgl_lhrdb_provider30_sub31 | pglogical_output | logical | 16384 | lhrdb | f | t | 80 | | 489 | 0/166D0B0 | 0/166D0E8 | reserved | pgl_lhrdb_provider30_sub32 | pglogical_output | logical | 16384 | lhrdb | f | t | 105 | | 489 | 0/166D0B0 | 0/166D0E8 | reserved | (2 rows) |
可见,一个订阅端会产生一个复制槽。
发布端告警日志:
1 2 3 4 5 6 7 8 9 10 11 | 2022-01-14 20:30:26.462 CST [103] LOG: logical decoding found consistent point at 0/166D0B0 2022-01-14 20:30:26.462 CST [103] DETAIL: There are no running transactions. 2022-01-14 20:30:26.462 CST [103] STATEMENT: CREATE_REPLICATION_SLOT "pgl_lhrdb_provider30_sub32" LOGICAL pglogical_output 2022-01-14 20:30:26.462 CST [103] LOG: exported logical decoding snapshot: "00000008-00000002-1" with 0 transaction IDs 2022-01-14 20:30:26.462 CST [103] STATEMENT: CREATE_REPLICATION_SLOT "pgl_lhrdb_provider30_sub32" LOGICAL pglogical_output 2022-01-14 20:30:26.583 CST [105] LOG: starting logical decoding for slot "pgl_lhrdb_provider30_sub32" 2022-01-14 20:30:26.583 CST [105] DETAIL: Streaming transactions committing after 0/166D0E8, reading WAL from 0/166D0B0. 2022-01-14 20:30:26.583 CST [105] STATEMENT: START_REPLICATION SLOT "pgl_lhrdb_provider30_sub32" LOGICAL 0/166D0E8 (expected_encoding 'UTF8', min_proto_version '1', max_proto_version '1', startup_params_format '1', "binary.want_internal_basetypes" '1', "binary.want_binary_basetypes" '1', "binary.basetypes_major_version" '1200', "binary.sizeof_datum" '8', "binary.sizeof_int" '4', "binary.sizeof_long" '8', "binary.bigendian" '0', "binary.float4_byval" '1', "binary.float8_byval" '1', "binary.integer_datetimes" '0', "hooks.setup_function" 'pglogical.pglogical_hooks_setup', "pglogical.forward_origins" '"all"', "pglogical.replication_set_names" '"default",default_insert_only,ddl_sql', "relmeta_cache_size" '-1', pg_version '120009', pglogical_version '2.4.1', pglogical_version_num '20401', pglogical_apply_pid '95') 2022-01-14 20:30:26.583 CST [105] LOG: logical decoding found consistent point at 0/166D0B0 2022-01-14 20:30:26.583 CST [105] DETAIL: There are no running transactions. 2022-01-14 20:30:26.583 CST [105] STATEMENT: START_REPLICATION SLOT "pgl_lhrdb_provider30_sub32" LOGICAL 0/166D0E8 (expected_encoding 'UTF8', min_proto_version '1', max_proto_version '1', startup_params_format '1', "binary.want_internal_basetypes" '1', "binary.want_binary_basetypes" '1', "binary.basetypes_major_version" '1200', "binary.sizeof_datum" '8', "binary.sizeof_int" '4', "binary.sizeof_long" '8', "binary.bigendian" '0', "binary.float4_byval" '1', "binary.float8_byval" '1', "binary.integer_datetimes" '0', "hooks.setup_function" 'pglogical.pglogical_hooks_setup', "pglogical.forward_origins" '"all"', "pglogical.replication_set_names" '"default",default_insert_only,ddl_sql', "relmeta_cache_size" '-1', pg_version '120009', pglogical_version '2.4.1', pglogical_version_num '20401', pglogical_apply_pid '95') |
订阅端告警日志:
1 2 3 4 5 6 | 2022-01-14 20:30:19.585 CST [90] LOG: manager worker [90] at slot 0 generation 3 detaching cleanly 2022-01-14 20:30:19.592 CST [91] LOG: starting pglogical database manager for database lhrdb 2022-01-14 20:30:20.595 CST [92] LOG: manager worker [92] at slot 1 generation 1 detaching cleanly 2022-01-14 20:30:25.943 CST [94] LOG: manager worker [94] at slot 1 generation 2 detaching cleanly 2022-01-14 20:30:25.944 CST [95] LOG: starting apply for subscription sub32 2022-01-14 20:30:25.951 CST [96] LOG: manager worker [96] at slot 1 generation 3 detaching cleanly |
验证复制
发布端创建表
由于需要验证insert/update/delete/truncate操作是否同步;所以创建的表必须要有主键,若没有主键,则不能同步。
1 2 3 4 5 6 7 8 9 10 11 12 13 | -- 发布者 create table tb_test(id int primary key, name text, reg_time timestamp); insert into tb_test select generate_series(1,10000),'xxt',now(); select count(*) from tb_test; create table tb_test2(id int primary key, name text, reg_time timestamp); insert into tb_test2 select generate_series(1,20000),'xxt',now(); select count(*) from tb_test2; -- 订阅端需要创建表结构 create table tb_test(id int primary key, name text, reg_time timestamp); create table tb_test2(id int primary key, name text, reg_time timestamp); |
将表添加对应的复制集
对新建的表;并没有为其分配对应的复制集;需要手动添加。当然可以利用触发器自动添加。
有2种方法:
方法1:将public架构中的所有表添加到default复制集中
1 | SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']); |
方法2:将表添加到对应的复制集中
1 | pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean, columns text [],row_filter text) |
例如:
1 2 3 4 5 | lhrdb=# select pglogical.replication_set_add_table(set_name := 'default', relation := 'tb_test',synchronize_data := true); replication_set_add_table --------------------------- t (1 row) |
查询:
1 2 3 4 5 | select * from pglogical.replication_set_table ; select * from pglogical.depend ; select * from pglogical.tables ; |
发布端执行过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | lhrdb=# create table tb_test(id int primary key, name text, reg_time timestamp); CREATE TABLE lhrdb=# lhrdb=# insert into tb_test select generate_series(1,10000),'xxt',now(); INSERT 0 10000 lhrdb=# select count(*) from tb_test; count ------- 10000 (1 row) lhrdb=# select * from pglogical.replication_set_table ; set_id | set_reloid | set_att_list | set_row_filter --------+------------+--------------+---------------- (0 rows) lhrdb=# SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']); replication_set_add_all_tables -------------------------------- t (1 row) lhrdb=# select * from pglogical.replication_set_table ; set_id | set_reloid | set_att_list | set_row_filter ----------+------------+--------------+---------------- 48521716 | tb_test | | (1 row) lhrdb=# create table tb_test2(id int primary key, name text, reg_time timestamp); CREATE TABLE lhrdb=# insert into tb_test2 select generate_series(1,20000),'xxt',now(); INSERT 0 20000 lhrdb=# select count(*) from tb_test2; count ------- 20000 (1 row) lhrdb=# \d List of relations Schema | Name | Type | Owner --------+----------+-------+---------- public | tb_test | table | postgres public | tb_test2 | table | postgres (2 rows) lhrdb=# select * from pglogical.replication_set_table ; set_id | set_reloid | set_att_list | set_row_filter ----------+------------+--------------+---------------- 48521716 | tb_test | | (1 row) lhrdb=# select pglogical.replication_set_add_table(set_name := 'default', relation := 'tb_test2',synchronize_data := true); replication_set_add_table --------------------------- t (1 row) lhrdb=# select * from pglogical.replication_set_table ; set_id | set_reloid | set_att_list | set_row_filter ----------+------------+--------------+---------------- 48521716 | tb_test | | 48521716 | tb_test2 | | (2 rows) lhrdb=# select * from pglogical.depend ; classid | objid | objsubid | refclassid | refobjid | refobjsubid | deptype ---------+------------+----------+------------+----------+-------------+--------- 16491 | 1930385120 | 16550 | 1259 | 16550 | 0 | n 16491 | 1930385120 | 16558 | 1259 | 16558 | 0 | n (2 rows) lhrdb=# select * from pglogical.tables ; relid | nspname | relname | set_name -------+---------+----------+---------- 16558 | public | tb_test2 | default 16550 | public | tb_test | default (2 rows) |
订阅端操作
1 2 3 4 5 6 7 8 9 10 11 12 13 | -- 重新同步一个表 select * from pglogical.alter_subscription_resynchronize_table('sub31','tb_test') ; -- 将所有的表都同步 select * from pglogical.alter_subscription_synchronize('sub31', true) ; -- 查看同步状态 select * from pglogical.local_sync_status; -- 查看subscriber 节点 select * from pglogical.show_subscription_table('sub31','tb_test'); |
PG31操作过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | lhrdb=# select * from pglogical.local_sync_status; sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn -----------+------------+--------------+--------------+-------------+---------------- f | 1804764769 | | | r | 0/0 (1 row) lhrdb=# select * from pglogical.alter_subscription_synchronize('sub31', true) ; alter_subscription_synchronize -------------------------------- t (1 row) lhrdb=# select * from pglogical.local_sync_status; sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn -----------+------------+--------------+--------------+-------------+---------------- d | 1804764769 | public | tb_test | i | 0/0 f | 1804764769 | | | r | 0/0 d | 1804764769 | public | tb_test2 | i | 0/0 (3 rows) lhrdb=# \dt Did not find any relations. lhrdb=# lhrdb=# create table tb_test(id int primary key, name text, reg_time timestamp); CREATE TABLE lhrdb=# create table tb_test2(id int primary key, name text, reg_time timestamp); CREATE TABLE lhrdb=# select * from pglogical.local_sync_status; sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn -----------+------------+--------------+--------------+-------------+---------------- f | 1804764769 | | | r | 0/0 d | 1804764769 | public | tb_test | r | 0/1AA00D0 d | 1804764769 | public | tb_test2 | r | 0/1AA0108 (3 rows) lhrdb=# lhrdb=# select * from pglogical.show_subscription_table('sub31','tb_test'); nspname | relname | status ---------+---------+------------- public | tb_test | replicating (1 row) lhrdb=# select * from pglogical.show_subscription_table('sub31','tb_test2'); nspname | relname | status ---------+----------+------------- public | tb_test2 | replicating (1 row) lhrdb=# select count(*) from tb_test; count ------- 10000 (1 row) |
PG32操作过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | lhrdb=# \dt Did not find any relations. lhrdb=# lhrdb=# create table tb_test(id int primary key, name text, reg_time timestamp); CREATE TABLE lhrdb=# create table tb_test2(id int primary key, name text, reg_time timestamp); CREATE TABLE lhrdb=# select * from pglogical.local_sync_status; sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn -----------+------------+--------------+--------------+-------------+---------------- d | 239145690 | | | r | 0/0 (1 row) lhrdb=# lhrdb=# select * from pglogical.alter_subscription_synchronize('sub32', true) ; alter_subscription_synchronize -------------------------------- t (1 row) lhrdb=# select * from pglogical.local_sync_status; sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn -----------+------------+--------------+--------------+-------------+---------------- d | 239145690 | | | r | 0/0 d | 239145690 | public | tb_test | r | 0/1AA0140 d | 239145690 | public | tb_test2 | w | 0/1AA0178 (3 rows) lhrdb=# select * from pglogical.show_subscription_table('sub32','tb_test'); nspname | relname | status ---------+---------+------------- public | tb_test | replicating (1 row) lhrdb=# select * from pglogical.local_sync_status; sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn -----------+------------+--------------+--------------+-------------+---------------- d | 239145690 | | | r | 0/0 d | 239145690 | public | tb_test | r | 0/1AA0140 d | 239145690 | public | tb_test2 | r | 0/1AA0178 (3 rows) lhrdb=# select count(*) from tb_test; count ------- 10000 (1 row) |
发布端新增表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | lhrdb=# create table test(id int primary key, name text, reg_time timestamp); CREATE TABLE lhrdb=# insert into test select generate_series(1,10000),'xxt',now(); INSERT 0 10000 lhrdb=# select count(*) from test; count ------- 10000 (1 row) lhrdb=# select * from pglogical.tables ; relid | nspname | relname | set_name -------+---------+----------+---------- 16558 | public | tb_test2 | default 16550 | public | tb_test | default 16568 | public | test | (3 rows) lhrdb=# select pglogical.replication_set_add_table(set_name := 'default', relation := 'test',synchronize_data := true); replication_set_add_table --------------------------- t (1 row) lhrdb=# select * from pglogical.tables ; relid | nspname | relname | set_name -------+---------+----------+---------- 16558 | public | tb_test2 | default 16550 | public | tb_test | default 16568 | public | test | default (3 rows) |
订阅端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | lhrdb=# \dt List of relations Schema | Name | Type | Owner --------+----------+-------+---------- public | tb_test | table | postgres public | tb_test2 | table | postgres (2 rows) lhrdb=# select * from pglogical.alter_subscription_resynchronize_table('sub31','test') ; ERROR: relation "test" does not exist LINE 1: ...cal.alter_subscription_resynchronize_table('sub31','test') ; lhrdb=# create table test(id int primary key, name text, reg_time timestamp); CREATE TABLE lhrdb=# select * from pglogical.local_sync_status; sync_kind | sync_subid | sync_nspname | sync_relname | sync_status | sync_statuslsn -----------+------------+--------------+--------------+-------------+---------------- f | 1804764769 | | | r | 0/0 d | 1804764769 | public | tb_test | r | 0/1AA00D0 d | 1804764769 | public | tb_test2 | r | 0/1AA0108 d | 1804764769 | public | test | r | 0/1C2B9E8 (4 rows) lhrdb=# select count(*) from test; count ------- 10000 (1 row) |
进程
发布端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | root@lhrpg30:/# ps -ef|grep post postgres 1 0 0 20:14 ? 00:00:04 postgres postgres 27 1 0 20:14 ? 00:00:01 postgres: checkpointer postgres 28 1 0 20:14 ? 00:00:00 postgres: background writer postgres 29 1 0 20:14 ? 00:00:04 postgres: walwriter postgres 30 1 0 20:14 ? 00:00:00 postgres: autovacuum launcher postgres 31 1 0 20:14 ? 00:00:01 postgres: stats collector postgres 32 1 0 20:14 ? 00:00:00 postgres: pglogical supervisor postgres 33 1 0 20:14 ? 00:00:00 postgres: logical replication launcher postgres 37 1 0 20:15 ? 00:00:00 postgres: postgres lhrdb 192.168.66.64(1444) idle postgres 51 1 0 20:17 ? 00:00:00 postgres: pglogical manager 16384 postgres 80 1 0 20:24 ? 00:00:02 postgres: walsender postgres 172.72.6.31(55390) idle postgres 105 1 0 20:30 ? 00:00:02 postgres: walsender postgres 172.72.6.32(41266) idle root 3354 38 0 21:00 pts/0 00:00:00 grep post |
订阅端:
1 2 3 4 5 6 7 8 9 10 11 12 13 | root@lhrpg31:/# ps -ef| grep post postgres 1 0 0 20:14 ? 00:00:01 postgres postgres 27 1 0 20:14 ? 00:00:00 postgres: checkpointer postgres 28 1 0 20:14 ? 00:00:00 postgres: background writer postgres 29 1 0 20:14 ? 00:00:03 postgres: walwriter postgres 30 1 0 20:14 ? 00:00:00 postgres: autovacuum launcher postgres 31 1 0 20:14 ? 00:00:01 postgres: stats collector postgres 32 1 0 20:14 ? 00:00:00 postgres: pglogical supervisor postgres 33 1 0 20:14 ? 00:00:00 postgres: logical replication launcher postgres 37 1 0 20:15 ? 00:00:00 postgres: postgres lhrdb 192.168.66.64(1452) idle postgres 103 1 0 20:23 ? 00:00:00 postgres: pglogical manager 16384 postgres 108 1 0 20:24 ? 00:00:03 postgres: pglogical apply 16384:1804764769 root 1205 38 0 21:08 pts/0 00:00:00 grep post |
示例二:2个发布端,1个订阅端
现有实验环境
数据库版本 | 操作系统 | IP | 数据库 | 角色 |
---|---|---|---|---|
PostgreSQL 13.4 | Debian GNU/Linux 11 | 172.72.6.30 | lhrdb2 | provider |
PostgreSQL 13.4 | Debian GNU/Linux 11 | 172.72.6.31 | lhrdb2 | provider |
PostgreSQL 12.8 | Debian GNU/Linux 11 | 172.72.6.32 | lhrdb2 | subscriber |
可从多个上游服务器,做数据的聚集和合并;
发布者跟订阅者的关系:一个发布者可以被多个订阅者订阅。多个发布者可以被同一个订阅者订阅。
环境
3个节点都操作:
1 2 3 | create database lhrdb2; \c lhrdb2 create EXTENSION pglogical; |
3个节点都创建测试表; 订阅者创建的表可以无主键;若订阅者有主键,可利用序列自增来解决冲突。(例如:本例是两个发布者,则发布者1可取奇数;发布者二可取偶数)。若无主键;数据不受影响。
1 2 3 4 5 6 7 8 9 10 | -- 6.30 create table test(id int primary key,name text); CREATE SEQUENCE seq_id INCREMENT BY 2 START WITH 1; -- 6.31 create table test(id int primary key,name text); CREATE SEQUENCE seq_id INCREMENT BY 2 START WITH 2; -- 6.32 create table test(id int primary key,name text); |
发布端
在6.30和6.31创建节点,注意修改IP地址:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | -- 1、创建节点:在数据库PG13里创建提供者节点 -- SELECT pglogical.drop_node(node_name := 'provider30'); SELECT pglogical.create_node( node_name := 'provider30', dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb2' ); -- 查看 select * from pglogical.node_interface ; -- 2、将表加入同步 select pglogical.replication_set_add_table( set_name := 'default', relation := 'test',synchronize_data := true); -- 查询 select * from pglogical.tables ; |
订阅端
订阅端的数据来自于2个发布端,所以,需要在相应的数据库下创建1个订阅者节点和2个订阅。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | -- 1、创建节点,在数据库PG12创建订阅者节点 SELECT pglogical.create_node( node_name := 'subnode32', dsn := 'host=172.72.6.32 port=5432 dbname=lhrdb2 user=postgres password=lhr' ); -- 查询 select * from pglogical.node_interface ; -- 2、订阅提供者节点,该订阅将在后台启动同步和复制过程,这里需要2个提供者 SELECT pglogical.create_subscription( subscription_name := 'sub32from30', provider_dsn := 'host=172.72.6.30 port=5432 dbname=lhrdb2 user=postgres password=lhr' ); SELECT pglogical.create_subscription( subscription_name := 'sub32from31', provider_dsn := 'host=172.72.6.31 port=5432 dbname=lhrdb2 user=postgres password=lhr' ); -- 查询 select * from pglogical.node_interface ; select * from pglogical.subscription ; select * from pglogical.show_subscription_status(); -- 创建订阅这个命令执行完成后,会在发布端创建一个复制槽,这个很重要,一个订阅者一个复制槽 select * from pg_replication_slots ; -- 删除: 若要删除,则先删除订阅者,再删除节点 SELECT pglogical.drop_subscription(subscription_name := 'sub32from30'); SELECT pglogical.drop_node(node_name := 'subnode32'); |
测试数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | -- 60.30 insert into test select nextval('seq_id'),'xxt' || generate_series(1,10,2); -- 60.31 insert into test select nextval('seq_id'),'xxt' || generate_series(2,10,2); -- 订阅端查询 lhrdb2=# select * from test; id | name ----+------ 1 | xxt1 3 | xxt3 5 | xxt5 7 | xxt7 9 | xxt9 2 | xxt2 4 | xxt4 6 | xxt6 8 | xxt8 10 | xxt10 (10 rows) |
示例三:云环境RDS中配置pglogical(未成功)
国内的华为云、阿里云和腾讯云的RDS for PG都已经内置了pglogical插件!
华为云支持的插件列表可以参考:https://support.huaweicloud.com/usermanual-rds/rds_09_0045.html
购买2台华为云rds环境:
数据库版本 | 环境 | 内网IP | EIP | 数据库 | 角色 |
---|---|---|---|---|---|
PostgreSQL 12.6 | 华为云RDS | 10.0.0.73 | 119.3.169.211 | lhrdb | 发布者 |
PostgreSQL 12.6 | 华为云RDS | 10.0.0.74 | 139.9.129.179 | lhrdb | 订阅者 |
环境
2个节点都操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | create database lhrdb; \c lhrdb select * from pg_settings where name in ('wal_level','max_worker_processes','max_replication_slots','max_wal_senders','shared_preload_libraries'); lhrdb=> select * from pg_available_extensions where name like 'pglogical%'; name | default_version | installed_version | comment ------------------+-----------------+-------------------+-------------------------------------------------------------------- pglogical | 2.3.3 | | PostgreSQL Logical Replication pglogical_origin | 1.0.0 | | Dummy extension for compatibility when upgrading from Postgres 9.4 (2 rows) lhrdb=> create EXTENSION pglogical; ERROR: pglogical is not in shared_preload_libraries lhrdb=> show shared_preload_libraries; shared_preload_libraries --------------------------------------------------------------- passwordcheck.so, pg_stat_statements, pg_sql_history, pgaudit (1 row) -- 这里由于没有权限修改参数,使用命令和界面都不能修改,必须提工单,由华为的人后台进行修改这个参数才可以!!!! -- 修改完之后继续 lhrdb=> show shared_preload_libraries; shared_preload_libraries -------------------------------------------------------------------------- passwordcheck.so, pg_stat_statements, pg_sql_history, pgaudit, pglogical (1 row) lhrdb=> create EXTENSION pglogical; CREATE EXTENSION lhrdb=> lhrdb=> lhrdb=> lhrdb=> lhrdb=> \dx List of installed extensions Name | Version | Schema | Description -----------+---------+------------+-------------------------------- pglogical | 2.3.3 | pglogical | PostgreSQL Logical Replication plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language (2 rows) |
2个节点都创建测试表:
1 2 3 4 5 6 | -- 6.30 create table test(id int primary key,name text); CREATE SEQUENCE seq_id INCREMENT BY 2 START WITH 1; -- 6.32 create table test(id int primary key,name text); |
发布端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | -- 1、创建节点:在数据库PG13里创建提供者节点 -- SELECT pglogical.drop_node(node_name := 'provider30'); SELECT pglogical.create_node( node_name := 'provider', dsn := 'host=127.0.0.1 port=5432 dbname=lhrdb' ); -- 查看 select * from pglogical.node_interface ; -- 2、将表加入同步 select pglogical.replication_set_add_table( set_name := 'default', relation := 'test',synchronize_data := true); -- 查询 select * from pglogical.tables ; |
订阅端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | -- 1、创建节点,在数据库PG12创建订阅者节点 SELECT pglogical.create_node( node_name := 'subnode', dsn := 'host=127.0.0.1 port=5432 dbname=lhrdb user=root password=lhr@xxt123' ); -- 查询 select * from pglogical.node_interface ; -- 2、订阅提供者节点,该订阅将在后台启动同步和复制过程 SELECT pglogical.create_subscription( subscription_name := 'sub32from30', provider_dsn := 'host=10.0.0.73 port=5432 dbname=lhrdb user=root password=lhr@xxt123' ); -- 查询 select * from pglogical.node_interface ; select * from pglogical.subscription ; select * from pglogical.show_subscription_status(); -- 创建订阅这个命令执行完成后,会在发布端创建一个复制槽,这个很重要,一个订阅者一个复制槽 select * from pg_replication_slots ; |
这里发现华为云rds的源端并没有生成复制槽,所以,数据并不会同步!!
排错
failed: fe_sendauth: no password supplied
在创建订阅端的时候报错:
1 2 3 4 5 6 7 8 | postgres=# SELECT pglogical.create_subscription( postgres(# subscription_name :='subscription1', postgres(# provider_dsn := 'host=172.72.7.20 dbname=postgres user=postgres password=postgres' postgres(# ); ERROR: could not connect to the postgresql server: connection to server at "172.72.7.20", port 5432 failed: fe_sendauth: no password supplied DETAIL: dsn was: host=172.72.7.20 port=5432 dbname=postgres |
解决:
1 2 3 4 5 6 7 8 9 | SELECT pglogical.create_node( node_name := 'provider1', dsn := 'host=172.72.7.10 port=5432 dbname=test user=root password=postgres' ); SELECT pglogical.create_subscription( subscription_name := 'sub1', provider_dsn := 'host=172.72.7.20 port=5432 dbname=postgres user=postgres password=postgres' ); |
密码应该在node和subscription里都提供。
总结
1、经过多次试验,发现创建表的操作往往不能成功!!! 若发布端新增表,那么目标端最好也需要新建表。
2、创建节点和订阅时,一定要注意参数的值,例如:dbname、user和host
3、创建一个发布者节点,会同时在后台启动一个进程,叫“postgres: pglogical manager 16384”
4、创建一个订阅者,会在发布端后台启动一个进程,叫“postgres: walsender postgres 172.72.6.32(50340) idle”;同时会在订阅节点后台启动一个进程,叫“postgres: pglogical apply 16760:810137120”
4、排错请查看发布端和订阅端的数据库告警日志内容。
5、pglogical实现的这些功能,完全可以使用OGG来替代,具体可以参考:https://www.xmmup.com/shiyongogg-for-pgweifuwukuaisushuangxiangtongburdsshujukushuangzhu.html
6、在华为云端rds中,虽然有pglogical插件,但是我自己没有配置成功,创建订阅的时候不能使用公网,使用内网IP后,虽然创建订阅成功了,但是源端也不能生成复制槽,导致数据并不能同步到目标端。其它云没有测试过!!