From 433247e00e13a33bf3dba5e8e269bb005884da37 Mon Sep 17 00:00:00 2001 From: Alena Rybakina Date: Mon, 1 Jun 2026 05:18:15 +0300 Subject: [PATCH] Generate a native ORCA plan for a replicated CTE in scalar subqueries When a CTE over a DISTRIBUTED REPLICATED table is referenced from several scalar subqueries, ORCA puts the SharedScan Producer and Consumer on different slices. That cross-slice SharedScan used to hang. Until now we just avoided the hang: FHasCrossSliceReplicatedCTEConsumer detected this shape before DXL translation and fell back to the Postgres planner. This change lets ORCA handle the scalar-subquery case natively, so no fallback is needed: the replicated CTE is materialized once and shared by all references inside ORCA's own plan. The fix is in apply_shareinput_xslice (src/backend/cdb/cdbmutate.c). When a cross-slice Consumer is found inside a SubPlan and the CTE source is a replicated table, the Consumer gets its own local copy of the Producer's subtree (Materialize + base Scan) with a fresh share_id, marked SHARE_MATERIAL and placed in the Consumer's slice. The table is replicated, so every segment already has the full data and the local copy is equivalent -- this removes the cross-slice coordination. Other Consumers of the same CTE in the same slice reuse this copy (tracked by (orig_share_id, motId) -> new_share_id), so the CTE is materialized once and read by all references. cleanup_orphaned_producers then drops the original Producers that are no longer used. The reuse map and consumer counts live in new ApplyShareInputContext fields in src/include/nodes/relation.h. The pre-DXL fallback check (CUtils::FHasCrossSliceReplicatedCTEConsumer) was too broad -- it fired for every cross-slice replicated CTE Consumer. Narrow it to the join case only: a CTE Consumer under a duplicate-hazard / broadcast Motion (greengage 51fe92e), which still can't be handled natively. 
The scalar-subquery case no longer matches here and reaches the new materialization path instead. --- src/backend/cdb/cdbmutate.c | 513 ++++++++++++++++-- .../gporca/libgpopt/src/base/CUtils.cpp | 154 +++--- src/include/nodes/relation.h | 24 + src/test/regress/expected/shared_scan.out | 44 +- .../expected/shared_scan_optimizer.out | 46 +- src/test/regress/sql/shared_scan.sql | 20 +- 6 files changed, 676 insertions(+), 125 deletions(-) diff --git a/src/backend/cdb/cdbmutate.c b/src/backend/cdb/cdbmutate.c index 9fb644791c6..d3c3d2b7137 100644 --- a/src/backend/cdb/cdbmutate.c +++ b/src/backend/cdb/cdbmutate.c @@ -2437,6 +2437,12 @@ shareinput_mutator_xslice_1(Node *node, PlannerInfo *root, bool fPop) ctxt->producers[sisc->share_id] = sisc; ctxt->sliceMarks[sisc->share_id] = motId; } + else + { + /* Consumer: count references to original producers */ + if (sisc->share_id < ctxt->orig_producer_count) + ctxt->consumer_counts[sisc->share_id]++; + } } return true; @@ -2487,13 +2493,170 @@ shareinput_mutator_xslice_2(Node *node, PlannerInfo *root, bool fPop) if (shareSliceId != motId) { - ShareType stype = get_plan_share_type(plan_slicemark.plan); + /* + * Check for cross-slice SharedScan with a replicated + * table inside a SubPlan. When the producer runs on + * fewer segments than the consumer, the temp file does + * not exist on all consumer segments. + * + * Fix: give this consumer its own copy of the + * producer's underlying plan (Materialize + Scan) and + * convert it to an intra-slice SHARE_MATERIAL with a + * new share_id. The consumer then materializes data + * locally instead of reading cross-slice temp files. + */ + bool inlined = false; + + if (ctxt->walking_subplan) + { + int origShareId = sisc->share_id; + int existingNewId = -1; + int k; + + /* + * Check if we already inlined a consumer for this + * same original share_id in this same slice. If so, + * make this consumer a reader of the existing inlined + * producer instead of creating another copy. 
+ */ + for (k = 0; k < ctxt->inlined_count; k++) + { + if (ctxt->inlined_orig_ids[k] == origShareId && + ctxt->inlined_mot_ids[k] == motId) + { + existingNewId = ctxt->inlined_new_ids[k]; + break; + } + } + + if (existingNewId >= 0) + { + /* + * Reuse the already-inlined producer: make this + * consumer a reader with the same share_id. + * This is intra-slice sharing, so no need to + * increment nsharer_xslice. + */ + if (origShareId < ctxt->orig_producer_count) + ctxt->consumer_counts[origShareId]--; + + sisc->share_id = existingNewId; + sisc->share_type = SHARE_MATERIAL; + sisc->driver_slice = motId; + + inlined = true; + } + else + { + ShareInputScan *producer = ctxt->producers[origShareId]; + Plan *producerChild = producer->scan.plan.lefttree; + Plan *leaf = producerChild; + + while (leaf && leaf->lefttree) + leaf = leaf->lefttree; + + /* + * Any scan over a base relation shares the Scan + * struct layout (scanrelid at a fixed offset): + * SeqScan, IndexScan, IndexOnlyScan, BitmapHeapScan, + * TidScan, SampleScan, and the Dynamic* variants. + */ + if (leaf && + (IsA(leaf, SeqScan) || + IsA(leaf, IndexScan) || + IsA(leaf, IndexOnlyScan) || + IsA(leaf, BitmapHeapScan) || + IsA(leaf, TidScan) || + IsA(leaf, DynamicSeqScan) || + IsA(leaf, DynamicIndexScan) || + IsA(leaf, DynamicBitmapHeapScan))) + { + Index scanrelid = ((Scan *) leaf)->scanrelid; + List *rtable = glob->finalrtable; - if (stype == SHARE_MATERIAL || stype == SHARE_SORT) - set_plan_share_type_xslice(plan_slicemark.plan); + if (scanrelid > 0 && + scanrelid <= (Index) list_length(rtable)) + { + RangeTblEntry *rte = rt_fetch(scanrelid, rtable); + + if (rte->rtekind == RTE_RELATION) + { + GpPolicy *policy = GpPolicyFetch(rte->relid); + + if (policy && + policy->ptype == POLICYTYPE_REPLICATED && + producerChild) + { + Plan *newChild; + int newShareId; + + pfree(policy); + + /* + * Deep copy the producer's subtree and + * assign a fresh share_id so there is no + * conflict with the original producer. 
+ */ + if (origShareId < ctxt->orig_producer_count) + ctxt->consumer_counts[origShareId]--; + + newChild = (Plan *) copyObject(producerChild); + newShareId = ctxt->producer_count; + ctxt->producer_count++; + + set_plan_share_id(newChild, newShareId); + sisc->share_id = newShareId; + sisc->share_type = SHARE_MATERIAL; + sisc->driver_slice = motId; + plan->lefttree = newChild; + + /* + * Register the new producer so later + * passes can find it. + */ + ctxt->producers = repalloc(ctxt->producers, + ctxt->producer_count * sizeof(ShareInputScan *)); + ctxt->producers[newShareId] = sisc; + ctxt->sliceMarks = repalloc(ctxt->sliceMarks, + ctxt->producer_count * sizeof(int)); + ctxt->sliceMarks[newShareId] = motId; + + /* + * Record the mapping so subsequent consumers + * of the same CTE in this slice can reuse + * this inlined producer. + */ + ctxt->inlined_count++; + ctxt->inlined_orig_ids = repalloc(ctxt->inlined_orig_ids, + ctxt->inlined_count * sizeof(int)); + ctxt->inlined_mot_ids = repalloc(ctxt->inlined_mot_ids, + ctxt->inlined_count * sizeof(int)); + ctxt->inlined_new_ids = repalloc(ctxt->inlined_new_ids, + ctxt->inlined_count * sizeof(int)); + ctxt->inlined_orig_ids[ctxt->inlined_count - 1] = origShareId; + ctxt->inlined_mot_ids[ctxt->inlined_count - 1] = motId; + ctxt->inlined_new_ids[ctxt->inlined_count - 1] = newShareId; + + inlined = true; + } + else if (policy) + pfree(policy); + } + } + } + } + } + + if (!inlined) + { + ShareType stype = get_plan_share_type(plan_slicemark.plan); - incr_plan_nsharer_xslice(plan_slicemark.plan); - sisc->driver_slice = motId; + if (stype == SHARE_MATERIAL || stype == SHARE_SORT) + set_plan_share_type_xslice(plan_slicemark.plan); + + incr_plan_nsharer_xslice(plan_slicemark.plan); + sisc->driver_slice = motId; + } } } } @@ -2611,22 +2774,320 @@ shareinput_mutator_xslice_4(Node *node, PlannerInfo *root, bool fPop) return true; } +/* + * record_subplan_motid_walker + * Walk expressions looking for SubPlan references. 
For each SubPlan found, + * record the current motId from the motion stack. This tells us what slice + * the SubPlan actually executes in (which is the caller's slice). + */ +static bool +record_subplan_motid_walker(Node *node, ApplyShareInputContext *ctxt) +{ + if (node == NULL) + return false; + + if (IsA(node, SubPlan)) + { + SubPlan *sp = (SubPlan *) node; + int motId = shareinput_peekmot(ctxt); + + if (sp->plan_id >= 1 && sp->plan_id <= ctxt->num_subplans) + ctxt->subplan_motids[sp->plan_id - 1] = motId; + + /* don't recurse into SubPlan's testexpr/args */ + return false; + } + + return expression_tree_walker(node, record_subplan_motid_walker, ctxt); +} + +/* + * shareinput_mutator_build_subplan_motids + * Pre-pass over the main plan tree to build a mapping from SubPlan plan_id + * to the motId (slice) where the SubPlan is referenced. + * + * SubPlan plan trees are walked separately by apply_shareinput_xslice, but + * without motion context from the main plan. This causes the xslice passes + * to incorrectly treat SharedScan consumers in SubPlans as being in slice 0 + * instead of their actual execution slice. By recording the correct motId + * here, we can push it before walking each subplan. + */ + static bool + shareinput_mutator_build_subplan_motids(Node *node, PlannerInfo *root, bool fPop) + { + PlannerGlobal *glob = root->glob; + ApplyShareInputContext *ctxt = &glob->share; + Plan *plan = (Plan *) node; + + if (fPop) + { + if (IsA(plan, Motion)) + shareinput_popmot(ctxt); + return false; + } + + if (IsA(plan, Motion)) + { + Motion *motion = (Motion *) plan; + + shareinput_pushmot(ctxt, motion->motionID); + return true; + } + + /* Scan common expression fields for SubPlan references */ + record_subplan_motid_walker((Node *) plan->targetlist, ctxt); + record_subplan_motid_walker((Node *) plan->qual, ctxt); + + /* + * Check additional node-type-specific expression fields where SubPlan + * references may appear. Modeled after finalize_plan() in subselect.c. 
+ */ + switch (nodeTag(plan)) + { + case T_Result: + record_subplan_motid_walker(((Result *) plan)->resconstantqual, ctxt); + break; + case T_IndexScan: + record_subplan_motid_walker((Node *) ((IndexScan *) plan)->indexqual, ctxt); + record_subplan_motid_walker((Node *) ((IndexScan *) plan)->indexorderby, ctxt); + break; + case T_IndexOnlyScan: + record_subplan_motid_walker((Node *) ((IndexOnlyScan *) plan)->indexqual, ctxt); + record_subplan_motid_walker((Node *) ((IndexOnlyScan *) plan)->indexorderby, ctxt); + break; + case T_BitmapIndexScan: + record_subplan_motid_walker((Node *) ((BitmapIndexScan *) plan)->indexqual, ctxt); + break; + case T_BitmapHeapScan: + record_subplan_motid_walker((Node *) ((BitmapHeapScan *) plan)->bitmapqualorig, ctxt); + break; + case T_TidScan: + record_subplan_motid_walker((Node *) ((TidScan *) plan)->tidquals, ctxt); + break; + case T_ForeignScan: + record_subplan_motid_walker((Node *) ((ForeignScan *) plan)->fdw_exprs, ctxt); + break; + case T_NestLoop: + record_subplan_motid_walker((Node *) ((Join *) plan)->joinqual, ctxt); + break; + case T_MergeJoin: + record_subplan_motid_walker((Node *) ((Join *) plan)->joinqual, ctxt); + record_subplan_motid_walker((Node *) ((MergeJoin *) plan)->mergeclauses, ctxt); + break; + case T_HashJoin: + record_subplan_motid_walker((Node *) ((Join *) plan)->joinqual, ctxt); + record_subplan_motid_walker((Node *) ((HashJoin *) plan)->hashclauses, ctxt); + record_subplan_motid_walker((Node *) ((HashJoin *) plan)->hashqualclauses, ctxt); + break; + case T_Limit: + record_subplan_motid_walker(((Limit *) plan)->limitOffset, ctxt); + record_subplan_motid_walker(((Limit *) plan)->limitCount, ctxt); + break; + case T_Motion: + record_subplan_motid_walker((Node *) ((Motion *) plan)->hashExprs, ctxt); + break; + case T_ModifyTable: + record_subplan_motid_walker((Node *) ((ModifyTable *) plan)->returningLists, ctxt); + break; + case T_ValuesScan: + record_subplan_motid_walker((Node *) ((ValuesScan *) 
plan)->values_lists, ctxt); + break; + case T_WindowAgg: + record_subplan_motid_walker(((WindowAgg *) plan)->startOffset, ctxt); + record_subplan_motid_walker(((WindowAgg *) plan)->endOffset, ctxt); + break; + case T_PartitionSelector: + record_subplan_motid_walker((Node *) ((PartitionSelector *) plan)->levelEqExpressions, ctxt); + record_subplan_motid_walker((Node *) ((PartitionSelector *) plan)->levelExpressions, ctxt); + record_subplan_motid_walker(((PartitionSelector *) plan)->residualPredicate, ctxt); + record_subplan_motid_walker(((PartitionSelector *) plan)->propagationExpression, ctxt); + record_subplan_motid_walker(((PartitionSelector *) plan)->printablePredicate, ctxt); + record_subplan_motid_walker((Node *) ((PartitionSelector *) plan)->partTabTargetlist, ctxt); + break; + default: + break; + } + + return true; + } + +/* + * shareinput_walk_subplans + * Walk all subplans using the given mutator function, pushing the correct + * motId for each subplan based on where it is referenced in the main plan. + */ +static void +shareinput_walk_subplans(SHAREINPUT_MUTATOR f, PlannerGlobal *glob) +{ + ApplyShareInputContext *ctxt = &glob->share; + ListCell *lp, *lr; + int i = 0; + + forboth(lp, glob->subplans, lr, glob->subroots) + { + Plan *subplan = (Plan *) lfirst(lp); + PlannerInfo *subroot = (PlannerInfo *) lfirst(lr); + + if (ctxt->subplan_motids != NULL && i < ctxt->num_subplans) + shareinput_pushmot(ctxt, ctxt->subplan_motids[i]); + + ctxt->walking_subplan = true; + shareinput_walker(f, (Node *) subplan, subroot); + ctxt->walking_subplan = false; + + if (ctxt->subplan_motids != NULL && i < ctxt->num_subplans) + shareinput_popmot(ctxt); + + i++; + } +} + +/* + * cleanup_orphaned_producers + * After inlining, some original producers may have zero remaining + * consumers. Remove them from Sequence nodes to eliminate unnecessary + * SeqScans. Collapse Sequence nodes that end up with a single child. 
+ */ +static Plan * +cleanup_orphaned_producers(Plan *plan, ApplyShareInputContext *ctxt) +{ + if (plan == NULL) + return NULL; + + if (IsA(plan, Sequence)) + { + Sequence *seq = (Sequence *) plan; + List *newplans = NIL; + ListCell *lc; + + foreach(lc, seq->subplans) + { + Plan *child = (Plan *) lfirst(lc); + + child = cleanup_orphaned_producers(child, ctxt); + + /* Skip orphaned SharedScan producers (those with lefttree) */ + if (IsA(child, ShareInputScan) && child->lefttree != NULL) + { + ShareInputScan *sisc = (ShareInputScan *) child; + + if (sisc->share_id < ctxt->orig_producer_count && + ctxt->consumer_counts[sisc->share_id] == 0) + continue; + } + + /* + * Flatten nested Sequences: if a child is itself a + * Sequence, splice its children into this level. + */ + if (IsA(child, Sequence)) + { + Sequence *inner = (Sequence *) child; + ListCell *lc2; + + foreach(lc2, inner->subplans) + newplans = lappend(newplans, lfirst(lc2)); + } + else + newplans = lappend(newplans, child); + } + + seq->subplans = newplans; + return plan; + } + + if (IsA(plan, Append)) + { + ListCell *lc; + + foreach(lc, ((Append *) plan)->appendplans) + lfirst(lc) = cleanup_orphaned_producers((Plan *) lfirst(lc), ctxt); + } + else if (IsA(plan, MergeAppend)) + { + ListCell *lc; + + foreach(lc, ((MergeAppend *) plan)->mergeplans) + lfirst(lc) = cleanup_orphaned_producers((Plan *) lfirst(lc), ctxt); + } + else if (IsA(plan, ModifyTable)) + { + ListCell *lc; + + foreach(lc, ((ModifyTable *) plan)->plans) + lfirst(lc) = cleanup_orphaned_producers((Plan *) lfirst(lc), ctxt); + } + else if (IsA(plan, BitmapAnd)) + { + ListCell *lc; + + foreach(lc, ((BitmapAnd *) plan)->bitmapplans) + lfirst(lc) = cleanup_orphaned_producers((Plan *) lfirst(lc), ctxt); + } + else if (IsA(plan, BitmapOr)) + { + ListCell *lc; + + foreach(lc, ((BitmapOr *) plan)->bitmapplans) + lfirst(lc) = cleanup_orphaned_producers((Plan *) lfirst(lc), ctxt); + } + else if (IsA(plan, SubqueryScan)) + { + SubqueryScan *sub = 
(SubqueryScan *) plan; + + sub->subplan = cleanup_orphaned_producers(sub->subplan, ctxt); + } + else + { + plan->lefttree = cleanup_orphaned_producers(plan->lefttree, ctxt); + plan->righttree = cleanup_orphaned_producers(plan->righttree, ctxt); + } + + return plan; +} + Plan * apply_shareinput_xslice(Plan *plan, PlannerInfo *root) { PlannerGlobal *glob = root->glob; ApplyShareInputContext *ctxt = &glob->share; - ListCell *lp, *lr; ctxt->motStack = NULL; ctxt->qdShares = NULL; ctxt->qdSlices = NULL; ctxt->nextPlanId = 0; + ctxt->walking_subplan = false; + + ctxt->inlined_orig_ids = palloc0(sizeof(int)); + ctxt->inlined_mot_ids = palloc0(sizeof(int)); + ctxt->inlined_new_ids = palloc0(sizeof(int)); + ctxt->inlined_count = 0; + + ctxt->orig_producer_count = ctxt->producer_count; + ctxt->consumer_counts = palloc0(ctxt->producer_count * sizeof(int)); ctxt->sliceMarks = palloc0(ctxt->producer_count * sizeof(int)); shareinput_pushmot(ctxt, 0); + /* + * Pre-pass: build a mapping from SubPlan plan_id to the motId (slice) + * where the SubPlan is referenced in the main plan. This is needed + * because the xslice passes walk subplan trees separately, and without + * this mapping they would use motId=0 for all subplan nodes. + */ + ctxt->num_subplans = list_length(glob->subplans); + if (ctxt->num_subplans > 0) + { + ctxt->subplan_motids = palloc0(ctxt->num_subplans * sizeof(int)); + shareinput_walker(shareinput_mutator_build_subplan_motids, + (Node *) plan, root); + } + else + { + ctxt->subplan_motids = NULL; + } + /* * Walk the tree. See comment for each pass for what each pass will do. * The context is used to carry information from one pass to another, as @@ -2639,43 +3100,27 @@ apply_shareinput_xslice(Plan *plan, PlannerInfo *root) * walk through all plans and collect all producer subplans into the * context, before processing the consumers. 
*/ - forboth(lp, glob->subplans, lr, glob->subroots) - { - Plan *subplan = (Plan *) lfirst(lp); - PlannerInfo *subroot = (PlannerInfo *) lfirst(lr); - - shareinput_walker(shareinput_mutator_xslice_1, (Node *) subplan, subroot); - } + shareinput_walk_subplans(shareinput_mutator_xslice_1, glob); shareinput_walker(shareinput_mutator_xslice_1, (Node *) plan, root); /* Now walk the tree again, and process all the consumers. */ - forboth(lp, glob->subplans, lr, glob->subroots) - { - Plan *subplan = (Plan *) lfirst(lp); - PlannerInfo *subroot = (PlannerInfo *) lfirst(lr); - - shareinput_walker(shareinput_mutator_xslice_2, (Node *) subplan, subroot); - } + shareinput_walk_subplans(shareinput_mutator_xslice_2, glob); shareinput_walker(shareinput_mutator_xslice_2, (Node *) plan, root); - forboth(lp, glob->subplans, lr, glob->subroots) - { - Plan *subplan = (Plan *) lfirst(lp); - PlannerInfo *subroot = (PlannerInfo *) lfirst(lr); - - shareinput_walker(shareinput_mutator_xslice_3, (Node *) subplan, subroot); - } + shareinput_walk_subplans(shareinput_mutator_xslice_3, glob); shareinput_walker(shareinput_mutator_xslice_3, (Node *) plan, root); - forboth(lp, glob->subplans, lr, glob->subroots) - { - Plan *subplan = (Plan *) lfirst(lp); - PlannerInfo *subroot = (PlannerInfo *) lfirst(lr); - - shareinput_walker(shareinput_mutator_xslice_4, (Node *) subplan, subroot); - } + shareinput_walk_subplans(shareinput_mutator_xslice_4, glob); shareinput_walker(shareinput_mutator_xslice_4, (Node *) plan, root); + /* + * Cleanup: remove orphaned SharedScan producers from Sequence nodes. + * After inlining, some original producers may have lost all consumers. + * Keeping them would cause unnecessary SeqScans at execution time. 
+ */ + if (ctxt->inlined_count > 0) + plan = cleanup_orphaned_producers(plan, ctxt); + return plan; } diff --git a/src/backend/gporca/libgpopt/src/base/CUtils.cpp b/src/backend/gporca/libgpopt/src/base/CUtils.cpp index d114a639449..8f5dc89ce7e 100644 --- a/src/backend/gporca/libgpopt/src/base/CUtils.cpp +++ b/src/backend/gporca/libgpopt/src/base/CUtils.cpp @@ -10,6 +10,8 @@ //--------------------------------------------------------------------------- #include "gpopt/base/CUtils.h" +#include "gpos/common/CBitSet.h" +#include "gpos/common/CBitSetIter.h" #include "gpos/common/clibwrapper.h" #include "gpos/common/syslibwrapper.h" #include "gpos/io/CFileDescriptor.h" @@ -901,89 +903,86 @@ CUtils::FHasCTEAnchor(CExpression *pexpr) return false; } -// True if the distribution is replicated-like. -static BOOL -FReplicatedLikeDistribution(CDistributionSpec::EDistributionType edt) -{ - return (CDistributionSpec::EdtStrictReplicated == edt || - CDistributionSpec::EdtTaintedReplicated == edt || - CDistributionSpec::EdtUniversal == edt); -} - -struct SCTEInfo -{ - ULONG cteId; - ULONG sliceId; - - SCTEInfo(ULONG cte_id, ULONG slice_id) : cteId(cte_id), sliceId(slice_id) - { - } -}; - -typedef CDynamicPtrArray<SCTEInfo, CleanupDelete<SCTEInfo> > CTEInfoArray; - -// Walk the physical tree, recording the slice id of every replicated -// CTE Producer and every CTE Consumer. Slices are delimited by Motion -// nodes: each non-scalar child of a Motion lives in a fresh slice -- -// same motId-stack idea as in apply_shareinput_xslice. +// Collect the CTE ids of every CTE Consumer and CTE Producer found beneath the +// given expression. Scalar subtrees are skipped: a Consumer inside a scalar +// subquery runs in its own SubPlan slice and is repaired separately by the +// local-materialization pass in apply_shareinput_xslice, so it must not be +// treated as living "beneath" the current Motion here. 
static void -CollectCTESlices(CMemoryPool *mp, CExpression *pexpr, ULONG curSlice, - ULONG *pNextSlice, CTEInfoArray *prodInfos, - CTEInfoArray *consInfos) +CollectConsumersAndProducers(CExpression *pexpr, CBitSet *pbsConsumers, + CBitSet *pbsProducers) { COperator *pop = pexpr->Pop(); - if (COperator::EopPhysicalCTEProducer == pop->Eopid()) + if (COperator::EopPhysicalCTEConsumer == pop->Eopid()) { - // Producer's distribution comes from its only child -- inspect - // it there. Skip non-replicated Producers; they cannot trigger - // the cross-slice issue we are checking for. - GPOS_ASSERT(1 == pexpr->Arity()); - CExpression *pexprChild = (*pexpr)[0]; - CDrvdPropPlan *pdpplan = - CDrvdPropPlan::Pdpplan(pexprChild->PdpDerive()); - - if (FReplicatedLikeDistribution(pdpplan->Pds()->Edt())) - { - prodInfos->Append(GPOS_NEW(mp) SCTEInfo( - CPhysicalCTEProducer::PopConvert(pop)->UlCTEId(), curSlice)); - } + pbsConsumers->ExchangeSet( + CPhysicalCTEConsumer::PopConvert(pop)->UlCTEId()); } - else if (COperator::EopPhysicalCTEConsumer == pop->Eopid()) + else if (COperator::EopPhysicalCTEProducer == pop->Eopid()) { - // Consumer is a leaf -- record (cteId, curSlice) and let the - // caller decide later, once the whole tree has been walked. - consInfos->Append(GPOS_NEW(mp) SCTEInfo( - CPhysicalCTEConsumer::PopConvert(pop)->UlCTEId(), curSlice)); + pbsProducers->ExchangeSet( + CPhysicalCTEProducer::PopConvert(pop)->UlCTEId()); } - BOOL isMotion = CUtils::FPhysicalMotion(pop); - for (ULONG ul = 0; ul < pexpr->Arity(); ul++) { CExpression *pexprChild = (*pexpr)[ul]; - - // Scalar subtrees (predicates, project elements) never run as - // separate executor groups, so they cannot host a slice. if (pexprChild->Pop()->FScalar()) { continue; } + CollectConsumersAndProducers(pexprChild, pbsConsumers, pbsProducers); + } +} + +// True if some CTE Consumer beneath pexprMotion has no matching CTE Producer +// beneath the same Motion -- i.e. 
the Consumer reads data produced on the +// other side of the Motion (a different slice). +static BOOL +FHasUnpairedCTEConsumer(CMemoryPool *mp, CExpression *pexprMotion) +{ + CBitSet *pbsConsumers = GPOS_NEW(mp) CBitSet(mp); + CBitSet *pbsProducers = GPOS_NEW(mp) CBitSet(mp); + + CollectConsumersAndProducers(pexprMotion, pbsConsumers, pbsProducers); - // Allocate a fresh slice id for each non-scalar child of a - // Motion; otherwise the child stays in the parent's slice. - ULONG childSlice = curSlice; - if (isMotion) + BOOL fUnpaired = false; + CBitSetIter bsiter(*pbsConsumers); + while (bsiter.Advance()) + { + if (!pbsProducers->Get(bsiter.Bit())) { - (*pNextSlice)++; - childSlice = *pNextSlice; + fUnpaired = true; + break; } - - CollectCTESlices(mp, pexprChild, childSlice, pNextSlice, prodInfos, - consInfos); } + + pbsConsumers->Release(); + pbsProducers->Release(); + + return fUnpaired; } +//--------------------------------------------------------------------------- +// @function: +// CUtils::FHasCrossSliceReplicatedCTEConsumer +// +// @doc: +// Detect a CTE Consumer placed beneath a duplicate-hazard Motion (a +// Motion whose input is strict-replicated / universal) whose Producer is +// on the other side of that Motion. That topology depends on the +// cross-slice shared-scan protocol and hangs at execution. +// +// Unlike a CTE referenced from a scalar subquery -- whose cross-slice +// Consumer is repaired locally by apply_shareinput_xslice -- this +// broadcast/duplicate-hazard topology cannot be repaired, so the caller +// falls back to the Postgres optimizer. +// +// Mirrors greengage 51fe92e: it deliberately does NOT trigger for the +// scalar-subquery case (whose Motions are plain Gather / Redistribute, +// not duplicate-hazard), leaving that case to apply_shareinput_xslice. 
+//--------------------------------------------------------------------------- BOOL CUtils::FHasCrossSliceReplicatedCTEConsumer(CMemoryPool *mp, CExpression *pexpr) { @@ -992,35 +991,22 @@ CUtils::FHasCrossSliceReplicatedCTEConsumer(CMemoryPool *mp, CExpression *pexpr) return false; } - CTEInfoArray *prodInfos = GPOS_NEW(mp) CTEInfoArray(mp); - CTEInfoArray *consInfos = GPOS_NEW(mp) CTEInfoArray(mp); - ULONG nextSlice = 0; - - CollectCTESlices(mp, pexpr, 0 /*curSlice*/, &nextSlice, prodInfos, - consInfos); - - BOOL cross = false; - - for (ULONG ic = 0; ic < consInfos->Size(); ic++) + if (CUtils::FPhysicalMotion(pexpr->Pop()) && + CUtils::FDuplicateHazardMotion(pexpr) && + FHasUnpairedCTEConsumer(mp, pexpr)) { - SCTEInfo *cons = (*consInfos)[ic]; + return true; + } - for (ULONG ip = 0; ip < prodInfos->Size(); ip++) + for (ULONG ul = 0; ul < pexpr->Arity(); ul++) + { + if (FHasCrossSliceReplicatedCTEConsumer(mp, (*pexpr)[ul])) { - SCTEInfo *prod = (*prodInfos)[ip]; - if (prod->cteId == cons->cteId && prod->sliceId != cons->sliceId) - { - cross = true; - goto lExit; - } + return true; } } -lExit: - prodInfos->Release(); - consInfos->Release(); - - return cross; + return false; } //--------------------------------------------------------------------------- diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index b609b3406c2..3d2b976056d 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -96,6 +96,30 @@ typedef struct ApplyShareInputContext int *sliceMarks; /* one for each producer */ int producer_count; + int *subplan_motids; /* motId for each subplan, indexed by plan_id-1 */ + int num_subplans; + + bool walking_subplan; /* true when walking a SubPlan tree */ + + /* + * Track already-inlined cross-slice producers so that a second consumer + * of the same CTE in the same slice can share the inlined copy instead + * of creating yet another independent scan. + * + * Each entry maps (orig_share_id, motId) → new_share_id. 
+ */ + int *inlined_orig_ids; /* original share_id */ + int *inlined_mot_ids; /* slice (motId) where it was inlined */ + int *inlined_new_ids; /* new share_id of the inlined producer */ + int inlined_count; + + /* + * Consumer reference counts for original producers, used to detect + * orphaned producers after inlining (those with zero remaining consumers). + */ + int *consumer_counts; /* consumer count per producer */ + int orig_producer_count; /* producer_count before inlining */ + } ApplyShareInputContext; diff --git a/src/test/regress/expected/shared_scan.out b/src/test/regress/expected/shared_scan.out index 1bb16e8a465..2c7b255257d 100644 --- a/src/test/regress/expected/shared_scan.out +++ b/src/test/regress/expected/shared_scan.out @@ -250,8 +250,12 @@ where Optimizer: Postgres query optimizer (52 rows) --- ORCA should fallback when a CTE over a replicated table is referenced --- from multiple scalar subqueries. +-- A CTE over a replicated table referenced from multiple scalar subqueries +-- used to hang: ORCA placed the SharedScan consumer on a different slice than +-- the producer and the cross-slice temp-file protocol cannot handle that +-- topology. ORCA now force-inlines a replicated-table CTE (the data is on +-- every segment, so a local copy per consumer is equivalent), producing a +-- correct native plan instead of a cross-slice shared scan. -- ss_t1 needs enough rows (40000) to push ORCA to the cross-slice plan; -- with fewer rows the bug does not manifest and the test would silently -- pass even without the fix. @@ -263,6 +267,42 @@ CREATE TABLE ss_t2 AS DISTRIBUTED REPLICATED; ANALYZE ss_t1; ANALYZE ss_t2; +-- Plan: the replicated CTE is materialized once into a local Shared Scan +-- co-located with its consumers, and the repeated reference reuses that copy, +-- so ss_t2 is scanned once per CTE -- no cross-slice SharedScan, no duplicates. 
+EXPLAIN (COSTS OFF) WITH + cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), + cte2 AS (SELECT v FROM ss_t2 WHERE id = 2) + SELECT (SELECT v FROM cte1) + (SELECT v FROM cte2) + + (SELECT v FROM cte1) + (SELECT v FROM cte2) AS result + FROM ss_t1 + LIMIT 1; + QUERY PLAN +-------------------------------------------------- + Limit + InitPlan 1 (returns $0) (slice7) + -> Gather Motion 1:1 (slice2; segments: 1) + -> Seq Scan on ss_t2 + Filter: (id = 1) + InitPlan 2 (returns $1) (slice8) + -> Gather Motion 1:1 (slice3; segments: 1) + -> Seq Scan on ss_t2 ss_t2_1 + Filter: (id = 2) + InitPlan 3 (returns $2) (slice9) + -> Gather Motion 1:1 (slice4; segments: 1) + -> Seq Scan on ss_t2 ss_t2_2 + Filter: (id = 1) + InitPlan 4 (returns $3) (slice6) + -> Gather Motion 1:1 (slice5; segments: 1) + -> Seq Scan on ss_t2 ss_t2_3 + Filter: (id = 2) + -> Gather Motion 3:1 (slice1; segments: 3) + -> Limit + -> Seq Scan on ss_t1 + Optimizer: Postgres query optimizer +(21 rows) + +-- Run it under a timeout to prove it no longer hangs. SET statement_timeout = '15s'; WITH cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), diff --git a/src/test/regress/expected/shared_scan_optimizer.out b/src/test/regress/expected/shared_scan_optimizer.out index 56919a5fcb4..f54ea436c15 100644 --- a/src/test/regress/expected/shared_scan_optimizer.out +++ b/src/test/regress/expected/shared_scan_optimizer.out @@ -263,8 +263,12 @@ where Optimizer: Postgres query optimizer (52 rows) --- ORCA should fallback when a CTE over a replicated table is referenced --- from multiple scalar subqueries. +-- A CTE over a replicated table referenced from multiple scalar subqueries +-- used to hang: ORCA placed the SharedScan consumer on a different slice than +-- the producer and the cross-slice temp-file protocol cannot handle that +-- topology. 
ORCA now force-inlines a replicated-table CTE (the data is on +-- every segment, so a local copy per consumer is equivalent), producing a +-- correct native plan instead of a cross-slice shared scan. -- ss_t1 needs enough rows (40000) to push ORCA to the cross-slice plan; -- with fewer rows the bug does not manifest and the test would silently -- pass even without the fix. @@ -276,6 +280,44 @@ CREATE TABLE ss_t2 AS DISTRIBUTED REPLICATED; ANALYZE ss_t1; ANALYZE ss_t2; +-- Plan: the replicated CTE is materialized once into a local Shared Scan +-- co-located with its consumers, and the repeated reference reuses that copy, +-- so ss_t2 is scanned once per CTE -- no cross-slice SharedScan, no duplicates. +EXPLAIN (COSTS OFF) WITH + cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), + cte2 AS (SELECT v FROM ss_t2 WHERE id = 2) + SELECT (SELECT v FROM cte1) + (SELECT v FROM cte2) + + (SELECT v FROM cte1) + (SELECT v FROM cte2) AS result + FROM ss_t1 + LIMIT 1; + QUERY PLAN +----------------------------------------------------------------------------------- + Gather Motion 3:1 (slice3; segments: 3) + -> Sequence + -> Redistribute Motion 1:3 (slice2) + -> Limit + -> Gather Motion 3:1 (slice1; segments: 3) + -> Limit + -> Result + -> Seq Scan on ss_t1 + SubPlan 1 (slice1; segments: 3) + -> Shared Scan (share slice:id 1:2) + -> Materialize + -> Seq Scan on ss_t2 ss_t2_1 + Filter: (id = 1) + SubPlan 2 (slice1; segments: 3) + -> Shared Scan (share slice:id 1:3) + -> Materialize + -> Seq Scan on ss_t2 + Filter: (id = 2) + SubPlan 3 (slice1; segments: 3) + -> Shared Scan (share slice:id 1:2) + SubPlan 4 (slice1; segments: 3) + -> Shared Scan (share slice:id 1:3) + Optimizer: Pivotal Optimizer (GPORCA) +(23 rows) + +-- Run it under a timeout to prove it no longer hangs. 
SET statement_timeout = '15s'; WITH cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), diff --git a/src/test/regress/sql/shared_scan.sql b/src/test/regress/sql/shared_scan.sql index df2d21faf2d..14250b4bcbd 100644 --- a/src/test/regress/sql/shared_scan.sql +++ b/src/test/regress/sql/shared_scan.sql @@ -125,8 +125,12 @@ where and (stat.schema_name || '.' ||stat.table_name not in (select table_nm_onl_act from tbls_w_onl_actl_data)) or (stat.schema_name || '.' ||stat.table_name in (select table_nm_onl_act from tbls_w_onl_actl_data)); --- ORCA should fallback when a CTE over a replicated table is referenced --- from multiple scalar subqueries. +-- A CTE over a replicated table referenced from multiple scalar subqueries +-- used to hang: ORCA placed the SharedScan consumer on a different slice than +-- the producer and the cross-slice temp-file protocol cannot handle that +-- topology. ORCA now force-inlines a replicated-table CTE (the data is on +-- every segment, so a local copy per consumer is equivalent), producing a +-- correct native plan instead of a cross-slice shared scan. -- ss_t1 needs enough rows (40000) to push ORCA to the cross-slice plan; -- with fewer rows the bug does not manifest and the test would silently -- pass even without the fix. @@ -138,7 +142,17 @@ CREATE TABLE ss_t2 AS DISTRIBUTED REPLICATED; ANALYZE ss_t1; ANALYZE ss_t2; - +-- Plan: the replicated CTE is materialized once into a local Shared Scan +-- co-located with its consumers, and the repeated reference reuses that copy, +-- so ss_t2 is scanned once per CTE -- no cross-slice SharedScan, no duplicates. +EXPLAIN (COSTS OFF) WITH + cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), + cte2 AS (SELECT v FROM ss_t2 WHERE id = 2) + SELECT (SELECT v FROM cte1) + (SELECT v FROM cte2) + + (SELECT v FROM cte1) + (SELECT v FROM cte2) AS result + FROM ss_t1 + LIMIT 1; +-- Run it under a timeout to prove it no longer hangs. SET statement_timeout = '15s'; WITH cte1 AS (SELECT v FROM ss_t2 WHERE id = 1),