---
tags: Greenplum, Showcase
---

# AMA Share Input Scan

## A brief intro on Share Input Scan

```sql
-- setup
drop table if exists foo;
drop table if exists bar;
drop table if exists jazz;
CREATE TABLE foo (a int, b int);
CREATE TABLE bar (c int, d int);
CREATE TABLE jazz(e int, f int);

INSERT INTO foo values (1, 2);
INSERT INTO bar SELECT i, i from generate_series(1, 100)i;
INSERT INTO jazz VALUES (2, 2), (3, 3);

ANALYZE foo;
ANALYZE bar;
ANALYZE jazz;

drop table if exists t1;
drop table if exists t2;
create table t1 (a int, b int);
create table t2 (a int);
```

### Example 1: a dummy example

#### Query 1:

```sql
SET optimizer = off;
EXPLAIN (COSTS OFF)
WITH cte AS MATERIALIZED (SELECT * FROM foo)
SELECT * FROM cte;
RESET optimizer;
```

#### Plan 1:

```
                QUERY PLAN
------------------------------------------
 Gather Motion 3:1 (slice1; segments: 3)
   ->  Shared Scan (share slice:id 1:0)
         ->  Seq Scan on foo
 Optimizer: Postgres query optimizer
(4 rows)
```

### Example 2: producer and consumer on same slice

#### Query 2a:

```sql
EXPLAIN (COSTS OFF)
WITH cte AS (SELECT * FROM foo)
SELECT * FROM cte UNION ALL SELECT * FROM cte;
```

#### Plan 2a:

```
                     QUERY PLAN
----------------------------------------------------
 Gather Motion 3:1 (slice1; segments: 3)
   ->  Sequence
         ->  Shared Scan (share slice:id 1:0)
               ->  Seq Scan on foo
         ->  Append
               ->  Shared Scan (share slice:id 1:0)
               ->  Shared Scan (share slice:id 1:0)
 Optimizer: Pivotal Optimizer (GPORCA)
(8 rows)
```

#### Query 2b:

```sql
EXPLAIN (COSTS OFF)
WITH cte as (SELECT * FROM t1 WHERE random() < 0.1 LIMIT 10)
SELECT a, 1 , 1 from cte;
```

#### Plan 2b:

```
                               QUERY PLAN
------------------------------------------------------------------------
 Sequence
   ->  Shared Scan (share slice:id 0:0)
         ->  Limit
               ->  Gather Motion 3:1 (slice1; segments: 3)
                     ->  Seq Scan on t1
                           Filter: (random() < '0.1'::double precision)
   ->  Result
         ->  Shared Scan (share slice:id 0:0)
 Optimizer: Pivotal Optimizer (GPORCA)
(9 rows)
```

### Example 3: producer and consumer cross slice

#### Query 3:

```sql
EXPLAIN (COSTS OFF)
WITH cte as (SELECT * FROM t1 WHERE random() < 0.1 LIMIT 10)
SELECT a, 1 , 1 from cte join t2 USING (a);
```

#### Plan 3:

```
                               QUERY PLAN
------------------------------------------------------------------------
 Sequence
   ->  Shared Scan (share slice:id 0:0)
         ->  Limit
               ->  Gather Motion 3:1 (slice1; segments: 3)
                     ->  Seq Scan on t1
                           Filter: (random() < '0.1'::double precision)
   ->  Result
         ->  Gather Motion 3:1 (slice2; segments: 3)
               ->  Hash Join
                     Hash Cond: (share0_ref2.a = t2.a)
                     ->  Redistribute Motion 1:3 (slice3)
                           Hash Key: share0_ref2.a
                           ->  Result
                                 ->  Shared Scan (share slice:id 3:0)
                     ->  Hash
                           ->  Seq Scan on t2
 Optimizer: Pivotal Optimizer (GPORCA)
(17 rows)
```

### Producer/Consumer Synchronization

With GPORCA, the CTE producer and its consumers always sit below a Sequence node, so on the same segment of the same slice the producer is always executed first.

#### Intra-slice

- The producer fetches tuples and puts them into an in-memory tuplestore, spilling to disk only when memory runs out. No synchronization is required since the producer and consumer run in the same process.

#### Cross-slice

- The producer fetches tuples and materializes them into a **shared tuplestore** (see `init_tuplestore_state()`).
- The shared tuplestore is backed by a set of temporary files (see `SharedFileSet`) that can be shared by multiple backends; where possible, the files are also spread across multiple temp tablespaces.
- The producer and consumers communicate via a condition variable on 7X (see `shareinput_Xslice_state` on 7X) and via a FIFO file (similar to a pipe) on 6X (see `ShareInput_Lk_Context` on 6X).

## A customer-reported issue

### Repro

```sql
-- setup tables
drop table if exists t1;
create table t1 (a int, b int);
drop table if exists t2;
create table t2 (a int);

-- create a function which returns one more column than expected.
CREATE OR REPLACE FUNCTION ko() RETURNS TABLE (field1 int, field2 int)
LANGUAGE 'plpgsql' VOLATILE STRICT AS
$$
DECLARE
    v_qry text;
BEGIN
    v_qry := 'with cte as (select * from t1 where random() < 0.1 limit 10) select a, 1 , 1 from cte join t2 using (a)';
    return query execute v_qry;
END
$$;

select ko();
```

Crash:

```
postgres=# select ko();
ERROR: structure of query does not match function result type
DETAIL: Number of returned columns (3) does not match expected column count (2).
CONTEXT: PL/pgSQL function ko() line 6 at RETURN QUERY
FATAL: Unexpected internal error (assert.c:48)
DETAIL: FailedAssertion("!(node->ps.state->interconnect_context)", File: "nodeMotion.c", Line: 317)
server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.
The connection to the server was lost. Attempting reset: Failed.
```

Stack trace on QD:

```
#0 execMotionUnsortedReceiver (node=0x96d5060) at nodeMotion.c:317
#1 0x0000000000b0421c in ExecMotion (pstate=0x96d5060) at nodeMotion.c:152
#2 0x0000000000aa0099 in ExecProcNodeGPDB (node=0x96d5060) at execProcnode.c:599
#3 0x0000000000a9f3d1 in ExecProcNodeFirst (node=0x96d5060) at execProcnode.c:560
#4 0x0000000000ad6312 in ExecProcNode (node=0x96d5060) at ../../../src/include/executor/executor.h:268
#5 0x0000000000ad5dd2 in ExecLimit_guts (pstate=0x96d4d50) at nodeLimit.c:98
#6 0x0000000000ad58f5 in ExecLimit (node=0x96d4d50) at nodeLimit.c:246
#7 0x0000000000aa0099 in ExecProcNodeGPDB (node=0x96d4d50) at execProcnode.c:599
#8 0x0000000000a9f3d1 in ExecProcNodeFirst (node=0x96d4d50) at execProcnode.c:560
#9 0x0000000000b07512 in ExecProcNode (node=0x96d4d50) at ../../../src/include/executor/executor.h:268
#10 0x0000000000b06987 in init_tuplestore_state (node=0x96d4b38) at nodeShareInputScan.c:288
#11 0x0000000000b070ca in ExecSquelchShareInputScan (node=0x96d4b38) at nodeShareInputScan.c:633
#12 0x0000000000a7a138 in ExecSquelchNode (node=0x96d4b38) at execAmi.c:804
#13 0x0000000000b0977e in ExecSquelchSequence (node=0x96d49f8) at nodeSequence.c:166
#14 0x0000000000a7a048 in ExecSquelchNode (node=0x96d49f8) at execAmi.c:712
#15 0x0000000000aac890 in mppExecutorFinishup (queryDesc=0xaa4b2d0) at execUtils.c:1667
#16 0x0000000000a93dda in standard_ExecutorEnd (queryDesc=0xaa4b2d0) at execMain.c:1124
#17 0x0000000000a93b87 in ExecutorEnd (queryDesc=0xaa4b2d0) at execMain.c:1062
#18 0x00000000009f1424 in PortalCleanup (portal=0x959e8f8) at portalcmds.c:352
#19 0x0000000000ff637b in AtAbort_Portals () at portalmem.c:862
#20 0x000000000080e9e2 in AbortTransaction () at xact.c:3477
#21 0x0000000000810179 in AbortCurrentTransaction () at xact.c:4157
#22 0x0000000000d9be83 in PostgresMain (argc=1, argv=0x94c5a00, dbname=0x94c5878 "postgres", username=0x94c5858 "pivotal") at postgres.c:4984
#23 0x0000000000cb917c in BackendRun (port=0x94bee90) at postmaster.c:4926
#24 0x0000000000cb84c2 in BackendStartup (port=0x94bee90) at postmaster.c:4611
#25 0x0000000000cb7159 in ServerLoop () at postmaster.c:1963
#26 0x0000000000cb45db in PostmasterMain (argc=7, argv=0x9494150) at postmaster.c:1589
#27 0x0000000000b49113 in main (argc=7, argv=0x9494150) at main.c:240
#28 0x00007f3007f75565 in __libc_start_main (main=0xb48d90 <main>, argc=7, argv=0x7ffc2a889f78, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>, stack_end=0x7ffc2a889f68) at ../csu/libc-start.c:332
#29 0x00000000006e753e in _start ()
```

### RCA

`execMotionUnsortedReceiver()` gets called after the interconnect has been torn down. See `mppExecutorFinishup()`.
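The ordering problem can be modelled in a few lines of C. This is only a toy sketch, not Greenplum code: every `Toy*` type and `toy_*` function below is a hypothetical stand-in for the frames in the stack trace above, and the `guard_null` flag (returning end-of-stream when the interconnect is already gone) is an assumed shape for a defensive check, not the body of any actual patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins; these are NOT the real Greenplum structs. */
typedef struct ToyInterconnect { int open_conns; } ToyInterconnect;
typedef struct ToyEState { ToyInterconnect *interconnect_context; } ToyEState;

typedef enum {
    TOY_RECV_TUPLE,          /* interconnect alive, a tuple could be read */
    TOY_RECV_END_OF_STREAM,  /* guarded path: behave as if the stream ended */
    TOY_RECV_WOULD_ASSERT    /* unguarded path: the real code trips its Assert */
} ToyRecvResult;

/* Models the Motion receiver (frame #0 in the trace). */
ToyRecvResult toy_motion_receive(const ToyEState *estate, bool guard_null)
{
    if (estate->interconnect_context == NULL)
        return guard_null ? TOY_RECV_END_OF_STREAM : TOY_RECV_WOULD_ASSERT;
    return TOY_RECV_TUPLE;
}

/* Models squelching the Shared Scan producer: it still pulls from its
 * child subtree, which contains a Motion receiver (frames #11 down to #0). */
ToyRecvResult toy_squelch_shared_scan(const ToyEState *estate, bool guard_null)
{
    return toy_motion_receive(estate, guard_null);
}

/* Models interconnect teardown: the context pointer is cleared. */
void toy_teardown_interconnect(ToyEState *estate)
{
    assert(estate != NULL);
    estate->interconnect_context = NULL;
}

/* Models the abort path: teardown happens first, squelch second. */
ToyRecvResult toy_finishup(ToyEState *estate, bool guard_null)
{
    toy_teardown_interconnect(estate);
    return toy_squelch_shared_scan(estate, guard_null);
}
```

Calling `toy_finishup()` with `guard_null` set to `false` returns `TOY_RECV_WOULD_ASSERT`, which corresponds to the failed assertion in the crash log; with `guard_null` set to `true` the receiver simply reports end-of-stream instead.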

Plan:

```
postgres=# explain (costs off) with cte as (select * from t1 where random() < 0.1 limit 10) select a, 1 , 1 from cte join t2 using (a);
                               QUERY PLAN
------------------------------------------------------------------------
 Sequence
   ->  Shared Scan (share slice:id 0:0)
         ->  Limit
               ->  Gather Motion 3:1 (slice1; segments: 3)
                     ->  Seq Scan on t1
                           Filter: (random() < '0.1'::double precision)
   ->  Result
         ->  Gather Motion 3:1 (slice2; segments: 3)
               ->  Hash Join
                     Hash Cond: (share0_ref2.a = t2.a)
                     ->  Redistribute Motion 1:3 (slice3)
                           Hash Key: share0_ref2.a
                           ->  Result
                                 ->  Shared Scan (share slice:id 3:0)
                     ->  Hash
                           ->  Seq Scan on t2
 Optimizer: Pivotal Optimizer (GPORCA)
(17 rows)
```

Notice that there is a Gather Motion below the Shared Scan.

### Solutions

1. First call `ExecSquelchNode()` on QD, then tear down the interconnect => deadlock
2. Add special handling in ExecMotionXXX for `estate->interconnect_context == NULL` => This is our intern's fix. We went with this.
3. Don't call `ExecProcNode()` during `ExecSquelchShareInputScan()` => leads to wrong results
4. Don't call `ExecProcNode()` during `ExecSquelchShareInputScan()` if we are in the process of aborting the transaction => seems hacky
5. Don't call `ExecSquelchNode()` to clean up resources in `mppExecutorFinishup()` => needs more thought

Current fix:

7X: https://github.com/greenplum-db/gpdb/pull/12550

6X: https://github.com/greenplum-db/gpdb/pull/12614

## What's special about Squelching Share Input Scan?

Share Input Scan is the only executor node that calls `ExecProcNode()` as part of its `ExecSquelchNode()` operation (aka `ExecSquelchShareInputScan()`). Why?
See `ExecSquelchShareInputScan()` and commit [9fbd2da](https://github.com/greenplum-db/gpdb/commit/9fbd2da5635eecac6975e7349209373fc9177ef8)

#### Example query

```sql=
DROP TABLE IF EXISTS foo;
DROP TABLE IF EXISTS bar;
DROP TABLE IF EXISTS jazz;

CREATE TABLE foo (a int, b int);
CREATE TABLE bar (c int, d int);
CREATE TABLE jazz(e int, f int);

INSERT INTO foo values (1, 2);
INSERT INTO bar SELECT i, i from generate_series(1, 100)i;
INSERT INTO jazz VALUES (2, 2), (3, 3);

ANALYZE foo;
ANALYZE bar;
ANALYZE jazz;

SET optimizer_join_order = 'query';
SELECT * FROM
(
  WITH cte AS (SELECT * FROM foo)
  SELECT * FROM (SELECT * FROM cte UNION ALL SELECT * FROM cte) AS X
  JOIN bar ON b = c
) AS XY
JOIN jazz on c = e AND b = f;
RESET optimizer_join_order;
```

#### Data distribution:

```
postgres=# select gp_segment_id, * from foo;
 gp_segment_id | a | b
---------------+---+---
             1 | 1 | 2
(1 row)

-- bar has tuples on every segment.

postgres=# select gp_segment_id, * from jazz;
 gp_segment_id | e | f
---------------+---+---
             0 | 2 | 2
             0 | 3 | 3
(2 rows)
```

#### Plan:

```
                               QUERY PLAN
------------------------------------------------------------------------
 Gather Motion 3:1 (slice1; segments: 3)
   ->  Hash Join
         Hash Cond: (bar.c = jazz.e)
         ->  Sequence
               ->  Shared Scan (share slice:id 1:0)
                     ->  Seq Scan on foo
               ->  Hash Join
                     Hash Cond: (share0_ref2.b = bar.c)
                     ->  Redistribute Motion 3:3 (slice2; segments: 3)
                           Hash Key: share0_ref2.b
                           ->  Append
                                 ->  Shared Scan (share slice:id 2:0)
                                 ->  Shared Scan (share slice:id 2:0)
                     ->  Hash
                           ->  Seq Scan on bar
         ->  Hash
               ->  Seq Scan on jazz
                     Filter: (e = f)
 Optimizer: Pivotal Optimizer (GPORCA)
(19 rows)
```

Without this commit, this query hangs on 6X, but does not hang on 7X.
This query does not hang on 7X because by the time the producer slice gets to `ExecSquelchShareInputScan()`, `local_state->ready` has already been set by a previous call to `init_tuplestore_state()` via `ExecShareInputScan()`.

A case that leads to wrong results without commit [9fbd2da](https://github.com/greenplum-db/gpdb/commit/9fbd2da5635eecac6975e7349209373fc9177ef8) (now added to the `shared_scan` test): https://github.com/greenplum-db/gpdb/pull/12550#discussion_r711195354

## Just in case, a recap on slices and interconnect

Greenplum supports three interconnect protocols: TCP, UDP and Proxy (if `enable_ic_proxy` is configured). UDP is the default; the protocol can be set with the GUC `gp_interconnect_type`.

Relevant functions:

On QD:

```
InitSliceTable() -- How slices are set up,
                 -- especially setting up parent-child relations.
CdbDispatchPlan()
  -> cdbdisp_dispatchX()
    -> AssignGangs() -- Finishes constructing gangs
      -> AssignWriterGangFirst()
      -> InventorySliceTree()
    -> fillSliceVector() -- Decides dispatch order
    -> cdbdisp_dispatchToGang() -- Dispatches the SQL command over libpq
                                -- connections (established during cdb_setup())
```

On QD and all QE segments:

#### SetupInterconnect()

Called by `standard_ExecutorStart()` and `ExecSetParamPlan()`. The former is the usual entry point of ExecutorStart (on both QD and QE); the latter is for the Planner's InitPlan/SubPlan only. The main purpose of this function is for the QD and each QE process to set up TCP/UDP connection(s) between itself and its sender (child) processes (corresponding to the receiving Motion on this slice) and its receiver (parent) process (corresponding to the sending Motion on this slice).

#### createChunkTransportState()

This function creates a `ChunkTransportStateEntry`, which stores all information about a given Motion node. This is a good function to break on to observe how each connection is set up on a QD or QE process.
Since each process works on only one slice, this function should be called at most N + 1 times (N sender/child slices and 1 receiver/parent slice) on each process.

Question: Can a slice have multiple parent/receiving slices?

Note: `ChunkTransportStateEntry` stores information for a given Motion, and each Motion can have multiple `MotionConn`s. For example, in a 3-segment cluster, the `numConns` of a Broadcast Motion would be 3. The `conns` array is stored in `ChunkTransportStateEntry`.

Toy query:

```sql
create table foo (a int, b int);
create table bar (c int, d int);
select * from foo join bar on b = c;
```

Break on `createChunkTransportState()` on the QD process and an arbitrary QE process and observe `sendSlice` and `recvSlice`.

```
postgres=# explain (costs off) select * from foo join bar on b = c;
                         QUERY PLAN
------------------------------------------------------------
 Gather Motion 3:1 (slice1; segments: 3)
   ->  Hash Join
         Hash Cond: (foo.b = bar.c)
         ->  Redistribute Motion 3:3 (slice2; segments: 3)
               Hash Key: foo.b
               ->  Seq Scan on foo
         ->  Hash
               ->  Seq Scan on bar
 Optimizer: Pivotal Optimizer (GPORCA)
(9 rows)
```

#### TeardownInterconnect()

The counterpart of `SetupInterconnect()`. Called from `mppExecutorFinishup()`, `ExecSetParamPlan()`, `mppExecutorCleanup()` and other cleanup paths.

```c
/* The TeardownInterconnect() function should be called at the end of executing
 * a DML statement to close down all socket resources that were setup during
 * SetupInterconnect().
 *
 * NOTE: it is important that TeardownInterconnect() happens
 *       regardless of the outcome of the statement. i.e. gets called
 *       even if an ERROR occurs during the statement. For abnormal
 *       statement termination we can force an end-of-stream notification.
 *
 */
```

#### removeChunkTransportState()

The counterpart of `createChunkTransportState()`.

## Thanks

@Zhenghua Lyu @Shreedhar Hardikar @Adam Lee @Ekta Khanna

Find us on #gp-qpa