---
tags: Greenplum, Tech Discussion
---
# Squelching Share Input Scan with Motion
PR: https://github.com/greenplum-db/gpdb/pull/12550
### Repro
```sql
-- setup tables
drop table if exists t1;
create table t1 (a int, b int);
drop table if exists t2;
create table t2 (a int);
-- create a function whose query returns one more column than the declared result type.
CREATE OR REPLACE FUNCTION ko() RETURNS TABLE (field1 int, field2 int)
LANGUAGE 'plpgsql' VOLATILE STRICT AS
$$
DECLARE
v_qry text;
BEGIN
v_qry := 'with cte as (select * from t1 where random() < 0.1 limit 10) select a, 1 , 1 from cte join t2 using (a)';
return query execute v_qry;
END
$$;
select ko();
```
Crash:
```
postgres=# select ko();
ERROR: structure of query does not match function result type
DETAIL: Number of returned columns (3) does not match expected column count (2).
CONTEXT: PL/pgSQL function ko() line 6 at RETURN QUERY
FATAL: Unexpected internal error (assert.c:48)
DETAIL: FailedAssertion("!(node->ps.state->interconnect_context)", File: "nodeMotion.c", Line: 317)
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
The connection to the server was lost. Attempting reset: Failed.
```
Stack trace on QD:
```
#0 execMotionUnsortedReceiver (node=0x96d5060) at nodeMotion.c:317
#1 0x0000000000b0421c in ExecMotion (pstate=0x96d5060) at nodeMotion.c:152
#2 0x0000000000aa0099 in ExecProcNodeGPDB (node=0x96d5060) at execProcnode.c:599
#3 0x0000000000a9f3d1 in ExecProcNodeFirst (node=0x96d5060) at execProcnode.c:560
#4 0x0000000000ad6312 in ExecProcNode (node=0x96d5060) at ../../../src/include/executor/executor.h:268
#5 0x0000000000ad5dd2 in ExecLimit_guts (pstate=0x96d4d50) at nodeLimit.c:98
#6 0x0000000000ad58f5 in ExecLimit (node=0x96d4d50) at nodeLimit.c:246
#7 0x0000000000aa0099 in ExecProcNodeGPDB (node=0x96d4d50) at execProcnode.c:599
#8 0x0000000000a9f3d1 in ExecProcNodeFirst (node=0x96d4d50) at execProcnode.c:560
#9 0x0000000000b07512 in ExecProcNode (node=0x96d4d50) at ../../../src/include/executor/executor.h:268
#10 0x0000000000b06987 in init_tuplestore_state (node=0x96d4b38) at nodeShareInputScan.c:288
#11 0x0000000000b070ca in ExecSquelchShareInputScan (node=0x96d4b38) at nodeShareInputScan.c:633
#12 0x0000000000a7a138 in ExecSquelchNode (node=0x96d4b38) at execAmi.c:804
#13 0x0000000000b0977e in ExecSquelchSequence (node=0x96d49f8) at nodeSequence.c:166
#14 0x0000000000a7a048 in ExecSquelchNode (node=0x96d49f8) at execAmi.c:712
#15 0x0000000000aac890 in mppExecutorFinishup (queryDesc=0xaa4b2d0) at execUtils.c:1667
#16 0x0000000000a93dda in standard_ExecutorEnd (queryDesc=0xaa4b2d0) at execMain.c:1124
#17 0x0000000000a93b87 in ExecutorEnd (queryDesc=0xaa4b2d0) at execMain.c:1062
#18 0x00000000009f1424 in PortalCleanup (portal=0x959e8f8) at portalcmds.c:352
#19 0x0000000000ff637b in AtAbort_Portals () at portalmem.c:862
#20 0x000000000080e9e2 in AbortTransaction () at xact.c:3477
#21 0x0000000000810179 in AbortCurrentTransaction () at xact.c:4157
#22 0x0000000000d9be83 in PostgresMain (argc=1, argv=0x94c5a00, dbname=0x94c5878 "postgres", username=0x94c5858 "pivotal") at postgres.c:4984
#23 0x0000000000cb917c in BackendRun (port=0x94bee90) at postmaster.c:4926
#24 0x0000000000cb84c2 in BackendStartup (port=0x94bee90) at postmaster.c:4611
#25 0x0000000000cb7159 in ServerLoop () at postmaster.c:1963
#26 0x0000000000cb45db in PostmasterMain (argc=7, argv=0x9494150) at postmaster.c:1589
#27 0x0000000000b49113 in main (argc=7, argv=0x9494150) at main.c:240
#28 0x00007f3007f75565 in __libc_start_main (main=0xb48d90 <main>, argc=7, argv=0x7ffc2a889f78, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>, stack_end=0x7ffc2a889f68) at ../csu/libc-start.c:332
#29 0x00000000006e753e in _start ()
```
`execMotionUnsortedReceiver()` gets called after the interconnect is torn down. See `mppExecutorFinishup()`.
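A minimal C sketch of the check that fires, based only on the assertion message and stack trace above (where exactly `TeardownInterconnect()` ran on this abort path is not shown here, and the surrounding code is an assumption):
```c
/*
 * Sketch of the receiver entry point at nodeMotion.c:317 from the stack
 * trace above.  The receiver needs a live interconnect to pull tuples from
 * sender slices; on this abort path the interconnect context has already
 * been released, so the assertion fails.
 */
static TupleTableSlot *
execMotionUnsortedReceiver_sketch(MotionState *node)
{
	/* This is the condition reported by FailedAssertion() above. */
	Assert(node->ps.state->interconnect_context);

	/* ... normally: receive tuple chunks over the interconnect ... */
	return NULL;
}
```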
Plan:
```
postgres=# explain (costs off) with cte as (select * from t1 where random() < 0.1 limit 10) select a, 1 , 1 from cte join t2 using (a);
QUERY PLAN
------------------------------------------------------------------------
 Sequence
   ->  Shared Scan (share slice:id 0:0)
         ->  Limit
               ->  Gather Motion 3:1 (slice1; segments: 3)
                     ->  Seq Scan on t1
                           Filter: (random() < '0.1'::double precision)
   ->  Result
         ->  Gather Motion 3:1 (slice2; segments: 3)
               ->  Hash Join
                     Hash Cond: (share0_ref2.a = t2.a)
                     ->  Redistribute Motion 1:3 (slice3)
                           Hash Key: share0_ref2.a
                           ->  Result
                                 ->  Shared Scan (share slice:id 3:0)
                     ->  Hash
                           ->  Seq Scan on t2
 Optimizer: Pivotal Optimizer (GPORCA)
(17 rows)
```
Notice that there is a Gather Motion below the Shared Scan.
### Squelching Share Input Scan
See `ExecSquelchShareInputScan()` and commit [9fbd2da](https://github.com/greenplum-db/gpdb/commit/9fbd2da5635eecac6975e7349209373fc9177ef8)
```sql
DROP TABLE IF EXISTS foo;
DROP TABLE IF EXISTS bar;
DROP TABLE IF EXISTS jazz;
CREATE TABLE foo (a int, b int);
CREATE TABLE bar (c int, d int);
CREATE TABLE jazz(e int, f int);
INSERT INTO foo values (1, 2);
INSERT INTO bar SELECT i, i from generate_series(1, 100)i;
INSERT INTO jazz VALUES (2, 2), (3, 3);
ANALYZE foo;
ANALYZE bar;
ANALYZE jazz;
SET optimizer_join_order = 'query';
SELECT * FROM
(
WITH cte AS (SELECT * FROM foo)
SELECT * FROM (SELECT * FROM cte UNION ALL SELECT * FROM cte)
AS X
JOIN bar ON b = c
) AS XY
JOIN jazz on c = e AND b = f;
WITH cte AS (SELECT * FROM foo)
SELECT * FROM (SELECT * FROM cte UNION ALL SELECT * FROM cte)
AS X
JOIN bar ON b = c;
RESET optimizer_join_order;
```
```
postgres=# select gp_segment_id, * from bar;
 gp_segment_id | c | d
---------------+---+---
             0 | 2 | 2
             0 | 3 | 3
             1 | 1 | 1
(3 rows)
```
Plan:
7X:
```
QUERY PLAN
------------------------------------------------------------------------
 Gather Motion 3:1 (slice1; segments: 3)
   ->  Hash Join
         Hash Cond: (bar.c = jazz.e)
         ->  Sequence
               ->  Shared Scan (share slice:id 1:0)
                     ->  Seq Scan on foo
               ->  Hash Join
                     Hash Cond: (share0_ref2.b = bar.c)
                     ->  Redistribute Motion 3:3 (slice2; segments: 3)
                           Hash Key: share0_ref2.b
                           ->  Append
                                 ->  Shared Scan (share slice:id 2:0)
                                 ->  Shared Scan (share slice:id 2:0)
                     ->  Hash
                           ->  Seq Scan on bar
         ->  Hash
               ->  Seq Scan on jazz
                     Filter: (e = f)
 Optimizer: Pivotal Optimizer (GPORCA)
(19 rows)
```
6X:
```
QUERY PLAN
------------------------------------------------------------------------
 Gather Motion 3:1 (slice2; segments: 3)
   ->  Hash Join
         Hash Cond: (bar.c = jazz.e)
         ->  Sequence
               ->  Shared Scan (share slice:id 2:0)
                     ->  Materialize
                           ->  Seq Scan on foo
               ->  Hash Join
                     Hash Cond: (share0_ref2.b = bar.c)
                     ->  Redistribute Motion 3:3 (slice1; segments: 3)
                           Hash Key: share0_ref2.b
                           ->  Append
                                 ->  Shared Scan (share slice:id 1:0)
                                 ->  Shared Scan (share slice:id 1:0)
                     ->  Hash
                           ->  Seq Scan on bar
         ->  Hash
               ->  Seq Scan on jazz
                     Filter: (e = f)
 Optimizer: Pivotal Optimizer (GPORCA)
(20 rows)
```
Without this commit, the query hangs on 6X but not on 7X. It does not hang on 7X because, by the time the producer slice reaches `ExecSquelchShareInputScan()`, `local_state->ready` has already been set by a previous call to `init_tuplestore_state()` via `ExecShareInputScan()`.
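A rough sketch of the squelch behavior being discussed, assuming only the call shape from the stack trace and the `local_state->ready` flag above (the `is_cross_slice_producer()` helper is hypothetical and stands in for the real checks):
```c
/*
 * Rough sketch of ExecSquelchShareInputScan() after commit 9fbd2da.
 * If this node is the producer for a cross-slice share and has not yet
 * materialized its subtree, it does so now; otherwise consumer slices
 * waiting on the shared tuplestore could hang.  This is the call that
 * re-executes the subtree (including the Motion below it) in the repro.
 */
static void
ExecSquelchShareInputScan_sketch(ShareInputScanState *node)
{
	if (is_cross_slice_producer(node) &&	/* hypothetical helper */
		!node->local_state->ready)
		init_tuplestore_state(node);		/* runs the child plan to completion */

	/* ... then squelch child nodes as usual ... */
}
```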
A case that produces wrong results without commit [9fbd2da](https://github.com/greenplum-db/gpdb/commit/9fbd2da5635eecac6975e7349209373fc9177ef8) (now added to the `shared_scan` regression test):
https://github.com/greenplum-db/gpdb/pull/12550#discussion_r711195354
### More about the interconnect
Greenplum supports three interconnect protocols: TCP, UDP, and Proxy (if `enable_ic_proxy` is configured). UDP is the default. The protocol can be selected with the GUC `gp_interconnect_type`.
Relevant functions:
On QD:
```
InitSliceTable() -- How slices are set up, especially the parent-child relations.
CdbDispatchPlan()
-> cdbdisp_dispatchX()
-> AssignGangs() -- Finishes constructing gangs
-> AssignWriterGangFirst()
-> InventorySliceTree()
-> fillSliceVector() -- Decides dispatch order
-> cdbdisp_dispatchToGang() -- Dispatches the SQL command over libpq connections (established during cdb_setup())
```
On QD and all QE segments:
#### SetupInterconnect()
Called by `standard_ExecutorStart()` and `ExecSetParamPlan()`. The former is the usual entry point of ExecutorStart (on both the QD and QEs); the latter is only for the planner's InitPlan/SubPlan.
The main purpose of this function is for the QD and each QE process to set up TCP/UDP connection(s) between itself and its sender (child) processes (corresponding to the receiving Motions of this slice) and its receiver (parent) process (corresponding to the sending Motion of this slice).
#### createChunkTransportState()
This function creates a `ChunkTransportStateEntry`, which stores all the information for a given Motion node.
This is a good function to break on to observe how each connection is set up in a QD or QE process. Since each process works on only one slice, this function should be called at most N + 1 times per process (once for each of the N sender/child slices and once for the receiver/parent slice).
Question: Can a slice have multiple parent/receiving slices?
Note: `ChunkTransportStateEntry` stores the information for a given Motion, and each Motion can have multiple `MotionConn`s. For example, in a three-segment cluster, the `numConns` of a Broadcast Motion would be 3. The `conns` array is stored in the `ChunkTransportStateEntry`.
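A condensed sketch of that relationship (the real structs in the interconnect code carry many more fields; the member names here are approximations based on the note above):
```c
/* Forward declaration only; one MotionConn per peer connection. */
typedef struct MotionConn_sketch MotionConn_sketch;

/* Per-Motion bookkeeping, as described in the note above. */
typedef struct ChunkTransportStateEntry_sketch
{
	int					motNodeId;	/* which Motion node this entry belongs to */
	int					numConns;	/* e.g. 3 for a Broadcast Motion on 3 segments */
	MotionConn_sketch  *conns;		/* array of per-peer connections */
} ChunkTransportStateEntry_sketch;
```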
Toy query:
```sql
create table foo (a int, b int);
create table bar (c int, d int);
select * from foo join bar on b = c;
```
Break on `createChunkTransportState()` in the QD process and in an arbitrary QE process and observe `sendSlice` and `recvSlice` (see the sample session after the plan below).
```
postgres=# explain (costs off) select * from foo join bar on b = c;
QUERY PLAN
------------------------------------------------------------
 Gather Motion 3:1 (slice1; segments: 3)
   ->  Hash Join
         Hash Cond: (foo.b = bar.c)
         ->  Redistribute Motion 3:3 (slice2; segments: 3)
               Hash Key: foo.b
               ->  Seq Scan on foo
         ->  Hash
               ->  Seq Scan on bar
 Optimizer: Pivotal Optimizer (GPORCA)
(9 rows)
```
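A hypothetical debugging session for this exercise; it assumes `sendSlice` and `recvSlice` are arguments of `createChunkTransportState()` and that the slice structs expose a `sliceIndex` field, neither of which is confirmed above:
```
$ gdb -p <pid of the QD or a QE backend>
(gdb) break createChunkTransportState
(gdb) continue
# run the toy query from another psql session; at each breakpoint hit:
(gdb) print sendSlice->sliceIndex
(gdb) print recvSlice->sliceIndex
(gdb) continue
```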
#### TeardownInterconnect()
The counterpart of `SetupInterconnect()`. Called from `mppExecutorFinishup()`, `ExecSetParamPlan()`, `mppExecutorCleanup()`, and other cleanup paths.
```c
/* The TeardownInterconnect() function should be called at the end of executing
* a DML statement to close down all socket resources that were setup during
* SetupInterconnect().
*
* NOTE: it is important that TeardownInterconnect() happens
* regardless of the outcome of the statement. i.e. gets called
* even if an ERROR occurs during the statement. For abnormal
* statement termination we can force an end-of-stream notification.
*
*/
```
#### removeChunkTransportState()
The counterpart of `createChunkTransportState()`.
#### ICGlobalControlInfo
#### estate->es_got_eos = true;