---
tags: Greenplum, Tech Discussion
---

# Squelching Share Input Scan with Motion

PR: https://github.com/greenplum-db/gpdb/pull/12550

### Repro

```sql
-- setup tables
drop table if exists t1;
create table t1 (a int, b int);
drop table if exists t2;
create table t2 (a int);

-- create a function which returns one more column than expected.
CREATE OR REPLACE FUNCTION ko() RETURNS TABLE (field1 int, field2 int)
LANGUAGE 'plpgsql' VOLATILE STRICT AS
$$
DECLARE
    v_qry text;
BEGIN
    v_qry := 'with cte as (select * from t1 where random() < 0.1 limit 10) select a, 1, 1 from cte join t2 using (a)';
    return query execute v_qry;
END
$$;

select ko();
```

Crash:

```
postgres=# select ko();
ERROR:  structure of query does not match function result type
DETAIL:  Number of returned columns (3) does not match expected column count (2).
CONTEXT:  PL/pgSQL function ko() line 6 at RETURN QUERY
FATAL:  Unexpected internal error (assert.c:48)
DETAIL:  FailedAssertion("!(node->ps.state->interconnect_context)", File: "nodeMotion.c", Line: 317)
server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.
The connection to the server was lost. Attempting reset: Failed.
```

Stack trace on QD:

```
#0  execMotionUnsortedReceiver (node=0x96d5060) at nodeMotion.c:317
#1  0x0000000000b0421c in ExecMotion (pstate=0x96d5060) at nodeMotion.c:152
#2  0x0000000000aa0099 in ExecProcNodeGPDB (node=0x96d5060) at execProcnode.c:599
#3  0x0000000000a9f3d1 in ExecProcNodeFirst (node=0x96d5060) at execProcnode.c:560
#4  0x0000000000ad6312 in ExecProcNode (node=0x96d5060) at ../../../src/include/executor/executor.h:268
#5  0x0000000000ad5dd2 in ExecLimit_guts (pstate=0x96d4d50) at nodeLimit.c:98
#6  0x0000000000ad58f5 in ExecLimit (node=0x96d4d50) at nodeLimit.c:246
#7  0x0000000000aa0099 in ExecProcNodeGPDB (node=0x96d4d50) at execProcnode.c:599
#8  0x0000000000a9f3d1 in ExecProcNodeFirst (node=0x96d4d50) at execProcnode.c:560
#9  0x0000000000b07512 in ExecProcNode (node=0x96d4d50) at ../../../src/include/executor/executor.h:268
#10 0x0000000000b06987 in init_tuplestore_state (node=0x96d4b38) at nodeShareInputScan.c:288
#11 0x0000000000b070ca in ExecSquelchShareInputScan (node=0x96d4b38) at nodeShareInputScan.c:633
#12 0x0000000000a7a138 in ExecSquelchNode (node=0x96d4b38) at execAmi.c:804
#13 0x0000000000b0977e in ExecSquelchSequence (node=0x96d49f8) at nodeSequence.c:166
#14 0x0000000000a7a048 in ExecSquelchNode (node=0x96d49f8) at execAmi.c:712
#15 0x0000000000aac890 in mppExecutorFinishup (queryDesc=0xaa4b2d0) at execUtils.c:1667
#16 0x0000000000a93dda in standard_ExecutorEnd (queryDesc=0xaa4b2d0) at execMain.c:1124
#17 0x0000000000a93b87 in ExecutorEnd (queryDesc=0xaa4b2d0) at execMain.c:1062
#18 0x00000000009f1424 in PortalCleanup (portal=0x959e8f8) at portalcmds.c:352
#19 0x0000000000ff637b in AtAbort_Portals () at portalmem.c:862
#20 0x000000000080e9e2 in AbortTransaction () at xact.c:3477
#21 0x0000000000810179 in AbortCurrentTransaction () at xact.c:4157
#22 0x0000000000d9be83 in PostgresMain (argc=1, argv=0x94c5a00, dbname=0x94c5878 "postgres", username=0x94c5858 "pivotal") at postgres.c:4984
#23 0x0000000000cb917c in BackendRun (port=0x94bee90) at postmaster.c:4926
#24 0x0000000000cb84c2 in BackendStartup (port=0x94bee90) at postmaster.c:4611
#25 0x0000000000cb7159 in ServerLoop () at postmaster.c:1963
#26 0x0000000000cb45db in PostmasterMain (argc=7, argv=0x9494150) at postmaster.c:1589
#27 0x0000000000b49113 in main (argc=7, argv=0x9494150) at main.c:240
#28 0x00007f3007f75565 in __libc_start_main (main=0xb48d90 <main>, argc=7, argv=0x7ffc2a889f78, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>, stack_end=0x7ffc2a889f68) at ../csu/libc-start.c:332
#29 0x00000000006e753e in _start ()
```

`execMotionUnsortedReceiver()` gets called after the interconnect has already been torn down. See `mppExecutorFinishup()`.

Plan:

```
postgres=# explain (costs off) with cte as (select * from t1 where random() < 0.1 limit 10) select a, 1, 1 from cte join t2 using (a);
                               QUERY PLAN
------------------------------------------------------------------------
 Sequence
   ->  Shared Scan (share slice:id 0:0)
         ->  Limit
               ->  Gather Motion 3:1  (slice1; segments: 3)
                     ->  Seq Scan on t1
                           Filter: (random() < '0.1'::double precision)
   ->  Result
         ->  Gather Motion 3:1  (slice2; segments: 3)
               ->  Hash Join
                     Hash Cond: (share0_ref2.a = t2.a)
                     ->  Redistribute Motion 1:3  (slice3)
                           Hash Key: share0_ref2.a
                           ->  Result
                                 ->  Shared Scan (share slice:id 3:0)
                     ->  Hash
                           ->  Seq Scan on t2
 Optimizer: Pivotal Optimizer (GPORCA)
(17 rows)
```

Notice that there is a Gather Motion below the Shared Scan.

### Squelching Share Input Scan

See `ExecSquelchShareInputScan()` and commit [9fbd2da](https://github.com/greenplum-db/gpdb/commit/9fbd2da5635eecac6975e7349209373fc9177ef8).

```sql=
DROP TABLE IF EXISTS foo;
DROP TABLE IF EXISTS bar;
DROP TABLE IF EXISTS jazz;
CREATE TABLE foo (a int, b int);
CREATE TABLE bar (c int, d int);
CREATE TABLE jazz(e int, f int);

INSERT INTO foo values (1, 2);
INSERT INTO bar SELECT i, i from generate_series(1, 100)i;
INSERT INTO jazz VALUES (2, 2), (3, 3);

ANALYZE foo;
ANALYZE bar;
ANALYZE jazz;

SET optimizer_join_order = 'query';

SELECT * FROM
(
    WITH cte AS (SELECT * FROM foo)
    SELECT * FROM (SELECT * FROM cte UNION ALL SELECT * FROM cte) AS X
    JOIN bar ON b = c
) AS XY
JOIN jazz ON c = e AND b = f;

WITH cte AS (SELECT * FROM foo)
SELECT * FROM (SELECT * FROM cte UNION ALL SELECT * FROM cte) AS X
JOIN bar ON b = c;

RESET optimizer_join_order;
```

```
postgres=# select gp_segment_id, * from bar;
 gp_segment_id | c | d
---------------+---+---
             0 | 2 | 2
             0 | 3 | 3
             1 | 1 | 1
(3 rows)
```

Plan on 7X:

```
                               QUERY PLAN
------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Hash Join
         Hash Cond: (bar.c = jazz.e)
         ->  Sequence
               ->  Shared Scan (share slice:id 1:0)
                     ->  Seq Scan on foo
               ->  Hash Join
                     Hash Cond: (share0_ref2.b = bar.c)
                     ->  Redistribute Motion 3:3  (slice2; segments: 3)
                           Hash Key: share0_ref2.b
                           ->  Append
                                 ->  Shared Scan (share slice:id 2:0)
                                 ->  Shared Scan (share slice:id 2:0)
                     ->  Hash
                           ->  Seq Scan on bar
         ->  Hash
               ->  Seq Scan on jazz
                     Filter: (e = f)
 Optimizer: Pivotal Optimizer (GPORCA)
(19 rows)
```

Plan on 6X:

```
                               QUERY PLAN
------------------------------------------------------------------------
 Gather Motion 3:1  (slice2; segments: 3)
   ->  Hash Join
         Hash Cond: (bar.c = jazz.e)
         ->  Sequence
               ->  Shared Scan (share slice:id 2:0)
                     ->  Materialize
                           ->  Seq Scan on foo
               ->  Hash Join
                     Hash Cond: (share0_ref2.b = bar.c)
                     ->  Redistribute Motion 3:3  (slice1; segments: 3)
                           Hash Key: share0_ref2.b
                           ->  Append
                                 ->  Shared Scan (share slice:id 1:0)
                                 ->  Shared Scan (share slice:id 1:0)
                     ->  Hash
                           ->  Seq Scan on bar
         ->  Hash
               ->  Seq Scan on jazz
                     Filter: (e = f)
 Optimizer: Pivotal Optimizer (GPORCA)
(20 rows)
```

Without this commit, this query hangs on 6X, but does not hang on 7X. It does not hang on 7X because, by the time the producer slice gets to `ExecSquelchShareInputScan()`, `local_state->ready` has already been set by a previous call to `init_tuplestore_state()` via `ExecShareInputScan()`.
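To make the squelch behaviour concrete, here is a small, self-contained toy model of that decision (all type and function names below are invented for illustration; only the flags mirror the logic in `nodeShareInputScan.c`). The point of commit 9fbd2da is that a squelched cross-slice *producer* must still materialize its tuplestore, otherwise the consumer slices wait for the "ready" notification forever (the 6X hang). That materialization, via `init_tuplestore_state()`, is also what re-enters the Motion in the stack trace above once the interconnect is gone.

```c
#include <stdbool.h>
#include <stdio.h>

/* Toy model only: these names are made up and merely mirror the shape of
 * the real code in nodeShareInputScan.c. */
typedef struct
{
    bool cross_slice;   /* producer and consumers live in different slices */
    bool is_producer;   /* this node owns the underlying child plan */
    bool ready;         /* tuplestore already materialized (local_state->ready) */
} ToyShareScan;

/* Stand-in for init_tuplestore_state(): runs the child plan to completion
 * and lets waiting consumer slices proceed.  In the repro, the child plan
 * contains a Gather Motion, so this is where the interconnect is needed. */
static void toy_materialize(ToyShareScan *s)
{
    printf("running child plan to fill the shared tuplestore\n");
    s->ready = true;
}

/* The essence of the squelch rule from commit 9fbd2da: a squelched
 * cross-slice producer that never ran must still materialize, or the
 * consumers in other slices hang waiting for "ready". */
static void toy_squelch(ToyShareScan *s)
{
    if (s->cross_slice && s->is_producer && !s->ready)
        toy_materialize(s);
}

int main(void)
{
    ToyShareScan s = { .cross_slice = true, .is_producer = true, .ready = false };
    toy_squelch(&s);    /* on 7X the producer is usually already ready, so this is a no-op there */
    return 0;
}
```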
A case that leads to wrong results without commit [9fbd2da](https://github.com/greenplum-db/gpdb/commit/9fbd2da5635eecac6975e7349209373fc9177ef8) (now added to the `shared_scan` test): https://github.com/greenplum-db/gpdb/pull/12550#discussion_r711195354

### More about the interconnect

Greenplum supports three interconnect protocols: TCP, UDP, and Proxy (if `enable_ic_proxy` is configured). UDP is the default. The protocol is selected with the GUC `gp_interconnect_type`.

Relevant functions:

On QD:

```
InitSliceTable()                  -- How slices are set up, especially the parent-child relations.
CdbDispatchPlan()
  -> cdbdisp_dispatchX()
      -> AssignGangs()            -- Finishes constructing gangs
          -> AssignWriterGangFirst()
          -> InventorySliceTree()
      -> fillSliceVector()        -- Decides dispatch order
      -> cdbdisp_dispatchToGang() -- Dispatches the SQL command over the libpq connections (established during cdb_setup())
```

On QD and all QE segments:

#### SetupInterconnect()

Called by `standard_ExecutorStart()` and `ExecSetParamPlan()`. The former is the usual entry point of ExecutorStart (on both QD and QE); the latter is only for the planner's InitPlan/SubPlan. The main purpose of this function is for the QD and each QE process to set up TCP/UDP connection(s) between itself and its sender (child) processes (corresponding to the receiving Motions of this slice) and its receiver (parent) process (corresponding to the sending Motion of this slice).

#### createChunkTransportState()

This function creates a `ChunkTransportStateEntry`, which stores all the information of a given Motion node. It is a good function to break on to observe how each connection is set up on a QD or QE process. Since each process only works on one slice, this function should be called at most N + 1 times per process (N sender/child slices plus 1 receiver/parent slice).

Question: can a slice have multiple parent/receiving slices?

Note: `ChunkTransportStateEntry` stores information for a given Motion; each Motion can have multiple `MotionConn`s. For example, in a 3-segment cluster, the `numConns` of a Broadcast Motion would be 3. The `conns` array is stored in the `ChunkTransportStateEntry`.

Toy query:

```sql
create table foo (a int, b int);
create table bar (c int, d int);
select * from foo join bar on b = c;
```

Break on `createChunkTransportState()` on the QD process and an arbitrary QE process and observe `sendSlice` and `recvSlice`.

```
postgres=# explain (costs off) select * from foo join bar on b = c;
                         QUERY PLAN
------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Hash Join
         Hash Cond: (foo.b = bar.c)
         ->  Redistribute Motion 3:3  (slice2; segments: 3)
               Hash Key: foo.b
               ->  Seq Scan on foo
         ->  Hash
               ->  Seq Scan on bar
 Optimizer: Pivotal Optimizer (GPORCA)
(9 rows)
```

#### TeardownInterconnect()

The counterpart of `SetupInterconnect()`. Called from `mppExecutorFinishup()`, `ExecSetParamPlan()`, `mppExecutorCleanup()`, and other cleanup paths.

```c
/* The TeardownInterconnect() function should be called at the end of executing
 * a DML statement to close down all socket resources that were setup during
 * SetupInterconnect().
 *
 * NOTE: it is important that TeardownInterconnect() happens
 * regardless of the outcome of the statement. i.e. gets called
 * even if an ERROR occurs during the statement. For abnormal
 * statement termination we can force an end-of-stream notification.
 */
```
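The crash in the repro is exactly a violation of this ordering: by the time `ExecSquelchShareInputScan()` re-enters the Motion through `init_tuplestore_state()`, the interconnect context has already been released, so the `Assert` in `execMotionUnsortedReceiver()` fires. Below is a minimal, self-contained sketch of that hazard; the names only echo `interconnect_context`, `TeardownInterconnect()`, and the assertion at `nodeMotion.c:317`, and this is not actual GPDB code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Toy executor state: the real EState keeps the interconnect handle in
 * estate->interconnect_context (see the FailedAssertion message above). */
typedef struct
{
    void *interconnect_context;
} ToyEState;

static void toy_setup_interconnect(ToyEState *estate)
{
    static int dummy;
    estate->interconnect_context = &dummy;   /* sockets/connections live here */
}

static void toy_teardown_interconnect(ToyEState *estate)
{
    estate->interconnect_context = NULL;     /* all Motion traffic must be finished by now */
}

/* Stand-in for execMotionUnsortedReceiver(): pulling tuples from a Motion
 * requires a live interconnect, hence the assertion. */
static void toy_receive_from_motion(ToyEState *estate)
{
    assert(estate->interconnect_context != NULL);
    printf("received a tuple chunk\n");
}

int main(void)
{
    ToyEState estate = { NULL };

    toy_setup_interconnect(&estate);
    toy_receive_from_motion(&estate);    /* normal execution: fine */

    toy_teardown_interconnect(&estate);
    toy_receive_from_motion(&estate);    /* the squelch-after-teardown path: assertion failure */
    return 0;
}
```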
#### removeChunkTransportState()

The counterpart of `createChunkTransportState()`.

#### ICGlobalControlInfo

#### estate->es_got_eos = true;