PR: https://github.com/greenplum-db/gpdb/pull/12550
-- setup tables
drop table if exists t1;
create table t1 (a int, b int);
drop table if exists t2;
create table t2 (a int);
-- create a function which returns one more column than expected.
CREATE OR REPLACE FUNCTION ko() RETURNS TABLE (field1 int, field2 int)
LANGUAGE 'plpgsql' VOLATILE STRICT AS
$$
DECLARE
v_qry text;
BEGIN
v_qry := 'with cte as (select * from t1 where random() < 0.1 limit 10) select a, 1 , 1 from cte join t2 using (a)';
return query execute v_qry;
END
$$;
select ko();
Crash:
postgres=# select ko();
ERROR: structure of query does not match function result type
DETAIL: Number of returned columns (3) does not match expected column count (2).
CONTEXT: PL/pgSQL function ko() line 6 at RETURN QUERY
FATAL: Unexpected internal error (assert.c:48)
DETAIL: FailedAssertion("!(node->ps.state->interconnect_context)", File: "nodeMotion.c", Line: 317)
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
The connection to the server was lost. Attempting reset: Failed.
Stack trace on QD:
#0 execMotionUnsortedReceiver (node=0x96d5060) at nodeMotion.c:317
#1 0x0000000000b0421c in ExecMotion (pstate=0x96d5060) at nodeMotion.c:152
#2 0x0000000000aa0099 in ExecProcNodeGPDB (node=0x96d5060) at execProcnode.c:599
#3 0x0000000000a9f3d1 in ExecProcNodeFirst (node=0x96d5060) at execProcnode.c:560
#4 0x0000000000ad6312 in ExecProcNode (node=0x96d5060) at ../../../src/include/executor/executor.h:268
#5 0x0000000000ad5dd2 in ExecLimit_guts (pstate=0x96d4d50) at nodeLimit.c:98
#6 0x0000000000ad58f5 in ExecLimit (node=0x96d4d50) at nodeLimit.c:246
#7 0x0000000000aa0099 in ExecProcNodeGPDB (node=0x96d4d50) at execProcnode.c:599
#8 0x0000000000a9f3d1 in ExecProcNodeFirst (node=0x96d4d50) at execProcnode.c:560
#9 0x0000000000b07512 in ExecProcNode (node=0x96d4d50) at ../../../src/include/executor/executor.h:268
#10 0x0000000000b06987 in init_tuplestore_state (node=0x96d4b38) at nodeShareInputScan.c:288
#11 0x0000000000b070ca in ExecSquelchShareInputScan (node=0x96d4b38) at nodeShareInputScan.c:633
#12 0x0000000000a7a138 in ExecSquelchNode (node=0x96d4b38) at execAmi.c:804
#13 0x0000000000b0977e in ExecSquelchSequence (node=0x96d49f8) at nodeSequence.c:166
#14 0x0000000000a7a048 in ExecSquelchNode (node=0x96d49f8) at execAmi.c:712
#15 0x0000000000aac890 in mppExecutorFinishup (queryDesc=0xaa4b2d0) at execUtils.c:1667
#16 0x0000000000a93dda in standard_ExecutorEnd (queryDesc=0xaa4b2d0) at execMain.c:1124
#17 0x0000000000a93b87 in ExecutorEnd (queryDesc=0xaa4b2d0) at execMain.c:1062
#18 0x00000000009f1424 in PortalCleanup (portal=0x959e8f8) at portalcmds.c:352
#19 0x0000000000ff637b in AtAbort_Portals () at portalmem.c:862
#20 0x000000000080e9e2 in AbortTransaction () at xact.c:3477
#21 0x0000000000810179 in AbortCurrentTransaction () at xact.c:4157
#22 0x0000000000d9be83 in PostgresMain (argc=1, argv=0x94c5a00, dbname=0x94c5878 "postgres", username=0x94c5858 "pivotal") at postgres.c:4984
#23 0x0000000000cb917c in BackendRun (port=0x94bee90) at postmaster.c:4926
#24 0x0000000000cb84c2 in BackendStartup (port=0x94bee90) at postmaster.c:4611
#25 0x0000000000cb7159 in ServerLoop () at postmaster.c:1963
#26 0x0000000000cb45db in PostmasterMain (argc=7, argv=0x9494150) at postmaster.c:1589
#27 0x0000000000b49113 in main (argc=7, argv=0x9494150) at main.c:240
#28 0x00007f3007f75565 in __libc_start_main (main=0xb48d90 <main>, argc=7, argv=0x7ffc2a889f78, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>, stack_end=0x7ffc2a889f68) at ../csu/libc-start.c:332
#29 0x00000000006e753e in _start ()
execMotionUnsortedReceiver() gets called after the interconnect has already been torn down; see mppExecutorFinishup().
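As a rough standalone model (not GPDB code; the struct and function names below are simplified stand-ins), the ordering problem looks like this: the interconnect context is released first, and a Motion node is still driven afterwards, tripping the same kind of assertion as nodeMotion.c:317.
/*
 * Simplified, hypothetical model of the ordering bug. NOT GPDB code.
 */
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct EStateModel
{
	void	   *interconnect_context;	/* NULL once the interconnect is torn down */
} EStateModel;

/* models TeardownInterconnect(): releases resources and clears the pointer */
static void
teardown_interconnect(EStateModel *estate)
{
	free(estate->interconnect_context);
	estate->interconnect_context = NULL;
}

/* models execMotionUnsortedReceiver(): must only run while the interconnect is alive */
static void
motion_receive(EStateModel *estate)
{
	assert(estate->interconnect_context != NULL);
	printf("receiving tuples over the interconnect\n");
}

int
main(void)
{
	EStateModel estate = {malloc(1)};

	motion_receive(&estate);		/* ok: interconnect still set up */
	teardown_interconnect(&estate);	/* the error/abort path tears it down */
	motion_receive(&estate);		/* same ordering as the stack above: assertion fails */
	return 0;
}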
Plan:
postgres=# explain (costs off) with cte as (select * from t1 where random() < 0.1 limit 10) select a, 1 , 1 from cte join t2 using (a);
QUERY PLAN
------------------------------------------------------------------------
Sequence
-> Shared Scan (share slice:id 0:0)
-> Limit
-> Gather Motion 3:1 (slice1; segments: 3)
-> Seq Scan on t1
Filter: (random() < '0.1'::double precision)
-> Result
-> Gather Motion 3:1 (slice2; segments: 3)
-> Hash Join
Hash Cond: (share0_ref2.a = t2.a)
-> Redistribute Motion 1:3 (slice3)
Hash Key: share0_ref2.a
-> Result
-> Shared Scan (share slice:id 3:0)
-> Hash
-> Seq Scan on t2
Optimizer: Pivotal Optimizer (GPORCA)
(17 rows)
Notice that there is a Gather Motion below the Shared Scan. See ExecSquelchShareInputScan() and commit 9fbd2da.
DROP TABLE IF EXISTS foo;
DROP TABLE IF EXISTS bar;
DROP TABLE IF EXISTS jazz;
CREATE TABLE foo (a int, b int);
CREATE TABLE bar (c int, d int);
CREATE TABLE jazz(e int, f int);
INSERT INTO foo values (1, 2);
INSERT INTO bar SELECT i, i from generate_series(1, 100)i;
INSERT INTO jazz VALUES (2, 2), (3, 3);
ANALYZE foo;
ANALYZE bar;
ANALYZE jazz;
SET optimizer_join_order = 'query';
SELECT * FROM
(
WITH cte AS (SELECT * FROM foo)
SELECT * FROM (SELECT * FROM cte UNION ALL SELECT * FROM cte)
AS X
JOIN bar ON b = c
) AS XY
JOIN jazz on c = e AND b = f;
WITH cte AS (SELECT * FROM foo)
SELECT * FROM (SELECT * FROM cte UNION ALL SELECT * FROM cte)
AS X
JOIN bar ON b = c;
RESET optimizer_join_order;
postgres=# select gp_segment_id, * from bar;
gp_segment_id | c | d
---------------+---+---
0 | 2 | 2
0 | 3 | 3
1 | 1 | 1
(3 rows)
Plan:
7X:
QUERY PLAN
------------------------------------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
-> Hash Join
Hash Cond: (bar.c = jazz.e)
-> Sequence
-> Shared Scan (share slice:id 1:0)
-> Seq Scan on foo
-> Hash Join
Hash Cond: (share0_ref2.b = bar.c)
-> Redistribute Motion 3:3 (slice2; segments: 3)
Hash Key: share0_ref2.b
-> Append
-> Shared Scan (share slice:id 2:0)
-> Shared Scan (share slice:id 2:0)
-> Hash
-> Seq Scan on bar
-> Hash
-> Seq Scan on jazz
Filter: (e = f)
Optimizer: Pivotal Optimizer (GPORCA)
(19 rows)
6X:
QUERY PLAN
------------------------------------------------------------------------
Gather Motion 3:1 (slice2; segments: 3)
-> Hash Join
Hash Cond: (bar.c = jazz.e)
-> Sequence
-> Shared Scan (share slice:id 2:0)
-> Materialize
-> Seq Scan on foo
-> Hash Join
Hash Cond: (share0_ref2.b = bar.c)
-> Redistribute Motion 3:3 (slice1; segments: 3)
Hash Key: share0_ref2.b
-> Append
-> Shared Scan (share slice:id 1:0)
-> Shared Scan (share slice:id 1:0)
-> Hash
-> Seq Scan on bar
-> Hash
-> Seq Scan on jazz
Filter: (e = f)
Optimizer: Pivotal Optimizer (GPORCA)
(20 rows)
Without this commit, this query hangs on 6X but not on 7X. It does not hang on 7X because, by the time the producer slice reaches ExecSquelchShareInputScan(), local_state->ready has already been set by a previous call to init_tuplestore_state() via ExecShareInputScan().
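A minimal standalone sketch of that control flow, with simplified stand-in names (not the real GPDB code; only the ready-flag behaviour follows the stack trace and the note above):
#include <stdbool.h>
#include <stdio.h>

typedef struct ShareInputLocalStateModel
{
	bool		ready;			/* has the shared tuplestore been materialized? */
} ShareInputLocalStateModel;

/* models init_tuplestore_state(): drives the child subtree (possibly a Motion) */
static void
init_tuplestore_state_model(ShareInputLocalStateModel *state)
{
	printf("pulling the child subtree to fill the shared tuplestore\n");
	state->ready = true;
}

/*
 * models the producer-side part of ExecSquelchShareInputScan(): if the shared
 * tuplestore is not ready yet, it still has to be materialized so consumers
 * in other slices are not left waiting
 */
static void
squelch_share_input_scan_model(ShareInputLocalStateModel *state)
{
	if (!state->ready)
		init_tuplestore_state_model(state);	/* the path taken in the hang/crash cases */
	else
		printf("already materialized; nothing left to pull\n");	/* the 7X case */
}

int
main(void)
{
	ShareInputLocalStateModel state = {.ready = false};

	/* 7X: ExecShareInputScan() has already materialized the store ... */
	init_tuplestore_state_model(&state);
	/* ... so the later squelch does not re-drive the Motion underneath */
	squelch_share_input_scan_model(&state);
	return 0;
}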
A case that leads to wrong results without commit 9fbd2da (now added to the shared_scan test):
https://github.com/greenplum-db/gpdb/pull/12550#discussion_r711195354
Greenplum supports three interconnect protocols: TCP, UDP (the default), and Proxy (if enable_ic_proxy is configured). The protocol is selected with the GUC gp_interconnect_type.
Relevant functions:
On QD:
InitSliceTable() -- How slices are set up, especially the parent-child relations.
CdbDispatchPlan()
-> cdbdisp_dispatchX()
-> AssignGangs() -- Finishes constructing gangs
-> AssignWriterGangFirst()
-> InventorySliceTree()
-> fillSliceVector() - Decides dispatch order
-> cdbdisp_dispatchToGang() -- Dispatches the SQL command over libpq connections (established during cdb_setup())
On QD and all QE segments:
SetupInterconnect() is called by standard_ExecutorStart() and ExecSetParamPlan(). The former is the usual entry point of ExecutorStart (on both QD and QE); the latter is used only for the planner's InitPlan/SubPlan.
Its main purpose is for the QD and each QE process to set up the TCP/UDP connection(s) between itself and its sender (child) processes (corresponding to the receiving Motions of this slice) and its receiver (parent) process (corresponding to the sending Motion of this slice).
createChunkTransportState() creates a ChunkTransportStateEntry, which stores all the information for a given Motion node. It is a good function to break on to observe how each connection is set up in a QD or QE process. Since each process works on only one slice, this function should be called at most N + 1 times per process (N sender/child slices plus 1 receiver/parent slice).
Question: Can a slice have multiple parent/receiving slices?
Note: a ChunkTransportStateEntry stores the information for a given Motion, and each Motion can have multiple MotionConns. For example, in a 3-segment cluster, the numConns of a Broadcast Motion would be 3. The conns array is stored in the ChunkTransportStateEntry.
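A minimal standalone sketch of that layout, with simplified stand-in definitions (not the real GPDB structs):
#include <stdio.h>
#include <stdlib.h>

typedef struct MotionConnModel
{
	int			peer_segment;	/* which peer process this connection talks to */
	int			sockfd;			/* placeholder for the TCP/UDP socket */
} MotionConnModel;

typedef struct ChunkTransportStateEntryModel
{
	int			motNodeId;		/* the Motion node this entry belongs to */
	int			numConns;		/* number of peer connections */
	MotionConnModel *conns;		/* the conns array mentioned above */
} ChunkTransportStateEntryModel;

/* loosely models createChunkTransportState(): one entry per Motion node */
static ChunkTransportStateEntryModel *
create_entry_model(int motNodeId, int numConns)
{
	ChunkTransportStateEntryModel *entry = malloc(sizeof(*entry));

	entry->motNodeId = motNodeId;
	entry->numConns = numConns;
	entry->conns = calloc(numConns, sizeof(MotionConnModel));
	for (int i = 0; i < numConns; i++)
		entry->conns[i].peer_segment = i;
	return entry;
}

int
main(void)
{
	/* e.g. a Broadcast Motion in a 3-segment cluster: numConns == 3 */
	ChunkTransportStateEntryModel *entry = create_entry_model(1, 3);

	printf("motion %d has %d connections\n", entry->motNodeId, entry->numConns);
	free(entry->conns);
	free(entry);
	return 0;
}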
Toy query:
create table foo (a int, b int);
create table bar (c int, d int);
select * from foo join bar on b = c;
Break on createChunkTransportState() in the QD process and an arbitrary QE process and observe sendSlice and recvSlice.
postgres=# explain (costs off) select * from foo join bar on b = c;
QUERY PLAN
------------------------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
-> Hash Join
Hash Cond: (foo.b = bar.c)
-> Redistribute Motion 3:3 (slice2; segments: 3)
Hash Key: foo.b
-> Seq Scan on foo
-> Hash
-> Seq Scan on bar
Optimizer: Pivotal Optimizer (GPORCA)
(9 rows)
TeardownInterconnect() is the counterpart of SetupInterconnect(). It is called at mppExecutorFinishup(), ExecSetParamPlan(), mppExecutorCleanup(), and other cleanup paths.
/* The TeardownInterconnect() function should be called at the end of executing
* a DML statement to close down all socket resources that were setup during
* SetupInterconnect().
*
* NOTE: it is important that TeardownInterconnect() happens
* regardless of the outcome of the statement. i.e. gets called
* even if an ERROR occurs during the statement. For abnormal
* statement termination we can force an end-of-stream notification.
*
*/
The counterpart of createChunkTransportState()