PostgreSQL Logical Replication Extensions: DDL Replication with pgl_ddl_deploy
Tags: DDL replication, PG, pgl_ddl_deploy, extension, logical replication
- Built-in logical replication in PG: https://www.xmmup.com/pgzhongdeluojifuzhilogical-replication.html
- Using the pglogical logical replication extension: https://www.xmmup.com/pgluojifuzhichajianzhipglogicalshiyongshuoming.html
Introduction
Details: https://github.com/lhrbest/pgl_ddl_deploy
Transparent DDL replication for Postgres 9.5+ for both pglogical and native logical replication.
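The official PostgreSQL community release does not support logical replication of DDL. It replicates DML only (INSERT, UPDATE, DELETE, TRUNCATE), and a published table needs a primary key (or another replica identity), otherwise UPDATE and DELETE cannot be replicated. Note that logical replication requires wal_level = logical.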
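Both prerequisites can be checked from psql before anything is set up. The following is a minimal sketch that uses only standard PostgreSQL catalogs. The second query lists ordinary user tables that still have the default replica identity but no primary key, which are the tables whose UPDATE and DELETE would not replicate:

```sql
-- Must return 'logical' on the publisher.
SHOW wal_level;

-- If it does not, change it and restart PostgreSQL afterwards:
-- ALTER SYSTEM SET wal_level = 'logical';

-- Ordinary tables with the default replica identity and no primary key.
SELECT c.oid::regclass AS table_name
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname NOT IN ('pg_catalog', 'information_schema')
  AND c.relreplident = 'd'
  AND NOT EXISTS (
        SELECT 1
        FROM pg_index i
        WHERE i.indrelid = c.oid
          AND i.indisprimary
      );
```

An empty result from the second query means every user table has a usable replica identity.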
Several third-party tools can replicate DDL: BDR (commercial), pglogical (open source, but fairly complex), and pgl_ddl_deploy.
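The pgl_ddl_deploy extension adds DDL replication to PostgreSQL logical replication. Its main features:
- Any DDL SQL statement can be propagated directly to subscribers.
- Tables can be added to replication automatically when they are created.
- Filtering is supported, so that only selected schemas are replicated.
- DDL can optionally be deployed on the subscriber in a lock-safe way.
- ALTER TABLE statements can be filtered by subcommand tag.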
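The latest version at the time of writing is 2.1.0. Since version 2.0 it supports DDL replication over native logical replication (earlier versions depended on the pglogical extension). For details see: https://github.com/enova/pgl_ddl_deploy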
When DDL replication is configured, the queue table itself must be replicated. Each row in that table records a function call that is used to execute the DDL statement.
Installation
    # CentOS: build from source (either clone the repository or download the release tarball)
    git clone https://github.com/enova/pgl_ddl_deploy.git
    wget https://github.com/enova/pgl_ddl_deploy/archive/refs/tags/v2.1.0.tar.gz
    tar -zxvf v2.1.0.tar.gz
    cd pgl_ddl_deploy-2.1.0

    export PGHOME=/usr/lib/postgresql/13
    export PATH=$PGHOME/bin:$PATH

    USE_PGXS=1 make
    USE_PGXS=1 make install

    # Debian
    apt-get install postgresql-13-pgl-ddl-deploy

    -- Then, in psql
    create extension pgl_ddl_deploy;
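After the build or package install, it can help to confirm that the server actually sees the extension. A small sketch using the standard pg_available_extensions view; installed_version stays NULL until create extension has been run in the current database:

```sql
-- One row is expected once the extension files are in place.
SELECT name, default_version, installed_version
FROM pg_available_extensions
WHERE name = 'pgl_ddl_deploy';
```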
Extension contents
    lhrdb3=# \dx+ pgl_ddl_deploy
                                                              Objects in extension "pgl_ddl_deploy"
                                                                       Object description
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------
     event trigger auto_rep_ddl_create_1_pub1
     event trigger auto_rep_ddl_drop_1_pub1
     event trigger auto_rep_ddl_unsupp_1_pub1
     function pgl_ddl_deploy.add_ext_object(text,text)
     function pgl_ddl_deploy.add_role(oid)
     function pgl_ddl_deploy.add_table_to_replication(pgl_ddl_deploy.driver,name,regclass,boolean)
     function pgl_ddl_deploy.auto_rep_ddl_create_1_pub1()
     function pgl_ddl_deploy.auto_rep_ddl_drop_1_pub1()
     function pgl_ddl_deploy.auto_rep_ddl_unsupp_1_pub1()
     function pgl_ddl_deploy.blacklisted_tags()
     function pgl_ddl_deploy.common_exclude_alter_table_subcommands()
     function pgl_ddl_deploy.current_query()
     function pgl_ddl_deploy.deploy(integer)
     function pgl_ddl_deploy.deployment_check_count(integer,text,text,pgl_ddl_deploy.driver)
     function pgl_ddl_deploy.deployment_check(integer)
     function pgl_ddl_deploy.deployment_check(text)
     function pgl_ddl_deploy.deployment_check_wrapper(integer,text)
     function pgl_ddl_deploy.deploy(text)
     function pgl_ddl_deploy.disable(integer)
     function pgl_ddl_deploy.disable(text)
     function pgl_ddl_deploy.drop_ext_object(text,text)
     function pgl_ddl_deploy.enable(integer)
     function pgl_ddl_deploy.enable(text)
     function pgl_ddl_deploy.exclude_regex()
     function pgl_ddl_deploy.execute_queued_ddl()
     function pgl_ddl_deploy.fail_queued_attempt(integer,text)
     function pgl_ddl_deploy.get_altertable_subcmdtypes(pg_ddl_command)
     function pgl_ddl_deploy.get_command_tag(pg_ddl_command)
     function pgl_ddl_deploy.get_command_type(pg_ddl_command)
     function pgl_ddl_deploy.is_subscriber(pgl_ddl_deploy.driver,text[],name)
     function pgl_ddl_deploy.kill_blockers(pgl_ddl_deploy.signals,name,name)
     function pgl_ddl_deploy.lock_safe_executor(text)
     function pgl_ddl_deploy.log_unhandled(integer,text,integer,text,text,text,bigint)
     function pgl_ddl_deploy.notify_subscription_refresh(name,boolean)
     function pgl_ddl_deploy.override()
     function pgl_ddl_deploy.provider_node_name(pgl_ddl_deploy.driver)
     function pgl_ddl_deploy.queue_ddl_message_type()
     function pgl_ddl_deploy.raise_message(text,text)
     function pgl_ddl_deploy.replicate_ddl_command(text,text[])
     function pgl_ddl_deploy.rep_set_table_wrapper()
     function pgl_ddl_deploy.rep_set_wrapper()
     function pgl_ddl_deploy.resolve_exception(integer,text)
     function pgl_ddl_deploy.resolve_unhandled(integer,text)
     function pgl_ddl_deploy.retry_all_subscriber_logs()
     function pgl_ddl_deploy.retry_subscriber_log(integer)
     function pgl_ddl_deploy.schema_execute(integer,text)
     function pgl_ddl_deploy.schema_execute(text,text)
     function pgl_ddl_deploy.set_origin_subscriber_log_id()
     function pgl_ddl_deploy.set_tag_defaults()
     function pgl_ddl_deploy.sql_command_tags(text)
     function pgl_ddl_deploy.standard_create_tags()
     function pgl_ddl_deploy.standard_drop_tags()
     function pgl_ddl_deploy.standard_repset_only_tags()
     function pgl_ddl_deploy.subscriber_command(name,text[],name,name,text,text,integer,integer,boolean,pgl_ddl_deploy.signals,integer,pgl_ddl_deploy.driver,boolean)
     function pgl_ddl_deploy.toggle_ext_object(text,text,text)
     function pgl_ddl_deploy.undeploy(integer)
     function pgl_ddl_deploy.undeploy(text)
     function pgl_ddl_deploy.unique_tags()
     function pgl_ddl_deploy.unsupported_tags()
     sequence pgl_ddl_deploy.commands_id_seq
     sequence pgl_ddl_deploy.events_id_seq
     sequence pgl_ddl_deploy.exceptions_id_seq
     sequence pgl_ddl_deploy.killed_blockers_id_seq
     sequence pgl_ddl_deploy.set_configs_id_seq
     sequence pgl_ddl_deploy.subscriber_logs_id_seq
     sequence pgl_ddl_deploy.unhandled_id_seq
     table pgl_ddl_deploy.commands
     table pgl_ddl_deploy.events
     table pgl_ddl_deploy.exceptions
     table pgl_ddl_deploy.killed_blockers
     table pgl_ddl_deploy.queue
     table pgl_ddl_deploy.set_configs
     table pgl_ddl_deploy.subscriber_logs
     table pgl_ddl_deploy.unhandled
     type pgl_ddl_deploy.driver
     type pgl_ddl_deploy.signals
     view pgl_ddl_deploy.event_trigger_schema
    (77 rows)
Data dictionary
    select * from pgl_ddl_deploy.commands;
    select * from pgl_ddl_deploy.events;
    select * from pgl_ddl_deploy.exceptions;
    select * from pgl_ddl_deploy.killed_blockers;
    select * from pgl_ddl_deploy.queue;
    select * from pgl_ddl_deploy.set_configs;
    select * from pgl_ddl_deploy.subscriber_logs;
    select * from pgl_ddl_deploy.unhandled;
Example: native (built-in) logical replication
For the environment setup, see: https://www.xmmup.com/pgluojifuzhichajianzhipglogicalshiyongshuoming.html
Environment setup
Run the following on both nodes:
    create database lhrdb3;
    \c lhrdb3
    create table t1(id int primary key, name text, reg_time timestamp);
    insert into t1 select generate_series(1,10000),'xxt',now();
    select count(*) from t1;

    create extension pgl_ddl_deploy;
Configure the publication and subscription
    -- Publisher
    CREATE PUBLICATION pub1 FOR ALL TABLES;
    select * from pg_publication;

    -- Subscriber
    create subscription sub1 connection 'host=172.72.6.30 port=5432 dbname=lhrdb3 user=postgres password=lhr' publication pub1;
    select * from pg_subscription;

    -- If t1 does not exist on the subscriber yet, create it there and refresh the subscription manually
    create table t1(id int primary key, name text, reg_time timestamp);
    ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION ;

    -- Publisher: check the replication slot
    select * from pg_replication_slots;
Session output:
    lhrdb3=# CREATE PUBLICATION pub1 FOR all TABLES;
    CREATE PUBLICATION
    lhrdb3=# select * from pg_publication;
      oid  | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
    -------+---------+----------+--------------+-----------+-----------+-----------+-------------+------------
     17202 | pub1    |       10 | t            | t         | t         | t         | t           | f
    (1 row)

    lhrdb3=# create subscription sub1 connection
    lhrdb3-# 'host=172.72.6.30 port=5432 dbname=lhrdb3 user=postgres password=lhr' publication pub1;
    NOTICE:  created replication slot "sub1" on publisher
    CREATE SUBSCRIPTION
    lhrdb3=# select * from pg_subscription;
      oid  | subdbid | subname | subowner | subenabled |                             subconninfo                             | subslotname | subsynccommit | subpublications
    -------+---------+---------+----------+------------+---------------------------------------------------------------------+-------------+---------------+-----------------
     17308 |   16792 | sub1    |       10 | t          | host=172.72.6.30 port=5432 dbname=lhrdb3 user=postgres password=lhr | sub1        | off           | {pub1}
    (1 row)

    lhrdb3=# select count(*) from t1;
     count
    -------
     10000
    (1 row)

    lhrdb3=# select * from pg_replication_slots;
     slot_name |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
    -----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
     sub1      | pgoutput | logical   |  16939 | lhrdb3   | f         | t      |      19678 |      |          537 | 0/25CD268   | 0/25CD2A0           | reserved   |
The data has been synchronized.
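Besides counting rows, the sync state can be read from the standard catalogs. A minimal sketch, assuming PostgreSQL 10 or later; the first query runs on the subscriber, the second on the publisher:

```sql
-- Subscriber: per-table state of the subscription.
-- 'r' means ready (initial copy finished, changes are streaming),
-- 'd' means the initial data copy is still running.
SELECT srrelid::regclass AS table_name, srsubstate
FROM pg_subscription_rel;

-- Publisher: one row per connected subscriber (walsender process).
SELECT application_name, state, sent_lsn, replay_lsn
FROM pg_stat_replication;
```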
Configure DDL replication
1. Configure the publisher
Configuration on the publisher is done by inserting a row into the pgl_ddl_deploy.set_configs table.
    INSERT INTO pgl_ddl_deploy.set_configs (set_name, include_schema_regex, driver)
    VALUES ('pub1', '.*', 'native'::pgl_ddl_deploy.driver);
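set_name: the name of the publication. It must match the name of the publication you created.
include_schema_regex: a regular expression matching the schemas whose DDL should be replicated. New tables in matching schemas can be added to replication automatically.
driver: selects either native logical replication (native) or the pglogical extension.
This demo uses a deliberately simple configuration. For the full set of options, see: https://github.com/enova/pgl_ddl_deploy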
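Before putting a narrower pattern than '.*' into include_schema_regex, it may be worth previewing which schemas it would select. The sketch below assumes the extension applies the pattern with PostgreSQL's own regular-expression matching. The pattern itself (public plus any schema starting with app_) is a hypothetical example, not taken from the demo:

```sql
-- Preview which schemas a candidate include_schema_regex would match.
-- The pattern is hypothetical; replace it with your own.
SELECT nspname,
       nspname ~ '^(public|app_.*)$' AS would_match
FROM pg_namespace
WHERE nspname !~ '^pg_'
  AND nspname <> 'information_schema'
ORDER BY nspname;
```

Anchoring the pattern with ^ and $ avoids accidentally matching schemas that merely contain the text.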
2. Enable DDL replication
Once configured, start DDL replication by calling the pgl_ddl_deploy.deploy function.
    lhrdb3=# SELECT pgl_ddl_deploy.deploy('pub1') from pgl_ddl_deploy.set_configs;
    NOTICE:  table "tmp_objs" does not exist, skipping
    NOTICE:  event trigger "auto_rep_ddl_create_1_testpub" does not exist, skipping
    NOTICE:  event trigger "auto_rep_ddl_drop_1_testpub" does not exist, skipping
    NOTICE:  event trigger "auto_rep_ddl_unsupp_1_testpub" does not exist, skipping
    NOTICE:  function pgl_ddl_deploy.auto_rep_ddl_create_1_testpub() does not exist, skipping
    NOTICE:  function pgl_ddl_deploy.auto_rep_ddl_drop_1_testpub() does not exist, skipping
    NOTICE:  function pgl_ddl_deploy.auto_rep_ddl_unsupp_1_testpub() does not exist, skipping
     deploy
    --------
     t
    (1 row)
A result of t means that deployment succeeded.
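Deployment creates event triggers on the publisher (the extension listing earlier shows names such as auto_rep_ddl_create_1_pub1), so one way to double-check the result is to look them up in the standard pg_event_trigger catalog. A minimal sketch; the name prefix is taken from that listing:

```sql
-- Publisher: event triggers created by the deployment.
-- evtenabled = 'O' means the trigger fires in normal (origin) sessions,
-- 'D' means it is disabled.
SELECT evtname, evtevent, evtenabled
FROM pg_event_trigger
WHERE evtname ~ '^auto_rep_ddl_'
ORDER BY evtname;
```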
3. Grant permissions
    -- On both the publisher and the subscriber
    SELECT pgl_ddl_deploy.add_role(oid) FROM pg_roles WHERE rolname IN ('postgres');
Test DDL replication
With deployment done, run a small test: add a column to table t1.
Publisher:
    lhrdb3=# alter table t1 add column a1 int;
    ALTER TABLE
    lhrdb3=# \d t1
                               Table "public.t1"
      Column  |            Type             | Collation | Nullable | Default
    ----------+-----------------------------+-----------+----------+---------
     id       | integer                     |           | not null |
     name     | text                        |           |          |
     reg_time | timestamp without time zone |           |          |
     a1       | integer                     |           |          |
    Indexes:
        "t1_pkey" PRIMARY KEY, btree (id)
    Publications:
        "pub1"
Subscriber:
    lhrdb3=# \d t1
                               Table "public.t1"
      Column  |            Type             | Collation | Nullable | Default
    ----------+-----------------------------+-----------+----------+---------
     id       | integer                     |           | not null |
     name     | text                        |           |          |
     reg_time | timestamp without time zone |           |          |
     a1       | integer                     |           |          |
    Indexes:
        "t1_pkey" PRIMARY KEY, btree (id)
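Comparing \d output by eye works for one small table. For larger tables the same check can be done with a query whose result is easy to diff between nodes. A sketch using the standard information_schema; run it on both the publisher and the subscriber and compare the rows:

```sql
-- Column layout of public.t1; the result should be identical on both nodes.
SELECT ordinal_position, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = 'public'
  AND table_name = 't1'
ORDER BY ordinal_position;
```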
Query the data dictionary
    select * from pgl_ddl_deploy.commands;
    select * from pgl_ddl_deploy.events;
    select * from pgl_ddl_deploy.exceptions;
    select * from pgl_ddl_deploy.killed_blockers;
    select * from pgl_ddl_deploy.queue;
    select * from pgl_ddl_deploy.set_configs;
    select * from pgl_ddl_deploy.subscriber_logs;
    select * from pgl_ddl_deploy.unhandled;
Session output:
    lhrdb3=# select * from pgl_ddl_deploy.commands;
     id | set_name |  pid  | txid | classid | objid | objsubid | command_tag | object_type | schema_name | object_identity | in_extension | set_config_id
    ----+----------+-------+------+---------+-------+----------+-------------+-------------+-------------+-----------------+--------------+---------------
      1 | pub1     | 19646 |  546 |    1259 | 17194 |        0 | ALTER TABLE | table       | public      | public.t1       | f            |             1
    (1 row)

    lhrdb3=# select * from pgl_ddl_deploy.events;
     id | set_name |  pid  |          executed_at          |            ddl_sql_raw            |           ddl_sql_sent            | txid | set_config_id
    ----+----------+-------+-------------------------------+-----------------------------------+-----------------------------------+------+---------------
      1 | pub1     | 19646 | 2022-01-17 10:37:25.189135+08 | alter table t1 add column a1 int; | alter table t1 add column a1 int; |  546 |             1
    (1 row)

    lhrdb3=# select * from pgl_ddl_deploy.exceptions;
     id | set_name | pid | executed_at | ddl_sql | err_msg | err_state | set_config_id | resolved | resolved_notes
    ----+----------+-----+-------------+---------+---------+-----------+---------------+----------+----------------
    (0 rows)

    lhrdb3=# select * from pgl_ddl_deploy.killed_blockers;
     id | signal | successful | pid | executed_at | usename | client_addr | xact_start | state_change | state | query | reported | reported_at
    ----+--------+------------+-----+-------------+---------+-------------+------------+--------------+-------+-------+----------+-------------
    (0 rows)

    lhrdb3=# select * from pgl_ddl_deploy.queue;
               queued_at           |   role   | pubnames | message_type |                                                message
    -------------------------------+----------+----------+--------------+--------------------------------------------------------------------------------------------------------
     2022-01-17 10:37:25.198941+08 | postgres | {pub1}   | Q            |                                                                                                       +
                                   |          |          |              |         SELECT pgl_ddl_deploy.subscriber_command                                                      +
                                   |          |          |              |         (                                                                                             +
                                   |          |          |              |           p_provider_name := NULL,                                                                    +
                                   |          |          |              |           p_set_name := ARRAY['pub1'],                                                                +
                                   |          |          |              |           p_nspname := 'public',                                                                      +
                                   |          |          |              |           p_relname := 't1',                                                                          +
                                   |          |          |              |           p_ddl_sql_sent := $pgl_ddl_deploy_sql$alter table t1 add column a1 int;$pgl_ddl_deploy_sql$,+
                                   |          |          |              |           p_full_ddl := $pgl_ddl_deploy_sql$                                                          +
                                   |          |          |              |           --Be sure to use provider's search_path for SQL environment consistency                     +
                                   |          |          |              |           SET SEARCH_PATH TO "$user", public;                                                         +
                                   |          |          |              |                                                                                                       +
                                   |          |          |              |           alter table t1 add column a1 int;                                                           +
                                   |          |          |              |           ;                                                                                           +
                                   |          |          |              |           $pgl_ddl_deploy_sql$,                                                                       +
                                   |          |          |              |           p_pid := 19646,                                                                             +
                                   |          |          |              |           p_set_config_id := 1,                                                                       +
                                   |          |          |              |           p_queue_subscriber_failures := false,                                                       +
                                   |          |          |              |           p_signal_blocking_subscriber_sessions := NULL,                                              +
                                   |          |          |              |           p_lock_timeout := 3000,                                                                     +
                                   |          |          |              |           p_driver := 'native'                                                                        +
                                   |          |          |              |         );                                                                                            +
                                   |          |          |              |
    (1 row)

    lhrdb3=# select * from pgl_ddl_deploy.set_configs;
     set_name | include_schema_regex | lock_safe_deployment | allow_multi_statements | id | include_only_repset_tables |                                                                                                           create_tags                                                                                                           |                                      drop_tags                                       |                                        blacklisted_tags                                        | queue_subscriber_failures | exclude_alter_table_subcommands | ddl_only_replication | include_everything | signal_blocking_subscriber_sessions | subscriber_lock_timeout | driver
    ----------+----------------------+----------------------+------------------------+----+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------+---------------------------+---------------------------------+----------------------+--------------------+-------------------------------------+-------------------------+--------
     pub1     | .*                   | f                    | t                      |  1 | f                          | {"ALTER TABLE","CREATE SEQUENCE","ALTER SEQUENCE","CREATE SCHEMA","CREATE TABLE","CREATE FUNCTION","ALTER FUNCTION","CREATE TYPE","ALTER TYPE","CREATE VIEW","ALTER VIEW",COMMENT,"CREATE RULE","CREATE TRIGGER","ALTER TRIGGER"} | {"DROP SCHEMA","DROP TABLE","DROP FUNCTION","DROP TYPE","DROP VIEW","DROP SEQUENCE"} | {INSERT,UPDATE,DELETE,TRUNCATE,ROLLBACK,"CREATE EXTENSION","ALTER EXTENSION","DROP EXTENSION"} | f                         |                                 | f                    | f                  |                                     |                         | native
    (1 row)

    lhrdb3=# select * from pgl_ddl_deploy.subscriber_logs;
     id | set_name | provider_pid | subscriber_pid | executed_at | ddl_sql | full_ddl_sql | origin_subscriber_log_id | next_subscriber_log_id | provider_node_name | provider_set_config_id | executed_as_role | retrying | succeeded | error_message
    ----+----------+--------------+----------------+-------------+---------+--------------+--------------------------+------------------------+--------------------+------------------------+------------------+----------+-----------+---------------
    (0 rows)

    lhrdb3=# select * from pgl_ddl_deploy.unhandled;
     id | set_name | pid | executed_at | ddl_sql_raw | command_tag | reason | txid | set_config_id | resolved | resolved_notes
    ----+----------+-----+-------------+-------------+-------------+--------+------+---------------+----------+----------------
    (0 rows)
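When a replicated DDL statement does not arrive, the same tables can be filtered for problems instead of dumped in full. The queries below are a sketch built only from the column names visible in the output above. It assumes that succeeded and resolved are boolean columns, which the output does not show:

```sql
-- Subscriber: DDL executions that did not succeed.
SELECT id, set_name, executed_at, ddl_sql, error_message
FROM pgl_ddl_deploy.subscriber_logs
WHERE succeeded IS NOT TRUE
ORDER BY executed_at DESC;

-- Publisher: DDL the extension could not handle and that is still unresolved.
SELECT id, set_name, executed_at, command_tag, reason, ddl_sql_raw
FROM pgl_ddl_deploy.unhandled
WHERE resolved IS NOT TRUE
ORDER BY executed_at DESC;

-- Publisher: exceptions raised while capturing DDL, still unresolved.
SELECT id, set_name, executed_at, ddl_sql, err_msg
FROM pgl_ddl_deploy.exceptions
WHERE resolved IS NOT TRUE
ORDER BY executed_at DESC;
```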
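Summary
In my own experience, the extension still has some rough edges.
For production, I therefore recommend replicating only DML and, for stability, still running DDL statements by hand.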
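When DDL is run by hand, the order across nodes matters. Native logical replication accepts a subscriber table that has columns the publisher does not send, so an additive change is usually applied to the subscriber first. A minimal sketch for a setup where pgl_ddl_deploy is not deployed; the column a2 is a hypothetical example:

```sql
-- Step 1, on the subscriber: add the new column first.
-- Incoming rows without a2 still apply; the column is filled with its default.
ALTER TABLE t1 ADD COLUMN a2 int;

-- Step 2, on the publisher: add the same column.
-- Rows that now carry a2 already have a matching column on the subscriber.
ALTER TABLE t1 ADD COLUMN a2 int;
```

Dropping a column would be done in the opposite order, publisher first.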
Appendix: official documentation