AMA Share Input Scan

A brief introduction to Share Input Scan: how GPORCA plans shared CTE scans, how producers and consumers synchronize, and a walkthrough of a customer-reported crash.

-- setup
drop table if exists foo;
drop table if exists bar;
drop table if exists jazz;

CREATE TABLE foo (a int, b int);
CREATE TABLE bar (c int, d int);
CREATE TABLE jazz(e int, f int);

INSERT INTO foo values (1, 2);
INSERT INTO bar SELECT i, i from generate_series(1, 100)i;
INSERT INTO jazz VALUES (2, 2), (3, 3);

ANALYZE foo;
ANALYZE bar;
ANALYZE jazz;

drop table if exists t1;
drop table if exists t2;

create table t1 (a int, b int);
create table t2 (a int);

Example 1: a dummy example (single consumer, Postgres planner)

Query 1:

SET optimizer = off;

EXPLAIN (COSTS OFF)
WITH cte AS MATERIALIZED (SELECT * FROM foo) SELECT * FROM cte;

RESET optimizer;

Plan 1:

                QUERY PLAN                
------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Shared Scan (share slice:id 1:0)
         ->  Seq Scan on foo
 Optimizer: Postgres query optimizer
(4 rows)

Example 2: producer and consumers on the same slice

Query 2a:

EXPLAIN (COSTS OFF)
WITH cte AS (SELECT * FROM foo)
SELECT * FROM cte UNION ALL SELECT * FROM cte;

Plan 2a:

                     QUERY PLAN                     
----------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Sequence
         ->  Shared Scan (share slice:id 1:0)
               ->  Seq Scan on foo
         ->  Append
               ->  Shared Scan (share slice:id 1:0)
               ->  Shared Scan (share slice:id 1:0)
 Optimizer: Pivotal Optimizer (GPORCA)
(8 rows)

Query 2b:

EXPLAIN (COSTS OFF) 
WITH cte as (SELECT * FROM t1 WHERE random() < 0.1 LIMIT 10)
SELECT a, 1 , 1 from cte;

Plan 2b:

                               QUERY PLAN                               
------------------------------------------------------------------------
 Sequence
   ->  Shared Scan (share slice:id 0:0)
         ->  Limit
               ->  Gather Motion 3:1  (slice1; segments: 3)
                     ->  Seq Scan on t1
                           Filter: (random() < '0.1'::double precision)
   ->  Result
         ->  Shared Scan (share slice:id 0:0)
 Optimizer: Pivotal Optimizer (GPORCA)
(9 rows)

Example 3: producer and consumers on different slices (cross-slice)

Query 3:

EXPLAIN (COSTS OFF) 
WITH cte as (SELECT * FROM t1 WHERE random() < 0.1 LIMIT 10)
SELECT a, 1 , 1 from cte join t2 USING (a);

Plan 3:

                               QUERY PLAN                               
------------------------------------------------------------------------
 Sequence
   ->  Shared Scan (share slice:id 0:0)
         ->  Limit
               ->  Gather Motion 3:1  (slice1; segments: 3)
                     ->  Seq Scan on t1
                           Filter: (random() < '0.1'::double precision)
   ->  Result
         ->  Gather Motion 3:1  (slice2; segments: 3)
               ->  Hash Join
                     Hash Cond: (share0_ref2.a = t2.a)
                     ->  Redistribute Motion 1:3  (slice3)
                           Hash Key: share0_ref2.a
                           ->  Result
                                 ->  Shared Scan (share slice:id 3:0)
                     ->  Hash
                           ->  Seq Scan on t2
 Optimizer: Pivotal Optimizer (GPORCA)
(17 rows)

Producer/Consumer Synchronization

With GPORCA, the CTE producer and its consumers are always placed below a Sequence node, so within the same slice on a given segment the producer always gets executed first.
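
To make that ordering concrete, here is a minimal, self-contained sketch of the Sequence contract (toy code, not GPDB's actual nodeSequence.c): every child except the last is run to completion and its rows are discarded, and only the last child's rows flow up to the parent. Because the producer Shared Scan is planned as an earlier child, it materializes before any consumer reads.

/*
 * seq_demo.c -- toy model of the Sequence node's contract (illustrative only,
 * not GPDB's nodeSequence.c): run every child but the last to completion and
 * discard their rows, then stream rows from the last child to the parent.
 */
#include <stdio.h>

#define DONE (-1)

/* A toy "plan node": returns the next row, or DONE when exhausted. */
typedef int (*PlanNode)(void);

/* Stands in for the Shared Scan producer: its rows are never returned
 * directly; running it is what fills the shared tuplestore. */
static int producer(void)
{
    static int n = 0;
    n++;
    return (n <= 3) ? n : DONE;
}

/* Stands in for the subtree that contains the consumers. */
static int consumer_side(void)
{
    static int n = 0;
    n++;
    return (n <= 3) ? 100 + n : DONE;
}

/* "ExecSequence": on the first call, exhaust all children except the last;
 * after that, every call returns the next row of the last child. */
static int exec_sequence(PlanNode *children, int nchildren)
{
    static int initialized = 0;

    if (!initialized)
    {
        for (int i = 0; i < nchildren - 1; i++)
            while (children[i]() != DONE)
                ;               /* the producer materializes here */
        initialized = 1;
    }
    return children[nchildren - 1]();
}

int main(void)
{
    PlanNode children[] = { producer, consumer_side };
    int row;

    while ((row = exec_sequence(children, 2)) != DONE)
        printf("row returned to parent: %d\n", row);
    return 0;
}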

Intra-slice

  • The producer fetches tuples and puts them into an in-memory tuplestore, spilling to disk only if there is not enough memory. No synchronization is required since the producer and consumers run in the same process.

Cross-slice

  • The producer fetches tuples and materializes them into a shared tuplestore (see init_tuplestore_state()).
  • The shared tuplestore is backed by a set of temporary files (see SharedFileSet) that can be shared by multiple backends; when possible, the files are also spread across multiple temp tablespaces.
  • The producer and consumers synchronize via a condition variable on 7X (see shareinput_Xslice_state on 7X) and a FIFO file (similar to a pipe) on 6X (see ShareInput_Lk_Context on 6X). A toy sketch of this handshake follows this list.
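
Below is a toy, single-process analogy of the cross-slice handshake using pthreads (a hypothetical illustration only; the real 7X code uses a PostgreSQL ConditionVariable in shared memory plus an on-disk shared tuplestore, and 6X uses a FIFO file). It shows the essential pattern: the producer materializes everything first, then publishes a "ready" flag and wakes all waiting consumers.

/*
 * xslice_sync_demo.c -- toy analogy of the cross-slice producer/consumer
 * handshake (pthreads stand in for the shared-memory condition variable;
 * a flag stands in for "the shared tuplestore is fully written").
 * Build with: cc xslice_sync_demo.c -lpthread
 */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cv   = PTHREAD_COND_INITIALIZER;
static int ready = 0;               /* "tuplestore fully materialized" flag */

/* Stand-in for the producer slice: materialize everything, then signal. */
static void *producer(void *arg)
{
    (void) arg;
    sleep(1);                       /* pretend to write the shared tuplestore */
    pthread_mutex_lock(&lock);
    ready = 1;
    pthread_cond_broadcast(&cv);    /* wake every waiting consumer */
    pthread_mutex_unlock(&lock);
    return NULL;
}

/* Stand-in for a consumer slice in another process: block until ready. */
static void *consumer(void *arg)
{
    long id = (long) arg;

    pthread_mutex_lock(&lock);
    while (!ready)
        pthread_cond_wait(&cv, &lock);
    pthread_mutex_unlock(&lock);
    printf("consumer %ld: shared tuplestore is ready, start reading\n", id);
    return NULL;
}

int main(void)
{
    pthread_t p, c1, c2;

    pthread_create(&c1, NULL, consumer, (void *) 1L);
    pthread_create(&c2, NULL, consumer, (void *) 2L);
    pthread_create(&p, NULL, producer, NULL);
    pthread_join(p, NULL);
    pthread_join(c1, NULL);
    pthread_join(c2, NULL);
    return 0;
}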

A customer-reported issue

Repro

-- setup tables
drop table if exists t1;
create table t1 (a int, b int);

drop table if exists t2;
create table t2 (a int);

-- create a function which returns one more column than expected.
CREATE OR REPLACE FUNCTION ko() RETURNS TABLE (field1 int, field2 int)
LANGUAGE 'plpgsql' VOLATILE STRICT AS
$$
DECLARE
   v_qry text;
BEGIN
   v_qry := 'with cte as (select * from t1 where random() < 0.1 limit 10) select a, 1 , 1 from cte join t2 using (a)';
  return query execute v_qry;
END
$$;

select ko();

Crash:

postgres=# select ko();
ERROR:  structure of query does not match function result type
DETAIL:  Number of returned columns (3) does not match expected column count (2).
CONTEXT:  PL/pgSQL function ko() line 6 at RETURN QUERY
FATAL:  Unexpected internal error (assert.c:48)
DETAIL:  FailedAssertion("!(node->ps.state->interconnect_context)", File: "nodeMotion.c", Line: 317)
server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.
The connection to the server was lost. Attempting reset: Failed.

Stack trace on QD:

#0  execMotionUnsortedReceiver (node=0x96d5060) at nodeMotion.c:317
#1  0x0000000000b0421c in ExecMotion (pstate=0x96d5060) at nodeMotion.c:152
#2  0x0000000000aa0099 in ExecProcNodeGPDB (node=0x96d5060) at execProcnode.c:599
#3  0x0000000000a9f3d1 in ExecProcNodeFirst (node=0x96d5060) at execProcnode.c:560
#4  0x0000000000ad6312 in ExecProcNode (node=0x96d5060) at ../../../src/include/executor/executor.h:268
#5  0x0000000000ad5dd2 in ExecLimit_guts (pstate=0x96d4d50) at nodeLimit.c:98
#6  0x0000000000ad58f5 in ExecLimit (node=0x96d4d50) at nodeLimit.c:246
#7  0x0000000000aa0099 in ExecProcNodeGPDB (node=0x96d4d50) at execProcnode.c:599
#8  0x0000000000a9f3d1 in ExecProcNodeFirst (node=0x96d4d50) at execProcnode.c:560
#9  0x0000000000b07512 in ExecProcNode (node=0x96d4d50) at ../../../src/include/executor/executor.h:268
#10 0x0000000000b06987 in init_tuplestore_state (node=0x96d4b38) at nodeShareInputScan.c:288
#11 0x0000000000b070ca in ExecSquelchShareInputScan (node=0x96d4b38) at nodeShareInputScan.c:633
#12 0x0000000000a7a138 in ExecSquelchNode (node=0x96d4b38) at execAmi.c:804
#13 0x0000000000b0977e in ExecSquelchSequence (node=0x96d49f8) at nodeSequence.c:166
#14 0x0000000000a7a048 in ExecSquelchNode (node=0x96d49f8) at execAmi.c:712
#15 0x0000000000aac890 in mppExecutorFinishup (queryDesc=0xaa4b2d0) at execUtils.c:1667
#16 0x0000000000a93dda in standard_ExecutorEnd (queryDesc=0xaa4b2d0) at execMain.c:1124
#17 0x0000000000a93b87 in ExecutorEnd (queryDesc=0xaa4b2d0) at execMain.c:1062
#18 0x00000000009f1424 in PortalCleanup (portal=0x959e8f8) at portalcmds.c:352
#19 0x0000000000ff637b in AtAbort_Portals () at portalmem.c:862
#20 0x000000000080e9e2 in AbortTransaction () at xact.c:3477
#21 0x0000000000810179 in AbortCurrentTransaction () at xact.c:4157
#22 0x0000000000d9be83 in PostgresMain (argc=1, argv=0x94c5a00, dbname=0x94c5878 "postgres", username=0x94c5858 "pivotal") at postgres.c:4984
#23 0x0000000000cb917c in BackendRun (port=0x94bee90) at postmaster.c:4926
#24 0x0000000000cb84c2 in BackendStartup (port=0x94bee90) at postmaster.c:4611
#25 0x0000000000cb7159 in ServerLoop () at postmaster.c:1963
#26 0x0000000000cb45db in PostmasterMain (argc=7, argv=0x9494150) at postmaster.c:1589
#27 0x0000000000b49113 in main (argc=7, argv=0x9494150) at main.c:240
#28 0x00007f3007f75565 in __libc_start_main (main=0xb48d90 <main>, argc=7, argv=0x7ffc2a889f78, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>, stack_end=0x7ffc2a889f68) at ../csu/libc-start.c:332
#29 0x00000000006e753e in _start ()

RCA

execMotionUnsortedReceiver() gets called after the interconnect has been torn down. See mppExecutorFinishup().

Plan:

postgres=# explain (costs off) with cte as (select * from t1 where random() < 0.1 limit 10) select a, 1 , 1 from cte join t2 using (a);
                               QUERY PLAN                               
------------------------------------------------------------------------
 Sequence
   ->  Shared Scan (share slice:id 0:0)
         ->  Limit
               ->  Gather Motion 3:1  (slice1; segments: 3)
                     ->  Seq Scan on t1
                           Filter: (random() < '0.1'::double precision)
   ->  Result
         ->  Gather Motion 3:1  (slice2; segments: 3)
               ->  Hash Join
                     Hash Cond: (share0_ref2.a = t2.a)
                     ->  Redistribute Motion 1:3  (slice3)
                           Hash Key: share0_ref2.a
                           ->  Result
                                 ->  Shared Scan (share slice:id 3:0)
                     ->  Hash
                           ->  Seq Scan on t2
 Optimizer: Pivotal Optimizer (GPORCA)
(17 rows)

Notice that there is a Gather Motion below the Shared Scan.

Solutions

  1. First call ExecSquelchNode() on the QD, then tear down the interconnect => deadlock
  2. Add special handling in ExecMotionXXX for estate->interconnect_context == NULL => this is our intern's fix, and the one we went with (sketched after this list)
  3. Don't call ExecProcNode() during ExecSquelchShareInputScan() => leads to wrong results
  4. Don't call ExecProcNode() during ExecSquelchShareInputScan() if we are in the process of aborting the transaction => seems hacky
  5. Don't call ExecSquelchNode() to clean up resources in mppExecutorFinishup() => needs more thought
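
A rough, compilable sketch of idea 2 follows, using toy stand-in types (the type, field, and function names are simplified and hypothetical; the actual patch is in the PRs linked below). The point is only that the motion receiver treats a torn-down interconnect as end-of-stream instead of tripping the Assert seen in the stack trace.

/*
 * motion_null_check_sketch.c -- toy sketch of solution 2 (hypothetical,
 * simplified types; see the linked PRs for the real change): if the
 * interconnect context is already gone, the motion receiver reports
 * end-of-stream instead of asserting.
 */
#include <stddef.h>
#include <stdio.h>

/* Toy stand-ins for the executor structs involved (not the real definitions). */
typedef struct TupleTableSlot { int dummy; } TupleTableSlot;
typedef struct EState      { void *interconnect_context; } EState;
typedef struct PlanState   { EState *state; } PlanState;
typedef struct MotionState { PlanState ps; } MotionState;

static TupleTableSlot *
motion_unsorted_receiver(MotionState *node)
{
    EState *estate = node->ps.state;

    /*
     * Interconnect already torn down (e.g. we are squelching while the
     * transaction aborts): nothing can be received, report end-of-stream.
     */
    if (estate->interconnect_context == NULL)
        return NULL;

    /* ... the normal receive path would go here ... */
    return NULL;
}

int main(void)
{
    EState      estate = { .interconnect_context = NULL };
    MotionState motion = { .ps = { .state = &estate } };

    if (motion_unsorted_receiver(&motion) == NULL)
        printf("interconnect is gone: treated as end-of-stream, no crash\n");
    return 0;
}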

Current fix:
7X: https://github.com/greenplum-db/gpdb/pull/12550
6X: https://github.com/greenplum-db/gpdb/pull/12614

What's special about Squelching Share Input Scan?

Share Input Scan is the only executor node that calls ExecProcNode() as part of its ExecSquelchNode() operation (i.e., in ExecSquelchShareInputScan()). Why?

See ExecSquelchShareInputScan() and commit 9fbd2da
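
As a toy model (names are illustrative, not the real nodeShareInputScan.c), the decision being made during squelch looks roughly like the sketch below: a cross-slice producer that has not yet materialized its result must keep pulling from its child, via init_tuplestore_state()/ExecProcNode(), because consumers in other slices are still waiting on the shared tuplestore; going quiet early would leave them blocked forever.

/*
 * squelch_sketch.c -- toy model of the squelch decision (field and function
 * names are illustrative, not the real ones).
 */
#include <stdbool.h>
#include <stdio.h>

struct share_state
{
    bool cross_slice;   /* do consumers live in other slices/processes? */
    bool ready;         /* has the shared tuplestore been fully written? */
};

/* Stand-in for init_tuplestore_state(): drives the child plan to completion
 * (this is where ExecProcNode() gets called) and marks the share ready. */
static void materialize(struct share_state *s)
{
    printf("running the child plan to completion before going quiet\n");
    s->ready = true;
}

/* Stand-in for ExecSquelchShareInputScan(): even though this slice no longer
 * wants rows, a cross-slice producer must finish materializing first. */
static void squelch_share_input_scan(struct share_state *s)
{
    if (s->cross_slice && !s->ready)
        materialize(s);
    else
        printf("nothing to do: same-slice share, or already materialized\n");
}

int main(void)
{
    struct share_state s = { .cross_slice = true, .ready = false };

    squelch_share_input_scan(&s);
    return 0;
}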

Example query

DROP TABLE IF EXISTS foo; DROP TABLE IF EXISTS bar; DROP TABLE IF EXISTS jazz;

CREATE TABLE foo (a int, b int);
CREATE TABLE bar (c int, d int);
CREATE TABLE jazz(e int, f int);

INSERT INTO foo values (1, 2);
INSERT INTO bar SELECT i, i from generate_series(1, 100)i;
INSERT INTO jazz VALUES (2, 2), (3, 3);

ANALYZE foo;
ANALYZE bar;
ANALYZE jazz;

SET optimizer_join_order = 'query';

SELECT * FROM (
  WITH cte AS (SELECT * FROM foo)
  SELECT * FROM (SELECT * FROM cte UNION ALL SELECT * FROM cte) AS X
  JOIN bar ON b = c
) AS XY JOIN jazz on c = e AND b = f;

RESET optimizer_join_order;

Data distribution:

postgres=# select gp_segment_id, * from foo;
 gp_segment_id | a | b 
---------------+---+---
             1 | 1 | 2
(1 row)

-- bar has tuples on every segment.

postgres=# select gp_segment_id, * from jazz;
 gp_segment_id | e | f 
---------------+---+---
             0 | 2 | 2
             0 | 3 | 3
(2 rows)

Plan:

                               QUERY PLAN                               
------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Hash Join
         Hash Cond: (bar.c = jazz.e)
         ->  Sequence
               ->  Shared Scan (share slice:id 1:0)
                     ->  Seq Scan on foo
               ->  Hash Join
                     Hash Cond: (share0_ref2.b = bar.c)
                     ->  Redistribute Motion 3:3  (slice2; segments: 3)
                           Hash Key: share0_ref2.b
                           ->  Append
                                 ->  Shared Scan (share slice:id 2:0)
                                 ->  Shared Scan (share slice:id 2:0)
                     ->  Hash
                           ->  Seq Scan on bar
         ->  Hash
               ->  Seq Scan on jazz
                     Filter: (e = f)
 Optimizer: Pivotal Optimizer (GPORCA)
(19 rows)

Without this commit, this query hangs on 6X but not on 7X. It does not hang on 7X because, by the time the producer slice reaches ExecSquelchShareInputScan(), local_state->ready has already been set by a previous call to init_tuplestore_state() via ExecShareInputScan().

A case that leads to wrong results without commit 9fbd2da (now added to the shared_scan test):

https://github.com/greenplum-db/gpdb/pull/12550#discussion_r711195354

Just in case, a recap on slices and interconnect

Greenplum supports three interconnect protocols: TCP, UDP, and Proxy (if enable_ic_proxy is configured). UDP is the default. The protocol can be chosen with the GUC gp_interconnect_type.

Relevant functions:

On QD:

InitSliceTable() -- How slices are set up,
                 -- especially the parent-child relations between them.

CdbDispatchPlan()
    -> cdbdisp_dispatchX()
        -> AssignGangs() -- Finishes constructing gangs
            -> AssignWriterGangFirst()
            -> InventorySliceTree()
        -> fillSliceVector() -- Decides the dispatch order
        -> cdbdisp_dispatchToGang() -- Dispatches the SQL command over the libpq
                                    -- connections (established during cdb_setup())

On QD and all QE segments:

SetupInterconnect()

Called by standard_ExecutorStart() and ExecSetParamPlan(). The former is the usual ExecutorStart entry point (on both QD and QEs); the latter is used only for the planner's InitPlans/SubPlans.

The main purpose of this function is for the QD and each QE process to set up the TCP/UDP connection(s) between itself and its sender (child) processes (corresponding to the receiving Motions in this slice) and its receiver (parent) process (corresponding to the sending Motion of this slice).

createChunkTransportState()

This function creates a ChunkTransportStateEntry, which stores all the information for a given Motion node.

This is a good function to break on to observe how each connection is set up in a QD or QE process. Since each process only works on one slice, this function should be called at most N + 1 times per process (once for each of the N sender/child slices, plus once for the receiver/parent slice).

Question: Can a slice have multiple parent/receiving slices?

Note: a ChunkTransportStateEntry stores the information for a given Motion, and each Motion can have multiple MotionConns. For example, in a 3-segment cluster, numConns for a Broadcast Motion would be 3. The conns array is stored in the ChunkTransportStateEntry; a toy sketch of this layout follows.
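
The sketch below is a toy layout of that relationship (field names are simplified and partly hypothetical, and the real code allocates the conns array dynamically based on the gang size): one ChunkTransportStateEntry per Motion node, holding an array of per-peer MotionConns.

/*
 * transport_entry_sketch.c -- toy model of one ChunkTransportStateEntry per
 * Motion node with an array of per-peer MotionConns (names simplified; not
 * the real struct definitions).
 */
#include <stdio.h>

#define MAX_CONNS 8     /* the real code sizes this from the gang, not a constant */

typedef struct MotionConn
{
    int peer_segment;   /* which QE/QD process this connection talks to */
    int sockfd;         /* placeholder for the TCP/UDP socket state */
} MotionConn;

typedef struct ChunkTransportStateEntry
{
    int        motNodeId;           /* which Motion node this entry belongs to */
    int        numConns;            /* e.g. 3 for a Broadcast Motion on 3 segments */
    MotionConn conns[MAX_CONNS];    /* per-peer connections for this Motion */
} ChunkTransportStateEntry;

int main(void)
{
    ChunkTransportStateEntry entry = { .motNodeId = 1, .numConns = 3 };

    for (int i = 0; i < entry.numConns; i++)
        entry.conns[i].peer_segment = i;
    printf("Motion %d has %d connections\n", entry.motNodeId, entry.numConns);
    return 0;
}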

Toy query:

create table foo (a int, b int);
create table bar (c int, d int);
select * from foo join bar on b = c;

Break on createChunkTransportState() in the QD process and in an arbitrary QE process, and observe sendSlice and recvSlice.

postgres=# explain (costs off) select * from foo join bar on b = c;
                         QUERY PLAN                         
------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Hash Join
         Hash Cond: (foo.b = bar.c)
         ->  Redistribute Motion 3:3  (slice2; segments: 3)
               Hash Key: foo.b
               ->  Seq Scan on foo
         ->  Hash
               ->  Seq Scan on bar
 Optimizer: Pivotal Optimizer (GPORCA)
(9 rows)

TeardownInterconnect()

The counterpart of SetupInterconnect(). Called from mppExecutorFinishup(), ExecSetParamPlan(), mppExecutorCleanup(), and other cleanup paths.

/* The TeardownInterconnect() function should be called at the end of executing
 * a DML statement to close down all socket resources that were setup during
 * SetupInterconnect().
 *
 * NOTE: it is important that TeardownInterconnect() happens
 *		 regardless of the outcome of the statement. i.e. gets called
 *		 even if an ERROR occurs during the statement. For abnormal
 *		 statement termination we can force an end-of-stream notification.
 *
 */

removeChunkTransportState()

The counterpart of createChunkTransportState()

Thanks

@Zhenghua Lyu
@Shreedhar Hardikar
@Adam Lee
@Ekta Khanna

Find us on #gp-qpa