GBase 8c Study Notes 010: GBase 8c Distributed Execution Plans
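SQL execution flow
Parser
- Lexical analysis: identifies the keywords, identifiers, operators, terminators and other tokens that the system supports in the query statement.
- Syntax analysis: matches the tokens produced by lexical analysis against the grammar rules defined by the SQL language standard, and builds the corresponding abstract syntax tree.
- Semantic analysis: validates the syntax tree by checking that the tables, columns, functions and expressions it references have corresponding metadata, then converts the abstract syntax tree into a logical execution plan.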
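To make the lexical-analysis step concrete, here is a toy tokenizer in Python. It is a minimal sketch, not GBase 8c's actual lexer: the keyword set, the token categories and the function name `tokenize` are assumptions made for illustration.

```python
import re

# Hypothetical, deliberately tiny keyword set; a real SQL lexer knows far more.
KEYWORDS = {"SELECT", "FROM", "WHERE", "AND", "OR", "UPDATE", "SET"}

# One named group per token class. Multi-character operators are listed
# before single characters so "<=" is not split into "<" and "=".
TOKEN_RE = re.compile(
    r"\s*(?:(?P<NUMBER>\d+)"
    r"|(?P<WORD>[A-Za-z_][A-Za-z0-9_]*)"
    r"|(?P<OPERATOR><=|>=|<>|[=<>*,.()])"  # operators and punctuation
    r"|(?P<TERMINATOR>;))"
)


def tokenize(sql: str) -> list[tuple[str, str]]:
    """Split a SQL string into (category, text) pairs."""
    sql = sql.rstrip()
    tokens, pos = [], 0
    while pos < len(sql):
        m = TOKEN_RE.match(sql, pos)
        if m is None:
            raise SyntaxError(f"unexpected character at offset {pos}: {sql[pos]!r}")
        pos = m.end()
        kind, text = m.lastgroup, m.group(m.lastgroup)
        if kind == "WORD":
            # Keywords are case-insensitive; any other word is an identifier.
            if text.upper() in KEYWORDS:
                kind, text = "KEYWORD", text.upper()
            else:
                kind = "IDENTIFIER"
        tokens.append((kind, text))
    return tokens


print(tokenize("select * from t1 where id1 = 1;"))
```

The output of this stage is what syntax analysis then matches against grammar rules.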
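Optimizer
- Cost-based optimization (CBO): estimates the cost of each candidate execution path for a SQL statement and selects the path with the lowest cost as the final execution plan.
- AI-based optimization (AI Based Optimization, ABO): collects feature information from execution plans, uses a machine learning model to derive empirical knowledge, and uses that knowledge to tune the plan toward the optimal one.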
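The core CBO decision, picking the lowest-cost candidate, can be sketched as below. The `CandidatePath` type and the cost numbers are invented for illustration; a real optimizer derives these costs from table statistics and cost parameters. The startup/total pair mirrors the `cost=startup..total` figures that EXPLAIN prints.

```python
from dataclasses import dataclass


@dataclass
class CandidatePath:
    name: str
    startup_cost: float
    total_cost: float


def choose_cheapest(paths: list[CandidatePath]) -> CandidatePath:
    """Return the path with the lowest estimated total cost.

    Ties are broken by the lower startup cost.
    """
    if not paths:
        raise ValueError("no candidate paths to choose from")
    return min(paths, key=lambda p: (p.total_cost, p.startup_cost))


# Invented cost estimates for two ways of reading the same table.
candidates = [
    CandidatePath("Index Scan", startup_cost=0.27, total_cost=8.29),
    CandidatePath("Seq Scan", startup_cost=0.00, total_cost=2.50),
]
print(choose_cheapest(candidates).name)  # → Seq Scan
```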
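EXPLAIN: execution plans
The EXPLAIN statement shows how each table referenced by a SQL statement will be scanned, for example with a simple sequential scan or an index scan. If several tables are referenced, the plan also shows which JOIN algorithm is used. Basic syntax and common options: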
EXPLAIN [ ( option [, ...] ) ] statement;
where option can be:
    ANALYZE [ boolean ] | ANALYSE [ boolean ] |  -- display actual run times and other statistics
    VERBOSE [ boolean ] |                        -- display additional detail
    COSTS [ boolean ] |                          -- include each plan node's estimated total cost, estimated row count, etc.
    DETAIL [ boolean ] |                         -- print information about the database nodes
    TIMING [ boolean ] |                         -- include actual startup time and time spent in the output node
    PLAN [ boolean ] |                           -- whether to store the execution plan in plan_table
    FORMAT { TEXT | XML | JSON | YAML }          -- specify the output format
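Text-format EXPLAIN output puts one plan node per line, followed by its cost estimate in parentheses, so a short helper can list the operators a plan uses. This is a hypothetical sketch (`plan_nodes` and `stream_types` are invented names). It only recognises lines containing `(cost=`, so it assumes COSTS is on, as it is by default in PostgreSQL-style EXPLAIN. The sample is abridged from Example 1 below.

```python
import re

# A plan node line looks like "[indent][->  ]Node Name  (cost=...)".
# Detail lines such as "Output: ..." or "Hash Cond: ..." have no "(cost=" and are skipped.
NODE_RE = re.compile(r"^\s*(?:->\s+)?(?P<node>.+?)\s+\(cost=")


def plan_nodes(plan_text: str) -> list[str]:
    """Return plan node names in the order they appear in text-format EXPLAIN output."""
    nodes = []
    for line in plan_text.splitlines():
        m = NODE_RE.match(line)
        if m:
            nodes.append(m.group("node"))
    return nodes


def stream_types(plan_text: str) -> list[str]:
    """Return the type of every Streaming node, e.g. GATHER or REDISTRIBUTE."""
    return re.findall(r"Streaming\(type: (\w+)\)", plan_text)


sample = """\
 Streaming(type: GATHER)  (cost=3.12..10.69 rows=50 width=16)
   ->  Hash Join  (cost=3.12..10.44 rows=100 distinct=[50, 50] width=16)
         Hash Cond: (t2.id2 = t1.id1)
         ->  Streaming(type: REDISTRIBUTE)  (cost=0.00..6.62 rows=100 width=8)
               ->  Seq Scan on public.t2  (cost=0.00..2.50 rows=100 width=8)
         ->  Hash  (cost=2.50..2.50 rows=100 width=8)
               ->  Seq Scan on public.t1  (cost=0.00..2.50 rows=100 width=8)
"""
print(plan_nodes(sample))
print(stream_types(sample))
```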
Distributed execution plans
Distributed execution plan types: LightProxy
LightProxy
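- The plan involves only one DN node: the CN sends the SQL statement directly to that DN for execution.
- A single DN can produce the complete result; this is common for point queries and exact-match lookups.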
--- create the table
postgres=# create table t1 (
postgres(# id1 int,
postgres(# id2 int
postgres(# )distribute by hash(id1);
CREATE TABLE
postgres=#
postgres=# \d+ t1;
                         Table "public.t1"
 Column |  Type   | Modifiers | Storage | Stats target | Description
--------+---------+-----------+---------+--------------+-------------
 id1    | integer |           | plain   |              |
 id2    | integer |           | plain   |              |
Has OIDs: no
Distribute By: HASH(id1)
Location Nodes: ALL DATANODES
Options: orientation=row, compression=no

--- insert data
postgres=# insert into t1 select generate_series(1,100),generate_series(1,100);
INSERT 0 100
--- run explain
postgres=# explain analyze select * from t1 where id1 = 1;
                                          QUERY PLAN
-----------------------------------------------------------------------------------------------
 Data Node Scan  (cost=0.00..0.00 rows=1000 width=8) (actual time=0.500..0.500 rows=1 loops=1)
   Node/s: dn1
 Total runtime: 0.527 ms
(3 rows)
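The routing decision behind LightProxy can be modelled simply: an equality filter on the distribution column pins a query to one DN. This is a toy sketch under loud assumptions: `NUM_DNS`, `dn_for` and `target_dns` are hypothetical, and plain modulo stands in for the real hash function, so the DN names it prints need not match the `dn1` in the plan above.

```python
NUM_DNS = 3  # hypothetical cluster size


def dn_for(key: int, num_dns: int = NUM_DNS) -> str:
    """Map a distribution-key value to a DN name.

    Stand-in only: plain modulo, not GBase 8c's real hash function.
    """
    return f"dn{key % num_dns + 1}"


def target_dns(dist_col: str, equality_filters: dict[str, int],
               num_dns: int = NUM_DNS) -> list[str]:
    """Return the DNs a single-table query has to visit."""
    if dist_col in equality_filters:
        # Equality on the distribution column pins the row to one DN.
        return [dn_for(equality_filters[dist_col], num_dns)]
    # Any other filter could match rows anywhere, so every DN is involved.
    return [f"dn{i + 1}" for i in range(num_dns)]


print(target_dns("id1", {"id1": 1}))  # ['dn2']: single DN, LightProxy-style
print(target_dns("id1", {"id2": 1}))  # ['dn1', 'dn2', 'dn3']
```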
Distributed execution plan types: FQS
FQS (Fast Query Shipping)
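- The plan can involve multiple DN nodes, but the DNs exchange no data with each other during execution.
- FQS rewrites the SQL statement where appropriate and executes it through a remote query.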
--- create table t1, distribution column id1
postgres=# drop table if exists t1;
DROP TABLE
postgres=# create table t1 (
postgres(# id1 int,
postgres(# id2 int
postgres(# )distribute by hash(id1);
CREATE TABLE
--- create table t2, distribution column id1
drop table if exists t2;
postgres=# create table t2 (
postgres(# id1 int,
postgres(# id2 int
postgres(# )distribute by hash(id1);
CREATE TABLE
postgres=# \d+ t1;
                         Table "public.t1"
 Column |  Type   | Modifiers | Storage | Stats target | Description
--------+---------+-----------+---------+--------------+-------------
 id1    | integer |           | plain   |              |
 id2    | integer |           | plain   |              |
Has OIDs: no
Distribute By: HASH(id1)
Location Nodes: ALL DATANODES
Options: orientation=row, compression=no

postgres=# \d+ t2;
                         Table "public.t2"
 Column |  Type   | Modifiers | Storage | Stats target | Description
--------+---------+-----------+---------+--------------+-------------
 id1    | integer |           | plain   |              |
 id2    | integer |           | plain   |              |
Has OIDs: no
Distribute By: HASH(id1)
Location Nodes: ALL DATANODES
Options: orientation=row, compression=no

--- insert data
postgres=# insert into t1 select generate_series(1,100),generate_series(1,100);
INSERT 0 100
postgres=# insert into t2 select generate_series(1,100),generate_series(1,100);
INSERT 0 100
--- run explain
postgres=# explain analyze select * from t1,t2 where t1.id1=t2.id1;
                                            QUERY PLAN
--------------------------------------------------------------------------------------------------
 Data Node Scan  (cost=0.00..0.00 rows=1000 width=16) (actual time=0.893..1.621 rows=100 loops=1)
   Node/s: All datanodes
 Total runtime: 1.663 ms
(3 rows)
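FQS is possible here because t1 and t2 are both hash-distributed on id1 over the same nodes, and the join is t1.id1 = t2.id1, so matching rows never have to leave their DN. The check below captures only that one sufficient condition. It is a hypothetical sketch (`TableDist` and `join_is_colocated` are invented names), not GBase 8c's full FQS rule set, and it assumes both tables use the same hash function on compatible column types.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TableDist:
    name: str
    dist_col: str          # hash distribution column
    nodes: frozenset[str]  # DNs the table is stored on


def join_is_colocated(left: TableDist, left_col: str,
                      right: TableDist, right_col: str) -> bool:
    """True if an equi-join left_col = right_col needs no data exchange between DNs.

    Assumes both tables use the same hash function, so equal key values on
    the same node set always land on the same DN.
    """
    return (left_col == left.dist_col
            and right_col == right.dist_col
            and left.nodes == right.nodes)


t1 = TableDist("t1", "id1", frozenset({"dn1", "dn2", "dn3"}))
t2 = TableDist("t2", "id1", frozenset({"dn1", "dn2", "dn3"}))
print(join_is_colocated(t1, "id1", t2, "id1"))  # True: can ship the whole query
print(join_is_colocated(t1, "id1", t2, "id2"))  # False: rows must move (see Stream)
```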
Distributed execution plan types: Stream
Stream
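The CN generates the execution plan and sends it to the DNs. Each DN executes the operators in the plan it receives, and the CN collects the DNs' results, processes them, and produces the final output.
Stream operator types:
gather, redistribute, broadcast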
gather: the CN sends the part of the plan below this operator to the other nodes, then collects and merges the DNs' results.
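broadcast: each DN broadcasts the data it has scanned to every DN. Execution involves two roles, consumer and producer:
consumer: receives the data sent by the producers.
producer: a newly started streamworker thread acts as the producer and broadcasts all the data it scans to every consumer (both the local DN and the other DNs).
redistribute: works like broadcast, except that the producer uses the specified key to decide which consumer each row is sent to.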
When either broadcast or redistribute could satisfy a query, the planner tends to broadcast the small table and redistribute the large one.
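The three stream operators can be modelled on an in-memory "cluster" of per-DN row lists. This is a toy sketch under stated assumptions: DNs are numbered 0..n-1, and integer-key modulo stands in for the real hash. It shows why broadcasting a large table is costly: broadcast hands every DN a full copy, while redistribute delivers each row exactly once.

```python
# Toy model of stream operators. A cluster is a dict: DN index (0..n-1) -> rows.
Partitions = dict[int, list[tuple]]


def gather(parts: Partitions) -> list[tuple]:
    """GATHER: the CN collects every DN's rows into a single result."""
    return [row for dn in sorted(parts) for row in parts[dn]]


def broadcast(parts: Partitions) -> Partitions:
    """BROADCAST: each producer sends all of its rows to every consumer, itself included."""
    all_rows = gather(parts)
    return {dn: list(all_rows) for dn in parts}


def redistribute(parts: Partitions, key_index: int) -> Partitions:
    """REDISTRIBUTE: each row goes to exactly one consumer, chosen from its key.

    Stand-in hash: integer key modulo the number of DNs (keys of parts must be 0..n-1).
    """
    n = len(parts)
    out: Partitions = {dn: [] for dn in parts}
    for dn in sorted(parts):
        for row in parts[dn]:
            out[row[key_index] % n].append(row)
    return out


def rows_delivered(parts: Partitions) -> int:
    """Total rows the consumers hold after a stream operator (local rows included)."""
    return sum(len(rows) for rows in parts.values())


cluster = {0: [(1, 10), (4, 40)], 1: [(2, 5)], 2: [(3, 5)]}
print(rows_delivered(broadcast(cluster)))        # 12: every DN gets all 4 rows
print(rows_delivered(redistribute(cluster, 1)))  # 4: each row lands once
```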
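Example 1: SELECT joining on a distribution column and a non-distribution column
- When the join condition is t1.id1 (distribution column) = t2.id2 (non-distribution column), t2's rows are redistributed across the DNs by id2.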
postgres=# explain verbose select * from t1,t2 where t1.id1 = t2.id2;
                                  QUERY PLAN
-------------------------------------------------------------------------------
 Streaming(type: GATHER)  (cost=3.12..10.69 rows=50 width=16)
   Output: t1.id1, t1.id2, t2.id1, t2.id2
   Spawn on: All datanodes
   Consumer Nodes: All datanodes
   ->  Hash Join  (cost=3.12..10.44 rows=100 distinct=[50, 50] width=16)
         Output: t1.id1, t1.id2, t2.id1, t2.id2
         Hash Cond: (t2.id2 = t1.id1)
         ->  Streaming(type: REDISTRIBUTE)  (cost=0.00..6.62 rows=100 width=8)
               Output: t2.id1, t2.id2
               Distribute Key: t2.id2
               Spawn on: All datanodes
               Consumer Nodes: All datanodes
               ->  Seq Scan on public.t2  (cost=0.00..2.50 rows=100 width=8)
                     Output: t2.id1, t2.id2
                     Distribute Key: t2.id1
         ->  Hash  (cost=2.50..2.50 rows=100 width=8)
               Output: t1.id1, t1.id2
               ->  Seq Scan on public.t1  (cost=0.00..2.50 rows=100 width=8)
                     Output: t1.id1, t1.id2
                     Distribute Key: t1.id1
(20 rows)
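Why redistributing t2 on id2 is enough can be checked with a small simulation: once t2 rows are re-placed by id2 using the same placement as t1 (which is distributed by id1), every pair with t1.id1 = t2.id2 meets on one DN, so per-DN hash joins plus a gather reproduce the single-node answer. The data, cluster size and modulo "hash" are invented; only the plan shape follows the EXPLAIN output above.

```python
N_DNS = 3  # hypothetical cluster size


def dn_of(key: int) -> int:
    return key % N_DNS  # stand-in for the real distribution hash


def distribute(rows: list[tuple], col: int) -> dict[int, list[tuple]]:
    """Place rows on DNs by hashing column `col`."""
    parts = {dn: [] for dn in range(N_DNS)}
    for row in rows:
        parts[dn_of(row[col])].append(row)
    return parts


def local_hash_join(probe, build, probe_col, build_col):
    """Hash Join on one DN: hash `build`, probe with `probe`, emit build + probe columns."""
    table = {}
    for row in build:
        table.setdefault(row[build_col], []).append(row)
    return [b + p for p in probe for b in table.get(p[probe_col], [])]


# Toy data: t1(id1, id2) and t2(id1, id2), both hash-distributed on id1.
t1_rows = [(i, i) for i in range(1, 11)]
t2_rows = [(i, 11 - i) for i in range(1, 11)]
t1 = distribute(t1_rows, 0)
t2 = distribute(t2_rows, 0)

# Streaming(type: REDISTRIBUTE), Distribute Key t2.id2: re-place t2 rows by id2,
# so each one lands on the DN holding the t1 row whose id1 equals that id2.
t2_by_id2 = distribute([row for dn in range(N_DNS) for row in t2[dn]], 1)

# Hash Join runs on every DN independently; Streaming(type: GATHER) unions the results.
distributed = sorted(
    row for dn in range(N_DNS)
    for row in local_hash_join(t2_by_id2[dn], t1[dn], probe_col=1, build_col=0)
)

# Reference answer computed as if all data lived on one node.
centralized = sorted(a + b for a in t1_rows for b in t2_rows if a[0] == b[1])
print(distributed == centralized)  # True
```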
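Example 2: SELECT joining on two non-distribution columns
- When both join columns are non-distribution columns, the join can be done either by broadcast or by redistribution: broadcast tends to be chosen when the tables are small, redistribution when they are large. In the plan below, t1 is broadcast.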
postgres=# explain verbose select * from t1,t2 where t1.id2 = t2.id2;
                                 QUERY PLAN
-----------------------------------------------------------------------------
 Streaming(type: GATHER)  (cost=3.12..14.57 rows=50 width=16)
   Output: t1.id1, t1.id2, t2.id1, t2.id2
   Spawn on: All datanodes
   Consumer Nodes: All datanodes
   ->  Hash Join  (cost=3.12..14.31 rows=100 distinct=[100, 50] width=16)
         Output: t1.id1, t1.id2, t2.id1, t2.id2
         Hash Cond: (t1.id2 = t2.id2)
         ->  Streaming(type: BROADCAST)  (cost=0.00..10.37 rows=200 width=8)
               Output: t1.id1, t1.id2
               Spawn on: All datanodes
               Consumer Nodes: All datanodes
               ->  Seq Scan on public.t1  (cost=0.00..2.50 rows=100 width=8)
                     Output: t1.id1, t1.id2
                     Distribute Key: t1.id1
         ->  Hash  (cost=2.50..2.50 rows=100 width=8)
               Output: t2.id1, t2.id2
               ->  Seq Scan on public.t2  (cost=0.00..2.50 rows=100 width=8)
                     Output: t2.id1, t2.id2
                     Distribute Key: t2.id1
(19 rows)
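One way to see why small tables favour broadcast and large ones favour redistribution is a toy cost formula with a fixed charge per stream plus a charge per row shipped. The function `choose_stream`, the formula and its constants are invented for illustration and are not GBase 8c's cost model. It also assumes both tables must move, as in this example; in Example 1 only one side had to be redistributed.

```python
def choose_stream(rows_a: int, rows_b: int, num_dns: int,
                  per_row: float = 1.0, per_stream: float = 100.0) -> str:
    """Toy choice between broadcasting the smaller table and redistributing both.

    Invented cost formula: a fixed start-up charge per stream plus a charge
    per row shipped.
    """
    small = min(rows_a, rows_b)
    # One stream; the small table is copied to every DN.
    broadcast_cost = per_stream + per_row * small * num_dns
    # Two streams (one per table); every row is sent exactly once.
    redistribute_cost = 2 * per_stream + per_row * (rows_a + rows_b)
    return "BROADCAST" if broadcast_cost <= redistribute_cost else "REDISTRIBUTE"


print(choose_stream(100, 100, num_dns=2))              # BROADCAST
print(choose_stream(1_000_000, 1_000_000, num_dns=3))  # REDISTRIBUTE
print(choose_stream(1_000, 1_000_000, num_dns=3))      # BROADCAST
```

With tiny tables the fixed per-stream charge dominates, so the single broadcast stream wins; as row counts grow, copying a table to every DN outweighs that saving.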
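Example 3: UPDATE joining on the distribution columns
- When updating t1 with the join condition t1.id1 (distribution column) = t2.id1 (distribution column), no redistribution or broadcast is needed: both tables are hash-distributed on id1 across the same nodes, so rows with equal id1 values already reside on the same DN.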
postgres=# explain verbose update t1 set id2 = 101 from t2 where t1.id1 = t2.id1;
                                     QUERY PLAN
------------------------------------------------------------------------------------
 Streaming(type: GATHER)  (cost=2.81..6.21 rows=50 width=20)
   Spawn on: All datanodes
   Consumer Nodes: All datanodes
   ->  Update on public.t1  (cost=2.81..5.89 rows=100 width=20)
         ->  Hash Join  (cost=2.81..5.89 rows=100 distinct=[200, 50] width=20)
               Output: t1.id1, 101, t1.id1, t1.ctid, t1.xc_node_id, t2.ctid
               Hash Cond: (t1.id1 = t2.id1)
               ->  Seq Scan on public.t1  (cost=0.00..2.50 rows=100 width=14)
                     Output: t1.id1, t1.ctid, t1.xc_node_id
                     Distribute Key: t1.id1
               ->  Hash  (cost=2.50..2.50 rows=100 width=10)
                     Output: t2.ctid, t2.id1
                     ->  Seq Scan on public.t2  (cost=0.00..2.50 rows=100 width=10)
                           Output: t2.ctid, t2.id1
                           Distribute Key: t2.id1
(15 rows)
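Example 4: UPDATE joining on a non-distribution column and a distribution column
- When updating t1 with the join condition t1.id2 (non-distribution column) = t2.id1 (distribution column), t1 is the table being updated, so it cannot be broadcast or redistributed; the plan broadcasts t2 instead.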
postgres=# explain verbose update t1 set id2 = 101 from t2 where t1.id2 = t2.id1;
                                     QUERY PLAN
------------------------------------------------------------------------------------
 Streaming(type: GATHER)  (cost=3.12..14.69 rows=50 width=24)
   Spawn on: All datanodes
   Consumer Nodes: All datanodes
   ->  Update on public.t1  (cost=3.12..14.31 rows=100 width=24)
         ->  Hash Join  (cost=3.12..14.31 rows=100 distinct=[100, 50] width=24)
               Output: t1.id1, 101, t1.id2, t1.ctid, t1.xc_node_id, t2.ctid
               Hash Cond: (t2.id1 = t1.id2)
               ->  Streaming(type: BROADCAST)  (cost=0.00..10.37 rows=200 width=10)
                     Output: t2.ctid, t2.id1
                     Spawn on: All datanodes
                     Consumer Nodes: All datanodes
                     ->  Seq Scan on public.t2  (cost=0.00..2.50 rows=100 width=10)
                           Output: t2.ctid, t2.id1
                           Distribute Key: t2.id1
               ->  Hash  (cost=2.50..2.50 rows=100 width=18)
                     Output: t1.id1, t1.id2, t1.ctid, t1.xc_node_id
                     ->  Seq Scan on public.t1  (cost=0.00..2.50 rows=100 width=18)
                           Output: t1.id1, t1.id2, t1.ctid, t1.xc_node_id
                           Distribute Key: t1.id1
(19 rows)
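Examples 3 and 4 suggest a simple rule for UPDATE ... FROM joins: the target table stays put, because its rows are identified by ctid and xc_node_id on the DN that stores them, and only the other table may be moved. The function below is a hypothetical summary of that rule, not GBase 8c's planner logic. Its REDISTRIBUTE branch is an assumption not demonstrated in these notes; the only such case shown, in the Remote query section, carries RETURNING and gets a different plan.

```python
def update_join_strategy(target_dist_col: str, target_join_col: str,
                         other_dist_col: str, other_join_col: str) -> str:
    """Toy rule for: UPDATE target SET ... FROM other WHERE target.<col> = other.<col>.

    The target table never moves: its rows are addressed by ctid and
    xc_node_id on the DN that stores them. Only the other table may be shipped.
    """
    if target_join_col == target_dist_col and other_join_col == other_dist_col:
        # Example 3: both sides joined on their distribution columns, already co-located.
        return "no stream"
    if target_join_col == target_dist_col:
        # Assumption: other's rows can be routed by join value to the target row's DN.
        return "REDISTRIBUTE other"
    # Example 4: target rows are not placed by the join column, so every DN needs all of other.
    return "BROADCAST other"


print(update_join_strategy("id1", "id1", "id1", "id1"))  # Example 3: no stream
print(update_join_strategy("id1", "id2", "id1", "id1"))  # Example 4: BROADCAST other
```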
Distributed execution plan types: Remote query
Remote query
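- A Remote query plan sends the query statement to the DN nodes and collects the execution results on the CN.
- When a Stream plan is not supported, Remote query pulls the data onto the CN and the remaining operations are completed there.
- An UPDATE with a RETURNING clause that joins two tables does not support a Stream plan. The CN fetches both tables' data from the DNs via remote query, performs the join on the CN, builds the final UPDATE statement from the join result, and sends it to the DNs via remote query.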
postgres=# explain verbose update t1 set id2 = 101 from t2 where t1.id1 = t2.id2 returning t1.id1;
                                                       QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------
 Update on public.t1  (cost=0.62..1.81 rows=100 width=20)
   Output: t1.id1
   Node/s: All datanodes
   Node expr: t1.id1
   Remote query: UPDATE ONLY public.t1 SET id1 = $1, id2 = $2 WHERE t1.ctid = $3 AND t1.xc_node_id = $4 RETURNING t1.id1
   ->  Hash Join  (cost=0.62..1.81 rows=100 distinct=[200, 100] width=20)
         Output: t1.id1, 101, t1.id1, t1.ctid, t1.xc_node_id, t2.ctid
         Hash Cond: (t1.id1 = t2.id2)
         ->  Data Node Scan on t1 "_REMOTE_TABLE_QUERY_"  (cost=0.00..0.00 rows=1000 width=14)
               Output: t1.id1, t1.ctid, t1.xc_node_id
               Node/s: All datanodes
               Remote query: SELECT id1, ctid, xc_node_id FROM ONLY public.t1 WHERE ((true))
               ->  Seq Scan on public.t1  (cost=0.00..1.42 rows=42 width=14)
                     Output: id1, ctid, xc_node_id
         ->  Hash  (cost=0.00..0.00 rows=1000 width=10)
               Output: t2.ctid, t2.id2
               ->  Data Node Scan on t2 "_REMOTE_TABLE_QUERY_"  (cost=0.00..0.00 rows=1000 width=10)
                     Output: t2.ctid, t2.id2
                     Node/s: All datanodes
                     Remote query: SELECT ctid, id2 FROM ONLY public.t2 WHERE ((true))
                     ->  Seq Scan on public.t2  (cost=0.00..1.42 rows=42 width=10)
                           Output: ctid, id2
(22 rows)
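The CN-side work in this plan can be sketched as follows: join the rows fetched by the two remote scans, then build one parameter set per matching t1 row for the parameterised UPDATE. Because ctid only locates a row within one DN's copy of the table, the statement also filters on xc_node_id; the toy data gives two t1 rows the same ctid on different nodes to show why. The scan rows, ctid and xc_node_id values and the name `cn_update_params` are invented, and grouping by xc_node_id is a simplification (the real plan lists `Node expr: t1.id1`).

```python
# Rows as the two "_REMOTE_TABLE_QUERY_" scans would return them to the CN.
# The ctid and xc_node_id values are invented placeholders.
t1_scan = [  # (id1, ctid, xc_node_id)
    (1, "(0,1)", 1001),
    (2, "(0,1)", 2002),  # same ctid as above, but on another DN
    (3, "(0,2)", 1001),
]
t2_scan = [  # (ctid, id2)
    ("(0,1)", 1),
    ("(0,2)", 3),
    ("(0,3)", 9),
]


def cn_update_params(t1_scan, t2_scan, new_id2):
    """Join on the CN (t1.id1 = t2.id2) and build one parameter set per t1 row.

    Each tuple fills $1..$4 of:
      UPDATE ONLY public.t1 SET id1 = $1, id2 = $2
      WHERE t1.ctid = $3 AND t1.xc_node_id = $4 RETURNING t1.id1
    """
    id2_values = {id2 for _ctid, id2 in t2_scan}  # build side of the Hash Join
    by_node = {}
    for id1, ctid, node_id in t1_scan:  # probe side
        if id1 in id2_values:
            # A target row is updated once even if several t2 rows match it.
            by_node.setdefault(node_id, []).append((id1, new_id2, ctid, node_id))
    return by_node


params = cn_update_params(t1_scan, t2_scan, new_id2=101)
print(params)
# {1001: [(1, 101, '(0,1)', 1001), (3, 101, '(0,2)', 1001)]}
```

The id1 values in the parameter sets (1 and 3 here) are what RETURNING t1.id1 would hand back.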