From f54161bd450a6cde083e1b287f7e3346cb88e63d Mon Sep 17 00:00:00 2001
From: zhao <873988212@qq.com>
Date: Mon, 22 Sep 2025 22:32:48 +0800
Subject: [PATCH 1/3] Optimize motion timeout transmission (#1365)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* The primary goal is to address issues seen under concurrent workloads: excessive motion retries, congestion, retransmission storms, and network skew. The patch targets inefficient retransmission handling in unreliable network environments. Specifically:

Fixed timeout thresholds: traditional TCP-style retransmission timeout (RTT/RTO) calculations can be too rigid for networks with volatile latency (e.g., satellite links, wireless networks). This leads to:
• Premature retransmissions: unnecessary resends during temporary latency spikes, wasting bandwidth.
• Delayed recovery: slow reaction to actual packet loss when the RTO is overly conservative.

Lack of context awareness: a static RTO ignores real-time network behavior, reducing throughput and responsiveness.

Solution: dynamic timeout threshold adjustment. An adaptive mechanism optimizes retransmission:

    if (now < (curBuf->sentTime + conn->rttvar.rto)) {
        uint32_t diff = (curBuf->sentTime + conn->rttvar.rto) - now;
        // ... (statistical tracking and threshold adjustment)
    }

Key components:
• Statistical tracking:
  - min/max: observed minimum/maximum residual time (time left until RTO expiry).
  - retrans_count/no_retrans_count: counts of retransmission vs. non-retransmission events.
• Weighted threshold calculation:

    unack_queue_ring.time_difference = (uint32_t)(
        unack_queue_ring.max * weight_no_retrans +
        unack_queue_ring.min * weight_retrans);

  Weights are derived from the historical ratio of retransmissions (weight_retrans) to successful deliveries (weight_no_retrans).

How it solves the problem:
• Temporary latency spike: leans on max (conservative) to avoid false retransmits, reducing bandwidth waste compared to mistaken fixed-RTO retransmissions.
• Persistent packet loss: leans on min (aggressive) via weight_retrans, accelerating recovery compared to a slow fixed-RTO reaction.
• Stable network: balances the weights for steady throughput.
A sketch of this update follows.
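For reference, here is a minimal standalone sketch of the weighted-threshold update described above (the struct and function names are illustrative, not the patch's exact code):

    #include <stdint.h>

    struct ring_stats
    {
        uint32_t retrans_count;    /* packets that had already passed their RTO */
        uint32_t no_retrans_count; /* packets with residual time left at check */
        uint32_t min;              /* smallest residual time seen (us) */
        uint32_t max;              /* largest residual time seen (us) */
        uint32_t time_difference;  /* extra slack granted before resending */
    };

    /*
     * Called for a packet whose RTO has not yet expired; residual is the
     * time (in us) still left until its RTO would fire.
     */
    void
    update_threshold(struct ring_stats *s, uint32_t residual)
    {
        uint32_t total;

        if (s->retrans_count == 0 && s->no_retrans_count == 0)
            s->min = s->max = residual;
        if (residual < s->min)
            s->min = residual;
        if (residual > s->max)
            s->max = residual;

        total = s->retrans_count + s->no_retrans_count;
        if (total > 0)
        {
            double w_retrans = (double) s->retrans_count / total;
            double w_no_retrans = (double) s->no_retrans_count / total;

            /*
             * A lossy history pulls the threshold toward min (aggressive);
             * a clean history pulls it toward max (conservative).
             */
            s->time_difference = (uint32_t)
                (s->max * w_no_retrans + s->min * w_retrans);
        }
        s->no_retrans_count++;
    }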
EstimateRTT - dynamically estimates the round-trip time (RTT) and adjusts the retransmission timeout (RTO).

This function implements a variant of the Jacobson/Karels algorithm for RTT estimation, adapted for UDP-based motion connections. It updates the smoothed RTT (srtt), mean deviation (mdev), and RTO values based on newly measured RTT samples (mrtt). The RTO calculation keeps data transmission reliable over unreliable networks.

Key components:
* srtt: smoothed round-trip time (weighted average of historical RTT samples)
* mdev: mean deviation (measure of RTT variability)
* rttvar: adaptive RTT variation bound (used to clamp RTO updates)
* rto: retransmission timeout (dynamically adjusted from srtt + rttvar)

Algorithm details:
1. For the first RTT sample:
   srtt = mrtt << 3 (scaled by 8 for fixed-point arithmetic)
   mdev = mrtt << 1 (scaled by 2)
   rttvar = max(mdev, rto_min)
2. For subsequent samples:
   delta = mrtt - (srtt >> 3) (difference between the new sample and the smoothed RTT)
   srtt += delta (updates srtt with 1/8 weight of the new sample)
   delta = abs(delta) - (mdev >> 2)
   mdev += delta (updates mdev with 1/4 weight)
3. rttvar bounds the maximum RTT variation:
   if mdev > mdev_max, update mdev_max and rttvar;
   on new ACKs (snd_una > rtt_seq), decay rttvar toward mdev_max.
4. Final RTO calculation:
   rto = (srtt >> 3) + rttvar (clamped to RTO_MAX)

Network latency filtering and RTO optimization

This logic mitigates RTO distortion caused by non-network delays in database execution pipelines. Key challenges addressed:
* Operator processing delays (non-I/O wait) inflate observed ACK times.
* Spurious latency amplification in lossy networks triggers excessive RTO_MAX waits.
* Congestion collapse from synchronized retransmissions.

Core mechanisms:
1. Valid RTT sampling filter:
   Condition: 4 * (pkt->recv_time - pkt->send_time) > ackTime && pkt->retry_times != Gp_interconnect_min_retries_before_timeout
   Rationale:
   * Filters out packets exceeding 2x the expected round trip (4x one-way).
   * Excludes artificial retries (retry_times == Gp_interconnect_min_retries_before_timeout) to avoid sampling bias.
   Action: update the RTT estimate only from valid samples via EstimateRTT().
2. Randomized backoff (demonstrated in the sketch at the end of this message):
   Condition: buf->nRetry > 0
   Algorithm: rto += rto >> (4 * buf->nRetry)
   Benefits:
   * Exponential decay: each retry adds a geometrically smaller increment.
   * Connection-specific increments prevent global synchronization of retransmissions.
   * Dynamic scaling: adapts to the retry depth (nRetry).
3. Timer list management (NEW_TIMER):
   Operations:
   removeFromRTOList(&mudp, bufConn) -> detaches the connection from expiration monitoring
   addtoRTOList(&mudp, bufConn) -> reinserts it with the updated rto
   Purpose: maintains real-time ordering of expiration checks.

We ran multiple full-scale TPC-DS benchmarks on a single physical machine with 48 nodes and on four physical machines with 96 nodes, with MTU values of 1500 and 9000. In the single-machine environment, which had no network bottleneck, MTU 1500 and 9000 showed no significant performance difference. In the 96-node environment there was likewise no significant difference under single-threaded execution. Under multi-threaded execution (4 threads), however, SQL statements with a high share of data movement showed performance variations of 5x to 10x, especially with MTU 1500.
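As referenced in mechanism 2 above, the backoff arithmetic can be demonstrated in isolation (illustrative code, not from the patch); each successive retry adds a geometrically smaller increment, so the timeout grows quickly at first and then converges:

    #include <stdint.h>
    #include <stdio.h>

    int
    main(void)
    {
        uint32_t rto = 200000;	/* start from the 200ms initial RTO, in us */
        uint32_t nRetry;

        for (nRetry = 1; nRetry <= 4; nRetry++)
        {
            /* same shift as above: adds 1/16, 1/256, 1/4096, ... of rto */
            uint32_t inc = rto >> (4 * nRetry);

            rto += inc;
            printf("retry %u: +%u us -> rto = %u us\n", nRetry, inc, rto);
        }
        return 0;
    }

Because the increment depends on each connection's own rto and retry depth, connections back off on different schedules, which helps avoid synchronized retransmission bursts.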
* Cleaning up the code

---------

Co-authored-by: zhaoxi
Co-authored-by: zhaoxi
---
 contrib/interconnect/ic_internal.h | 52 ++
 contrib/interconnect/test/ic_test_env.c | 2 +
 contrib/interconnect/udp/ic_udpifc.c | 882 ++++++++++++++++--
 contrib/interconnect/udp/ic_udpifc.h | 3 +
 src/backend/cdb/cdbvars.c | 1 +
 src/backend/utils/misc/guc_gp.c | 12 +
 src/include/cdb/cdbvars.h | 3 +
 src/include/utils/sync_guc_name.h | 1 +
 .../icudp/gp_interconnect_fc_method.out | 46 +
 .../queue_depth_combination_capacity.out | 257 +++++
 .../queue_depth_combination_loss_advance.out | 260 ++++++
 src/test/regress/greenplum_schedule | 2 +-
 src/test/regress/icudp_schedule | 2 +-
 .../sql/icudp/gp_interconnect_fc_method.sql | 8 +
 .../queue_depth_combination_capacity.sql | 55 ++
 .../queue_depth_combination_loss_advance.sql | 65 ++
 16 files changed, 1584 insertions(+), 67 deletions(-)
 create mode 100644 src/test/regress/expected/icudp/queue_depth_combination_loss_advance.out
 create mode 100644 src/test/regress/sql/icudp/queue_depth_combination_loss_advance.sql

diff --git a/contrib/interconnect/ic_internal.h b/contrib/interconnect/ic_internal.h
index 77c08ee6e29..8557115cd33 100644
--- a/contrib/interconnect/ic_internal.h
+++ b/contrib/interconnect/ic_internal.h
@@ -10,6 +10,11 @@
 */
 #ifndef INTER_CONNECT_INTERNAL_H
 #define INTER_CONNECT_INTERNAL_H
+#include
+#include
+#include
+#include
+#include

 #include "tcp/ic_tcp.h"
 #include "udp/ic_udpifc.h"
@@ -33,6 +38,27 @@ typedef enum MotionConnState
 	mcsEosSent
 } MotionConnState;

+struct udp_send_vars
+{
+	/* send sequence variables */
+	uint32_t snd_una;	/* send unacknowledged */
+	uint32_t snd_wnd;	/* send window (unscaled) */
+
+	/* retransmission timeout variables */
+	uint8_t nrtx;		/* number of retransmissions */
+	uint8_t max_nrtx;	/* max number of retransmissions */
+	uint32_t rto;		/* retransmission timeout */
+	uint32_t ts_rto;	/* timestamp for retransmission timeout */
+
+	/* congestion control variables */
+	uint32_t cwnd;		/* congestion window */
+	uint32_t ssthresh;	/* slow start threshold */
+
+	TAILQ_ENTRY(MotionConnUDP) send_link;
+	TAILQ_ENTRY(MotionConnUDP) timer_link;	/* timer link (rto list) */
+
+};
+
 /*
 * Structure used for keeping track of a pt-to-pt connection between two
 * Cdb Entities (either QE or QD).
@@ -153,6 +179,32 @@ typedef struct MotionConnUDP
 	uint64		stat_count_resent;
 	uint64		stat_max_resent;
 	uint64		stat_count_dropped;
+
+	struct {
+		uint32_t ts_rto;
+		uint32_t rto;
+		uint32_t srtt;
+		uint32_t rttvar;
+		uint32_t snd_una;
+		uint16_t nrtx;
+		uint16_t max_nrtx;
+		uint32_t mss;
+		uint32_t cwnd;
+		uint32_t ssthresh;
+		uint32_t fss;
+		uint8_t loss_count;
+		uint32_t mdev;
+		uint32_t mdev_max;
+		uint32_t rtt_seq;	/* sequence number to update rttvar */
+		uint32_t ts_all_rto;
+		bool karn_mode;
+	} rttvar;
+
+	uint8_t on_timewait_list;
+	int16_t on_rto_idx;
+
+	uint32_t snd_nxt;	/* send next */
+	struct udp_send_vars sndvar;
 } MotionConnUDP;

 typedef struct MotionConnTCP
diff --git a/contrib/interconnect/test/ic_test_env.c b/contrib/interconnect/test/ic_test_env.c
index 5333a143de5..1c9f2d0ce05 100644
--- a/contrib/interconnect/test/ic_test_env.c
+++ b/contrib/interconnect/test/ic_test_env.c
@@ -330,6 +330,7 @@ client_side_global_var_init(MotionIPCLayer * motion_ipc_layer, pid_t *ic_proxy_p
 	Gp_interconnect_queue_depth = 800;
 	Gp_interconnect_snd_queue_depth = 600;
+	Gp_interconnect_mem_size = 20;
 	Gp_interconnect_timer_period = 1;
 	Gp_interconnect_timer_checking_period = 2;
 	InitializeLatchSupport();
@@ -374,6 +375,7 @@ server_side_global_var_init(MotionIPCLayer * motion_ipc_layer, pid_t *ic_proxy_p
 	Gp_interconnect_queue_depth = 800;
 	Gp_interconnect_snd_queue_depth = 600;
+	Gp_interconnect_mem_size = 20;
 	Gp_interconnect_timer_period = 1;
 	Gp_interconnect_timer_checking_period = 2;
 	InitializeLatchSupport();
diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c
index 63e8c9301dd..eeda2c678bc 100644
--- a/contrib/interconnect/udp/ic_udpifc.c
+++ b/contrib/interconnect/udp/ic_udpifc.c
@@ -26,13 +26,17 @@
 #include "ic_udpifc.h"
 #include "ic_internal.h"
 #include "ic_common.h"
-
 #include
 #include
 #include
 #include
 #include
 #include
+#include
+#include
+#include
+#include
+#include

 #include "access/transam.h"
 #include "access/xact.h"
@@ -116,6 +120,57 @@ WSAPoll(
 #undef select
 #endif

+#define TIMEOUT_Z
+#define RTT_SHIFT_ALPHA (3)	/* srtt (0.125) */
+#define LOSS_THRESH (3)		/* Packet loss triggers Karn */
+#define RTO_MIN (5000)		/* MIN RTO (us): 5ms */
+#define RTO_MAX (100000)	/* MAX RTO (us): 100ms */
+#define UDP_INFINITE_SSTHRESH 0x7fffffff
+
+#define SEC_TO_USEC(t) ((t) * 1000000)
+#define SEC_TO_MSEC(t) ((t) * 1000)
+#define MSEC_TO_USEC(t) ((t) * 1000)
+#define USEC_TO_SEC(t) ((t) / 1000000)
+#define TIME_TICK (1000000/HZ)	/* in us */
+
+#define UDP_INITIAL_RTO (MSEC_TO_USEC(200))
+#define UDP_DEFAULT_MSS 1460
+
+#define RTO_HASH (3000)
+
+#define UDP_SEQ_LT(a,b) ((int32_t)((a)-(b)) < 0)
+#define UDP_SEQ_LEQ(a,b) ((int32_t)((a)-(b)) <= 0)
+#define UDP_SEQ_GT(a,b) ((int32_t)((a)-(b)) > 0)
+#define UDP_SEQ_GEQ(a,b) ((int32_t)((a)-(b)) >= 0)
+
+#ifndef MAX
+#define MAX(a, b) ((a)>(b)?(a):(b))
+#endif
+#ifndef MIN
+#define MIN(a, b) ((a)<(b)?(a):(b))
+#endif
+
+#define UDP_RTO_MIN ((unsigned)(HZ/5))
+
+struct rto_hashstore
+{
+	uint32_t rto_now_idx;	/* pointing the hs_table_s index */
+	uint32_t rto_now_ts;
+
+	TAILQ_HEAD(rto_head, MotionConnUDP) rto_list[RTO_HASH + 1];
+};
+
+struct mudp_manager
+{
+	struct rto_hashstore *rto_store;	/* lists related to timeout */
+
+	int rto_list_cnt;
+	uint32_t cur_ts;
+};
+
+typedef struct mudp_manager* mudp_manager_t;
+static struct mudp_manager mudp;
+
 #define MAX_TRY (11)
 int			timeoutArray[] =
@@ -516,8 +571,10 @@ static ICGlobalControlInfo ic_control_info;
 */
 #define UNACK_QUEUE_RING_SLOTS_NUM (2000)
 #define TIMER_SPAN
(Gp_interconnect_timer_period * 1000ULL)	/* default: 5ms */
-#define TIMER_CHECKING_PERIOD (Gp_interconnect_timer_checking_period)	/* default: 20ms */
+#define TIMER_SPAN_LOSS (Gp_interconnect_timer_period * 500ULL)	/* default: 2.5ms */
+#define TIMER_CHECKING_PERIOD Gp_interconnect_timer_checking_period	/* default: 20ms */
 #define UNACK_QUEUE_RING_LENGTH (UNACK_QUEUE_RING_SLOTS_NUM * TIMER_SPAN)
+#define UNACK_QUEUE_RING_LENGTH_LOSS (UNACK_QUEUE_RING_SLOTS_NUM * TIMER_SPAN_LOSS)

 #define DEFAULT_RTT (Gp_interconnect_default_rtt * 1000)	/* default: 20ms */
 #define MIN_RTT (100)	/* 0.1ms */
@@ -537,6 +594,7 @@ static ICGlobalControlInfo ic_control_info;

 #define MAX_SEQS_IN_DISORDER_ACK (4)

+#define MAX_QUEUE_SIZE (64)
 /*
 * UnackQueueRing
 *
@@ -573,12 +631,19 @@ struct UnackQueueRing

 	/* time slots */
 	ICBufferList slots[UNACK_QUEUE_RING_SLOTS_NUM];
+#ifdef TIMEOUT_Z
+	uint32_t retrans_count;
+	uint32_t no_retrans_count;
+	uint32_t time_difference;
+	uint32_t min;
+	uint32_t max;
+#endif
 };

 /*
 * All connections in a process share this unack queue ring instance.
 */
-static UnackQueueRing unack_queue_ring = {0, 0, 0};
+static UnackQueueRing unack_queue_ring = {0};

 static int	ICSenderSocket = -1;
 static int32 ICSenderPort = 0;
@@ -746,8 +811,8 @@ static void checkQDConnectionAlive(void);
 static void *rxThreadFunc(void *arg);
 static bool handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len);
-static void handleAckedPacket(MotionConn *ackConn, ICBuffer *buf, uint64 now);
-static bool handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry);
+static void handleAckedPacket(MotionConn *ackConn, ICBuffer *buf, uint64 now, struct icpkthdr *pkt);
+static bool handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, bool need_flush);
 static void handleStopMsgs(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, int16 motionId);
 static void handleDisorderPacket(MotionConn *conn, int pos, uint32 tailSeq, icpkthdr *pkt);
 static bool handleDataPacket(MotionConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer, socklen_t *peerlen, AckSendParam *param, bool *wakeup_mainthread);
@@ -766,6 +831,8 @@ static void initSndBufferPool();
 static void putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime, uint64 now);
 static void initUnackQueueRing(UnackQueueRing *uqr);
+static void initUdpManager(mudp_manager_t mptr);
+static inline void checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged);
 static void checkExpiration(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *triggerConn, uint64 now);
 static void checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn *conn);
@@ -924,6 +991,349 @@ dumpTransProtoStats()

 #endif /* TRANSFER_PROTOCOL_STATS */

+static struct rto_hashstore*
+initRTOHashstore()
+{
+	int i;
+	struct rto_hashstore* hs = palloc(sizeof(struct rto_hashstore));
+
+	for (i = 0; i < RTO_HASH; i++)
+		TAILQ_INIT(&hs->rto_list[i]);
+
+	TAILQ_INIT(&hs->rto_list[RTO_HASH]);
+
+	return hs;
+}
+
+static void
+initUdpManager(mudp_manager_t mudp)
+{
+	mudp->rto_store = initRTOHashstore();
+	mudp->rto_list_cnt = 0;
+	mudp->cur_ts = 0;
+}
+
+static inline void
+addtoRTOList(mudp_manager_t mudp, MotionConnUDP *cur_stream)
+{
+	if (!mudp->rto_list_cnt)
+	{
+		mudp->rto_store->rto_now_idx = 0;
+		mudp->rto_store->rto_now_ts = cur_stream->sndvar.ts_rto;
+	}
+
+	if (cur_stream->on_rto_idx < 0)
+	{
+		if (cur_stream->on_timewait_list)
return; + + int diff = (int32_t)(cur_stream->sndvar.ts_rto - mudp->rto_store->rto_now_ts); + if (diff < RTO_HASH) + { + int offset= (diff + mudp->rto_store->rto_now_idx) % RTO_HASH; + cur_stream->on_rto_idx = offset; + TAILQ_INSERT_TAIL(&(mudp->rto_store->rto_list[offset]), + cur_stream, sndvar.timer_link); + } + else + { + cur_stream->on_rto_idx = RTO_HASH; + TAILQ_INSERT_TAIL(&(mudp->rto_store->rto_list[RTO_HASH]), + cur_stream, sndvar.timer_link); + } + mudp->rto_list_cnt++; + } +} + +static inline void +removeFromRTOList(mudp_manager_t mudp, + MotionConnUDP *cur_stream) +{ + if (cur_stream->on_rto_idx < 0) + return; + + TAILQ_REMOVE(&mudp->rto_store->rto_list[cur_stream->on_rto_idx], + cur_stream, sndvar.timer_link); + cur_stream->on_rto_idx = -1; + + mudp->rto_list_cnt--; +} + +static inline void +updateRetransmissionTimer(mudp_manager_t mudp, + MotionConnUDP *cur_stream, + uint32_t cur_ts) +{ + cur_stream->sndvar.nrtx = 0; + + /* if in rto list, remove it */ + if (cur_stream->on_rto_idx >= 0) + removeFromRTOList(mudp, cur_stream); + + /* Reset retransmission timeout */ + if (UDP_SEQ_GT(cur_stream->snd_nxt, cur_stream->sndvar.snd_una)) + { + /* there are packets sent but not acked */ + /* update rto timestamp */ + cur_stream->sndvar.ts_rto = cur_ts + cur_stream->sndvar.rto; + addtoRTOList(mudp, cur_stream); + } + + if (cur_stream->on_rto_idx == -1) + { + cur_stream->sndvar.ts_rto = cur_ts + cur_stream->sndvar.rto; + addtoRTOList(mudp, cur_stream); + } +} + +static int +handleRTO(mudp_manager_t mudp, + uint32_t cur_ts, + MotionConnUDP *cur_stream, + ChunkTransportState *transportStates, + ChunkTransportStateEntry *pEntry, + MotionConn *triggerConn) +{ + /* check for expiration */ + int count = 0; + int retransmits = 0; + MotionConnUDP *currBuffConn = NULL; + uint32_t now = cur_ts; + + Assert(unack_queue_ring.currentTime != 0); + removeFromRTOList(mudp, cur_stream); + + while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) && count++ < UNACK_QUEUE_RING_SLOTS_NUM) + { + /* expired, need to resend them */ + ICBuffer *curBuf = NULL; + + while ((curBuf = icBufferListPop(&unack_queue_ring.slots[unack_queue_ring.idx])) != NULL) + { + curBuf->nRetry++; + putIntoUnackQueueRing( + &unack_queue_ring, + curBuf, + computeExpirationPeriod(curBuf->conn, curBuf->nRetry), now); + +#ifdef TRANSFER_PROTOCOL_STATS + updateStats(TPE_DATA_PKT_SEND, curBuf->conn, curBuf->pkt); +#endif + + sendOnce(transportStates, pEntry, curBuf, curBuf->conn); + + currBuffConn = CONTAINER_OF(curBuf->conn, MotionConnUDP, mConn); + + retransmits++; + ic_statistics.retransmits++; + currBuffConn->stat_count_resent++; + currBuffConn->stat_max_resent = Max(currBuffConn->stat_max_resent, currBuffConn->stat_count_resent); + checkNetworkTimeout(curBuf, now, &transportStates->networkTimeoutIsLogged); + +#ifdef AMS_VERBOSE_LOGGING + write_log("RESEND pkt with seq %d (retry %d, rtt " UINT64_FORMAT ") to route %d", + curBuf->pkt->seq, curBuf->nRetry, curBuf->conn->rtt, curBuf->conn->route); + logPkt("RESEND PKT in checkExpiration", curBuf->pkt); +#endif + } + + unack_queue_ring.currentTime += TIMER_SPAN; + unack_queue_ring.idx = (unack_queue_ring.idx + 1) % (UNACK_QUEUE_RING_SLOTS_NUM); + } + return 0; +} + +static inline void +rearrangeRTOStore(mudp_manager_t mudp) +{ + MotionConnUDP *walk, *next; + struct rto_head* rto_list = &mudp->rto_store->rto_list[RTO_HASH]; + int cnt = 0; + + for (walk = TAILQ_FIRST(rto_list); walk != NULL; walk = next) + { + next = TAILQ_NEXT(walk, sndvar.timer_link); + + int diff = 
(int32_t)(walk->sndvar.ts_rto - mudp->rto_store->rto_now_ts);
+		if (diff < RTO_HASH)
+		{
+			int offset = (diff + mudp->rto_store->rto_now_idx) % RTO_HASH;
+			TAILQ_REMOVE(&mudp->rto_store->rto_list[RTO_HASH],
+						 walk, sndvar.timer_link);
+			walk->on_rto_idx = offset;
+			TAILQ_INSERT_TAIL(&(mudp->rto_store->rto_list[offset]),
+							  walk, sndvar.timer_link);
+		}
+		cnt++;
+	}
+}
+
+static inline void
+checkRtmTimeout(mudp_manager_t mudp,
+				uint32_t cur_ts,
+				int thresh,
+				ChunkTransportState *transportStates,
+				ChunkTransportStateEntry *pEntry,
+				MotionConn *triggerConn)
+{
+	MotionConnUDP *walk, *next;
+	struct rto_head* rto_list;
+	int cnt;
+
+	if (!mudp->rto_list_cnt)
+		return;
+
+	cnt = 0;
+
+	while (1)
+	{
+		rto_list = &mudp->rto_store->rto_list[mudp->rto_store->rto_now_idx];
+		if ((int32_t)(cur_ts - mudp->rto_store->rto_now_ts) < 0)
+			break;
+
+		for (walk = TAILQ_FIRST(rto_list); walk != NULL; walk = next)
+		{
+			if (++cnt > thresh)
+				break;
+			next = TAILQ_NEXT(walk, sndvar.timer_link);
+
+			if (walk->on_rto_idx >= 0)
+			{
+				TAILQ_REMOVE(rto_list, walk, sndvar.timer_link);
+				mudp->rto_list_cnt--;
+				walk->on_rto_idx = -1;
+				handleRTO(mudp, cur_ts, walk, transportStates, pEntry, triggerConn);
+			}
+		}
+
+		if (cnt > thresh)
+		{
+			break;
+		}
+		else
+		{
+			mudp->rto_store->rto_now_idx = (mudp->rto_store->rto_now_idx + 1) % RTO_HASH;
+			mudp->rto_store->rto_now_ts++;
+			if (!(mudp->rto_store->rto_now_idx % 1000))
+				rearrangeRTOStore(mudp);
+		}
+
+	}
+}
+
+/*
+ * estimateRTT - Dynamically estimates the Round-Trip Time (RTT) and adjusts Retransmission Timeout (RTO)
+ *
+ * This function implements a variant of the Jacobson/Karels algorithm for RTT estimation, adapted for UDP-based
+ * motion control connections. It updates smoothed RTT (srtt), mean deviation (mdev), and RTO values based on
+ * newly measured RTT samples (mrtt). The RTO calculation ensures reliable data transmission over unreliable networks.
+ *
+ * Key Components:
+ * - srtt: Smoothed Round-Trip Time (weighted average of historical RTT samples)
+ * - mdev: Mean Deviation (measure of RTT variability)
+ * - rttvar: Adaptive RTT variation bound (used to clamp RTO updates)
+ * - rto: Retransmission Timeout (dynamically adjusted based on srtt + rttvar)
+ *
+ * Algorithm Details:
+ * 1. For the first RTT sample:
+ *    srtt = mrtt << 3 (scaled by 8 for fixed-point arithmetic)
+ *    mdev = mrtt << 1 (scaled by 2)
+ *    rttvar = max(mdev, rto_min)
+ * 2. For subsequent samples:
+ *    Delta = mrtt - (srtt >> 3) (difference between new sample and smoothed RTT)
+ *    srtt += Delta (update srtt with 1/8 weight of new sample)
+ *    Delta = abs(Delta) - (mdev >> 2)
+ *    mdev += Delta (update mdev with 1/4 weight)
+ * 3. rttvar bounds the maximum RTT variation:
+ *    If mdev > mdev_max, update mdev_max and rttvar
+ *    On new ACKs (snd_una > rtt_seq), decay rttvar toward mdev_max
+ * 4. Final RTO calculation:
+ *    rto = (srtt >> 3) + rttvar (clamped to RTO_MAX)
+ *
+ * Parameters:
+ * @mConn: Parent motion connection context (container of MotionConnUDP)
+ * @mrtt: Measured Round-Trip Time (in microseconds) for the latest packet
+ *
+ * Notes:
+ * - Designed for non-retransmitted packets to avoid sampling bias.
+ * - Uses fixed-point arithmetic to avoid floating-point operations.
+ * - Minimum RTO (rto_min) is set to 20ms (HZ/5/10, assuming HZ=100).
+ * - Critical for adaptive timeout control in UDP protocols where reliability is implemented at the application layer.
+ * - Thread-unsafe: Must be called in a synchronized context (e.g., packet processing loop).
+ */ + +static inline void +estimateRTT(MotionConn *mConn , uint32_t mrtt) +{ + /* This function should be called for not retransmitted packets */ + /* TODO: determine rto_min */ + MotionConnUDP *conn = NULL; + + conn = CONTAINER_OF(mConn, MotionConnUDP, mConn); + long m = mrtt; + uint32_t rto_min = UDP_RTO_MIN / 10; + + if (m == 0) + m = 1; + + /* + * Special RTO optimization for high-speed networks: + * When measured RTT (m) is below 100 microseconds and current RTO is under 10ms, + * forcibly set RTO to half of RTO_MIN. This targets two scenarios: + * - Loopback interfaces (localhost communication) + * - Ultra-low-latency networks (e.g., InfiniBand, RDMA) + */ + if(m < 100 && conn->rttvar.rto < 10000) + { + conn->rttvar.rto = RTO_MIN / 2; + } + + if (conn->rttvar.srtt != 0) + { + /* rtt = 7/8 rtt + 1/8 new */ + m -= (conn->rttvar.srtt >> LOSS_THRESH); + conn->rttvar.srtt += m; + if (m < 0) + { + m = -m; + m -= (conn->rttvar.mdev >> RTT_SHIFT_ALPHA); + if (m > 0) + m >>= LOSS_THRESH; + } + else + { + m -= (conn->rttvar.mdev >> RTT_SHIFT_ALPHA); + } + conn->rttvar.mdev += m; + if (conn->rttvar.mdev > conn->rttvar.mdev_max) + { + conn->rttvar.mdev_max = conn->rttvar.mdev; + if (conn->rttvar.mdev_max > conn->rttvar.rttvar) + { + conn->rttvar.rttvar = conn->rttvar.mdev_max; + } + } + if (UDP_SEQ_GT(conn->rttvar.snd_una, conn->rttvar.rtt_seq)) + { + if (conn->rttvar.mdev_max < conn->rttvar.rttvar) + { + conn->rttvar.rttvar -= (conn->rttvar.rttvar - conn->rttvar.mdev_max) >> RTT_SHIFT_ALPHA; + } + conn->rttvar.mdev_max = rto_min; + } + } + else + { + /* fresh measurement */ + conn->rttvar.srtt = m << LOSS_THRESH; + conn->rttvar.mdev = m << 1; + conn->rttvar.mdev_max = conn->rttvar.rttvar = MAX(conn->rttvar.mdev, rto_min); + } + + conn->rttvar.rto = ((conn->rttvar.srtt >> LOSS_THRESH) + conn->rttvar.rttvar) > RTO_MAX ? RTO_MAX : ((conn->rttvar.srtt >> LOSS_THRESH) + conn->rttvar.rttvar); +} + + /* * initCursorICHistoryTable * Initialize cursor ic history table. @@ -2522,6 +2932,14 @@ initUnackQueueRing(UnackQueueRing *uqr) { icBufferListInit(&uqr->slots[i], ICBufferListType_Secondary); } + +#ifdef TIMEOUT_Z + uqr->retrans_count = 0; + uqr->no_retrans_count = 0; + uqr->time_difference = 0; + uqr->min = 0; + uqr->max = 0; +#endif } /* @@ -2556,6 +2974,9 @@ computeExpirationPeriod(MotionConn *mConn, uint32 retry) else #endif { + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) + return Min(retry > 3 ? conn->rttvar.rto * retry : conn->rttvar.rto, UNACK_QUEUE_RING_LENGTH_LOSS); + uint32 factor = (retry <= 12 ? 
retry : 12); return Max(MIN_EXPIRATION_PERIOD, Min(MAX_EXPIRATION_PERIOD, (conn->rtt + (conn->dev << 2)) << (factor))); @@ -2968,6 +3389,19 @@ setupOutgoingUDPConnection(ChunkTransportState *transportStates, ChunkTransportS conn->mConn.msgSize = sizeof(conn->conn_info); conn->mConn.stillActive = true; conn->conn_info.seq = 1; + conn->rttvar.ts_rto = 0; + conn->rttvar.rto = UDP_INITIAL_RTO; + conn->rttvar.srtt = 0; + conn->rttvar.rttvar = 0; + conn->rttvar.snd_una = 0; + conn->rttvar.nrtx = 0; + conn->rttvar.max_nrtx = 0; + conn->rttvar.mss = UDP_DEFAULT_MSS; + conn->rttvar.cwnd = 2; + conn->rttvar.ssthresh = UDP_INFINITE_SSTHRESH; + conn->rttvar.loss_count = 0; + conn->rttvar.karn_mode = false; + conn->on_rto_idx = -1; Assert(conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6); } /* setupOutgoingUDPConnection */ @@ -3207,6 +3641,19 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable) conn->conn_info.icId = sliceTable->ic_instance_id; conn->conn_info.flags = UDPIC_FLAGS_RECEIVER_TO_SENDER; + conn->rttvar.ts_rto = 0; + conn->rttvar.rto = UDP_INITIAL_RTO; + conn->rttvar.srtt = 0; + conn->rttvar.rttvar = 0; + conn->rttvar.snd_una = 0; + conn->rttvar.nrtx = 0; + conn->rttvar.max_nrtx = 0; + conn->rttvar.mss = UDP_DEFAULT_MSS; + conn->rttvar.cwnd = 2; + conn->rttvar.ssthresh = UDP_INFINITE_SSTHRESH; + conn->rttvar.loss_count = 0; + conn->rttvar.karn_mode = false; + conn->on_rto_idx = -1; connAddHash(&ic_control_info.connHtab, &conn->mConn); } } @@ -3221,6 +3668,8 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable) { initSndBufferPool(&snd_buffer_pool); initUnackQueueRing(&unack_queue_ring); + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER) + initUdpManager(&mudp); ic_control_info.isSender = true; ic_control_info.lastExpirationCheckTime = getCurrentTime(); ic_control_info.lastPacketSendTime = ic_control_info.lastExpirationCheckTime; @@ -3284,6 +3733,9 @@ static inline void SetupUDPIFCInterconnect(EState *estate) { ChunkTransportState *icContext = NULL; + int32 sliceNum = 0; + int32 calcQueueDepth = 0; + int32 calcSndDepth = 0; PG_TRY(); { /* @@ -3291,6 +3743,39 @@ SetupUDPIFCInterconnect(EState *estate) * technically it is not part of current query, discard it directly. 
*/ resetRxThreadError(); + if (estate != NULL && estate->es_sliceTable != NULL) + sliceNum = estate->es_sliceTable->numSlices; + else + sliceNum = 1; + + if (Gp_interconnect_mem_size > 0 && + Gp_interconnect_queue_depth == 4 && + Gp_interconnect_snd_queue_depth == 2) + { + int32 perQueue = Gp_interconnect_mem_size / + (Gp_max_packet_size * sliceNum); + + calcSndDepth = Max(Gp_interconnect_snd_queue_depth, perQueue / 2); + calcQueueDepth = Max(Gp_interconnect_queue_depth, perQueue - calcSndDepth); + + if (calcSndDepth > MAX_QUEUE_SIZE) + calcSndDepth = MAX_QUEUE_SIZE; + + if (calcQueueDepth > MAX_QUEUE_SIZE) + calcQueueDepth = MAX_QUEUE_SIZE; + + Gp_interconnect_snd_queue_depth = calcSndDepth; + Gp_interconnect_queue_depth = calcQueueDepth; + + elog(DEBUG1, "SetupUDPIFCInterconnect: queue depth, " + "queue_depth=%d, snd_queue_depth=%d, " + "mem_size=%d, slices=%d, packet_size=%d", + Gp_interconnect_queue_depth, + Gp_interconnect_snd_queue_depth, + Gp_interconnect_mem_size, + sliceNum, + Gp_max_packet_size); + } icContext = SetupUDPIFCInterconnect_Internal(estate->es_sliceTable); @@ -3815,7 +4300,6 @@ static TupleChunkListItem receiveChunksUDPIFC(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry, int16 motNodeID, int16 *srcRoute, MotionConn *mConn) { - bool directed = false; int nFds = 0; int *waitFds = NULL; int nevent = 0; @@ -3832,7 +4316,6 @@ receiveChunksUDPIFC(ChunkTransportState *pTransportStates, ChunkTransportStateEn if (mConn != NULL) { conn = CONTAINER_OF(mConn, MotionConnUDP, mConn); - directed = true; *srcRoute = conn->route; setMainThreadWaiting(&rx_control_info.mainWaitingState, motNodeID, conn->route, pTransportStates->sliceTable->ic_instance_id); @@ -4472,7 +4955,7 @@ logPkt(char *prefix, icpkthdr *pkt) * packet is retransmitted. */ static void -handleAckedPacket(MotionConn *ackMotionConn, ICBuffer *buf, uint64 now) +handleAckedPacket(MotionConn *ackMotionConn, ICBuffer *buf, uint64 now, struct icpkthdr *pkt) { uint64 ackTime = 0; bool bufIsHead = false; @@ -4485,6 +4968,39 @@ handleAckedPacket(MotionConn *ackMotionConn, ICBuffer *buf, uint64 now) buf = icBufferListDelete(&ackConn->unackQueue, buf); + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE || Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER) + { + bufConn = CONTAINER_OF(buf->conn, MotionConnUDP, mConn); + buf = icBufferListDelete(&unack_queue_ring.slots[buf->unackQueueRingSlot], buf); + unack_queue_ring.numOutStanding--; + if (icBufferListLength(&ackConn->unackQueue) >= 1) + unack_queue_ring.numSharedOutStanding--; + + ackTime = now - buf->sentTime; + + if (buf->nRetry == 0) + { + /* adjust the congestion control window. 
*/ + if (snd_control_info.cwnd < snd_control_info.ssthresh) + snd_control_info.cwnd += 2; + else + snd_control_info.cwnd += 1 / snd_control_info.cwnd; + snd_control_info.cwnd = Min(snd_control_info.cwnd, snd_buffer_pool.maxCount); + } + + if ((bufConn->rttvar.rto << 1) > ackTime && pkt->retry_times != Gp_interconnect_min_retries_before_timeout) + estimateRTT(buf->conn, (now - pkt->send_time)); + + if (buf->nRetry && pkt->retry_times > 0 && pkt->retry_times < Gp_interconnect_min_retries_before_timeout) + bufConn->rttvar.rto += (bufConn->rttvar.rto >> 4 * buf->nRetry); + + if (unlikely(Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER)) + { + bufConn->sndvar.ts_rto = bufConn->rttvar.rto; + addtoRTOList(&mudp, bufConn); + } + } + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS) { buf = icBufferListDelete(&unack_queue_ring.slots[buf->unackQueueRingSlot], buf); @@ -4564,7 +5080,7 @@ handleAckedPacket(MotionConn *ackMotionConn, ICBuffer *buf, uint64 now) * if we receive a stop message, return true (caller will clean up). */ static bool -handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry) +handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, bool need_flush) { ChunkTransportStateEntryUDP * pEntry = NULL; bool ret = false; @@ -4577,7 +5093,6 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChun struct icpkthdr *pkt = snd_control_info.ackBuffer; - bool shouldSendBuffers = false; SliceTable *sliceTbl = transportStates->sliceTable; @@ -4751,7 +5266,7 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChun while (!icBufferListIsHead(&ackConn->unackQueue, link) && buf->pkt->seq <= pkt->seq) { next = link->next; - handleAckedPacket(&ackConn->mConn, buf, now); + handleAckedPacket(&ackConn->mConn, buf, now, pkt); shouldSendBuffers = true; link = next; buf = GET_ICBUFFER_FROM_PRIMARY(link); @@ -4767,7 +5282,7 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChun * still send here, since in STOP/EOS race case, we may have been * in EOS sending logic and will not check stop message. */ - if (shouldSendBuffers) + if (shouldSendBuffers && need_flush) sendBuffers(transportStates, &pEntry->entry, &ackConn->mConn); } else if (DEBUG1 >= log_min_messages) @@ -5011,7 +5526,7 @@ handleStopMsgs(ChunkTransportState *transportStates, ChunkTransportStateEntry *p { if (pollAcks(transportStates, pEntry->txfd, 0)) { - if (handleAcks(transportStates, &pEntry->entry)) + if (handleAcks(transportStates, &pEntry->entry, true)) { /* more stops found, loop again. 
*/ i = 0; @@ -5053,7 +5568,7 @@ sendBuffers(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEnt { ICBuffer *buf = NULL; - if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS && + if ((Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS || Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) && (icBufferListLength(&conn->unackQueue) > 0 && unack_queue_ring.numSharedOutStanding >= (snd_control_info.cwnd - snd_control_info.minCwnd))) break; @@ -5074,7 +5589,7 @@ sendBuffers(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEnt icBufferListAppend(&conn->unackQueue, buf); - if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS) + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS || Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) { unack_queue_ring.numOutStanding++; if (icBufferListLength(&conn->unackQueue) > 1) @@ -5098,6 +5613,10 @@ sendBuffers(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEnt updateStats(TPE_DATA_PKT_SEND, conn, buf->pkt); #endif + struct icpkthdr *pkt_ = buf->pkt; + pkt_->send_time = now; + pkt_->recv_time = 0; + pkt_->retry_times = buf->nRetry; sendOnce(transportStates, pEntry, buf, &conn->mConn); ic_statistics.sndPktNum++; @@ -5245,7 +5764,7 @@ handleAckForDisorderPkt(ChunkTransportState *transportStates, if (buf->pkt->seq == pkt->seq) { - handleAckedPacket(&conn->mConn, buf, now); + handleAckedPacket(&conn->mConn, buf, now, pkt); shouldSendBuffers = true; break; } @@ -5255,7 +5774,7 @@ handleAckForDisorderPkt(ChunkTransportState *transportStates, /* this is a lost packet, retransmit */ buf->nRetry++; - if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS) + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS || Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) { buf = icBufferListDelete(&unack_queue_ring.slots[buf->unackQueueRingSlot], buf); putIntoUnackQueueRing(&unack_queue_ring, buf, @@ -5284,7 +5803,7 @@ handleAckForDisorderPkt(ChunkTransportState *transportStates, /* remove packet already received. 
*/ next = link->next; - handleAckedPacket(&conn->mConn, buf, now); + handleAckedPacket(&conn->mConn, buf, now, pkt); shouldSendBuffers = true; link = next; buf = GET_ICBUFFER_FROM_PRIMARY(link); @@ -5301,7 +5820,7 @@ handleAckForDisorderPkt(ChunkTransportState *transportStates, lostPktCnt--; } } - if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS) + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS || Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) { snd_control_info.ssthresh = Max(snd_control_info.cwnd / 2, snd_control_info.minCwnd); snd_control_info.cwnd = snd_control_info.ssthresh; @@ -5354,7 +5873,7 @@ handleAckForDuplicatePkt(MotionConn *mConn, icpkthdr *pkt) while (!icBufferListIsHead(&conn->unackQueue, link) && (buf->pkt->seq <= pkt->extraSeq)) { next = link->next; - handleAckedPacket(&conn->mConn, buf, now); + handleAckedPacket(&conn->mConn, buf, now, pkt); shouldSendBuffers = true; link = next; buf = GET_ICBUFFER_FROM_PRIMARY(link); @@ -5366,7 +5885,7 @@ handleAckForDuplicatePkt(MotionConn *mConn, icpkthdr *pkt) next = link->next; if (buf->pkt->seq == pkt->seq) { - handleAckedPacket(&conn->mConn, buf, now); + handleAckedPacket(&conn->mConn, buf, now, pkt); shouldSendBuffers = true; break; } @@ -5448,55 +5967,223 @@ checkExpiration(ChunkTransportState *transportStates, uint64 now) { /* check for expiration */ - int count = 0; - int retransmits = 0; + int count = 0; + int retransmits = 0; MotionConnUDP *currBuffConn = NULL; Assert(unack_queue_ring.currentTime != 0); - while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) && count++ < UNACK_QUEUE_RING_SLOTS_NUM) + + if (unlikely(Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER)) { - /* expired, need to resend them */ - ICBuffer *curBuf = NULL; + checkRtmTimeout(&mudp, now, 500, transportStates, pEntry, triggerConn); + return; + } - while ((curBuf = icBufferListPop(&unack_queue_ring.slots[unack_queue_ring.idx])) != NULL) + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) + { + uint64 timer_span_time = unack_queue_ring.currentTime + TIMER_SPAN_LOSS; + + while (now >= (timer_span_time + unack_queue_ring.time_difference) && count++ < UNACK_QUEUE_RING_SLOTS_NUM) { - curBuf->nRetry++; - putIntoUnackQueueRing( - &unack_queue_ring, - curBuf, - computeExpirationPeriod(curBuf->conn, curBuf->nRetry), now); + /* expired, need to resend them */ + ICBuffer *curBuf = NULL; + + while ((curBuf = icBufferListPop(&unack_queue_ring.slots[unack_queue_ring.idx])) != NULL) + { + MotionConnUDP *conn = NULL; + conn = CONTAINER_OF(curBuf->conn, MotionConnUDP, mConn); + curBuf->nRetry++; + + /* + * Fixed Timeout Thresholds: Traditional TCP-style Retransmission Timeout + * (RTTVAR.RTO) calculations may be too rigid for networks with volatile + * latency. This leads to: + * Premature Retransmissions: Unnecessary data resends during temporary + * latency spikes, wasting bandwidth. + * Delayed Recovery: Slow reaction to actual packet loss when RTO is + * overly conservative. + * + * Lack of Context Awareness: Static RTO ignores real-time network behavior + * patterns, reducing throughput and responsiveness. + * + * Solution: Dynamic Timeout Threshold Adjustment + * Implements an adaptive timeout mechanism to optimize retransmission: + * if (now < (curBuf->sentTime + conn->rttvar.rto)) { + * uint32_t diff = (curBuf->sentTime + conn->rttvar.rto) - now; + * // ... 
(statistical tracking and threshold adjustment) + * } + * Temporary Latency Spike: Uses max (conservative) to avoid false + * retransmits, reducing bandwidth waste (vs. traditional mistaken + * retransmissions). + * Persistent Packet Loss: Prioritizes min (aggressive) via + * weight_retrans, accelerating recovery (vs. slow fixed-RTO reaction). + * Stable Network: Balances weights for equilibrium throughput (vs. + * static RTO limitations). + */ + if (now < (curBuf->sentTime + conn->rttvar.rto)) + { +#ifdef TIMEOUT_Z + uint32_t diff = (curBuf->sentTime + conn->rttvar.rto) - now; + if(unack_queue_ring.retrans_count == 0 && unack_queue_ring.no_retrans_count == 0) + { + unack_queue_ring.min = diff; + unack_queue_ring.max = diff; + } + + if (diff < unack_queue_ring.min) unack_queue_ring.min = diff; + if (diff > unack_queue_ring.max) unack_queue_ring.max = diff; + + if (unack_queue_ring.retrans_count == 0) + unack_queue_ring.time_difference = unack_queue_ring.max; + else if (unack_queue_ring.no_retrans_count == 0 && ic_statistics.retransmits < (Gp_interconnect_min_retries_before_timeout / 4)) + unack_queue_ring.time_difference = 0; + else + { + uint32_t total_count = unack_queue_ring.retrans_count + unack_queue_ring.no_retrans_count; + double weight_retrans = (double)unack_queue_ring.retrans_count / total_count; + double weight_no_retrans = (double)unack_queue_ring.no_retrans_count / total_count; + unack_queue_ring.time_difference = (uint32_t)(unack_queue_ring.max * weight_no_retrans + unack_queue_ring.min * weight_retrans); + } + + ++unack_queue_ring.no_retrans_count; + } + else + ++unack_queue_ring.retrans_count; +#endif #ifdef TRANSFER_PROTOCOL_STATS - updateStats(TPE_DATA_PKT_SEND, curBuf->conn, curBuf->pkt); + updateStats(TPE_DATA_PKT_SEND, curBuf->conn, curBuf->pkt); #endif + ChunkTransportStateEntryUDP *pEntryUdp; + pEntryUdp = CONTAINER_OF(pEntry, ChunkTransportStateEntryUDP, entry); + putIntoUnackQueueRing(&unack_queue_ring, + curBuf, + computeExpirationPeriod(curBuf->conn, curBuf->nRetry), getCurrentTime()); + struct icpkthdr *pkt_ = curBuf->pkt; - sendOnce(transportStates, pEntry, curBuf, curBuf->conn); + pkt_->send_time = getCurrentTime(); + pkt_->recv_time = 0; + pkt_->retry_times = curBuf->nRetry; - currBuffConn = CONTAINER_OF(curBuf->conn, MotionConnUDP, mConn); + sendOnce(transportStates, pEntry, curBuf, curBuf->conn); - retransmits++; - ic_statistics.retransmits++; - currBuffConn->stat_count_resent++; - currBuffConn->stat_max_resent = Max(currBuffConn->stat_max_resent, - currBuffConn->stat_count_resent); + /* + * Adaptive Retry Backoff with Polling for Network Asymmetry Mitigation + * + * This logic addresses two critical network pathologies: + * 1. RTO Distortion Amplification: + * - Packet loss in volatile networks causes RTO-based retransmission errors + * - Multiple spurious retries increase network load and congestion collapse risk + * 2. Data Skew-Induced Starvation: + * - Under unbalanced workloads, low-traffic nodes experience MON (Message Order Number) delays + * - Delayed ACKs trigger false retransmissions even when packets arrive eventually + * - Unacked queue inflation worsens congestion in high-traffic nodes + */ + int32_t loop_ack = curBuf->nRetry; + uint32_t rto_min = UDP_RTO_MIN / 10; + uint32_t rtoMs = conn->rttvar.rto / 1000; + int32_t wait_time = rto_min > rtoMs ? 
rto_min : rtoMs; + int32_t loop = 0; - checkNetworkTimeout(curBuf, now, &transportStates->networkTimeoutIsLogged); + /* + * To optimize performance, we need to process all the time-out file descriptors (fds) + * in each batch together. + */ + if (loop_ack > 0) + { + while (loop++ < loop_ack) + { + if (pollAcks(transportStates, pEntryUdp->txfd, wait_time)) + { + handleAcks(transportStates, pEntry, false); + break; + } + + struct icpkthdr *pkt_ = curBuf->pkt; + pkt_->send_time = getCurrentTime(); + pkt_->recv_time = 0; + pkt_->retry_times = curBuf->nRetry; + + sendOnce(transportStates, pEntry, curBuf, curBuf->conn); + + if (loop_ack < (Gp_interconnect_min_retries_before_timeout / 10)) + wait_time += wait_time / 10; + else if (loop_ack > (Gp_interconnect_min_retries_before_timeout / 10) && loop_ack < (Gp_interconnect_min_retries_before_timeout / 5)) + wait_time += RTO_MAX / 10; + else if (loop_ack > (Gp_interconnect_min_retries_before_timeout / 5) && loop_ack < (Gp_interconnect_min_retries_before_timeout / 2)) + wait_time += RTO_MAX / 5; + else if (loop_ack < (Gp_interconnect_min_retries_before_timeout)) + wait_time += RTO_MAX; + }; + } + + currBuffConn = CONTAINER_OF(curBuf->conn, MotionConnUDP, mConn); + + retransmits++; + ic_statistics.retransmits++; + currBuffConn->stat_count_resent++; + currBuffConn->stat_max_resent = Max(currBuffConn->stat_max_resent, + currBuffConn->stat_count_resent); + + checkNetworkTimeout(curBuf, now, &transportStates->networkTimeoutIsLogged); #ifdef AMS_VERBOSE_LOGGING - write_log("RESEND pkt with seq %d (retry %d, rtt " UINT64_FORMAT ") to route %d", - curBuf->pkt->seq, curBuf->nRetry, currBuffConn->rtt, currBuffConn->route); - logPkt("RESEND PKT in checkExpiration", curBuf->pkt); + write_log("RESEND pkt with seq %d (retry %d, rtt " UINT64_FORMAT ") to route %d", + curBuf->pkt->seq, curBuf->nRetry, currBuffConn->rtt, currBuffConn->route); + logPkt("RESEND PKT in checkExpiration", curBuf->pkt); #endif + } + + timer_span_time += TIMER_SPAN_LOSS; + unack_queue_ring.idx = (unack_queue_ring.idx + 1) % (UNACK_QUEUE_RING_SLOTS_NUM); } + } + else + { + while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) && count++ < UNACK_QUEUE_RING_SLOTS_NUM) + { + /* expired, need to resend them */ + ICBuffer *curBuf = NULL; - unack_queue_ring.currentTime += TIMER_SPAN; - unack_queue_ring.idx = (unack_queue_ring.idx + 1) % (UNACK_QUEUE_RING_SLOTS_NUM); + while ((curBuf = icBufferListPop(&unack_queue_ring.slots[unack_queue_ring.idx])) != NULL) + { + curBuf->nRetry++; + putIntoUnackQueueRing( + &unack_queue_ring, + curBuf, + computeExpirationPeriod(curBuf->conn, curBuf->nRetry), now); + +#ifdef TRANSFER_PROTOCOL_STATS + updateStats(TPE_DATA_PKT_SEND, curBuf->conn, curBuf->pkt); +#endif + + sendOnce(transportStates, pEntry, curBuf, curBuf->conn); + + currBuffConn = CONTAINER_OF(curBuf->conn, MotionConnUDP, mConn); + + retransmits++; + ic_statistics.retransmits++; + currBuffConn->stat_count_resent++; + currBuffConn->stat_max_resent = Max(currBuffConn->stat_max_resent, currBuffConn->stat_count_resent); + checkNetworkTimeout(curBuf, now, &transportStates->networkTimeoutIsLogged); + +#ifdef AMS_VERBOSE_LOGGING + write_log("RESEND pkt with seq %d (retry %d, rtt " UINT64_FORMAT ") to route %d", + curBuf->pkt->seq, curBuf->nRetry, curBuf->conn->rtt, curBuf->conn->route); + logPkt("RESEND PKT in checkExpiration", curBuf->pkt); +#endif + } + + unack_queue_ring.currentTime += TIMER_SPAN; + unack_queue_ring.idx = (unack_queue_ring.idx + 1) % (UNACK_QUEUE_RING_SLOTS_NUM); + } + + /* + * deal 
with case when there is a long time this function is not called. + */ + unack_queue_ring.currentTime = now - (now % (TIMER_SPAN)); } - /* - * deal with case when there is a long time this function is not called. - */ - unack_queue_ring.currentTime = now - (now % TIMER_SPAN); if (retransmits > 0) { snd_control_info.ssthresh = Max(snd_control_info.cwnd / 2, snd_control_info.minCwnd); @@ -5690,7 +6377,7 @@ checkExceptions(ChunkTransportState *transportStates, checkExpirationCapacityFC(transportStates, pEntry, conn, timeout); } - if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS) + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS || Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) { uint64 now = getCurrentTime(); @@ -5735,14 +6422,24 @@ static inline int computeTimeout(MotionConn *mConn, int retry) { MotionConnUDP *conn = NULL; + uint32_t rtoMs = 0; conn = CONTAINER_OF(mConn, MotionConnUDP, mConn); + rtoMs = conn->rttvar.rto / 1000; if (icBufferListLength(&conn->unackQueue) == 0) return TIMER_CHECKING_PERIOD; ICBufferLink *bufLink = icBufferListFirst(&conn->unackQueue); ICBuffer *buf = GET_ICBUFFER_FROM_PRIMARY(bufLink); + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) + { + if (buf->nRetry == 0 && retry == 0 && unack_queue_ring.numSharedOutStanding < (snd_control_info.cwnd - snd_control_info.minCwnd)) + return 0; + + return rtoMs > TIMER_CHECKING_PERIOD ? rtoMs: TIMER_CHECKING_PERIOD; + } + if (buf->nRetry == 0 && retry == 0) return 0; @@ -5830,7 +6527,7 @@ SendChunkUDPIFC(ChunkTransportState *transportStates, if (pollAcks(transportStates, pEntry->txfd, timeout)) { - if (handleAcks(transportStates, &pEntry->entry)) + if (handleAcks(transportStates, &pEntry->entry, true)) { /* * We make sure that we deal with the stop messages only after @@ -5987,12 +6684,15 @@ SendEOSUDPIFC(ChunkTransportState *transportStates, timeout = computeTimeout(&conn->mConn, retry); if (pollAcks(transportStates, pEntry->txfd, timeout)) - handleAcks(transportStates, &pEntry->entry); - + handleAcks(transportStates, &pEntry->entry, true); checkExceptions(transportStates, &pEntry->entry, &conn->mConn, retry++, timeout); if (retry >= MAX_TRY) + { + if (icBufferListLength(&conn->unackQueue) == 0) + sendBuffers(transportStates, &pEntry->entry, &conn->mConn); break; + } } if ((!conn->mConn.cdbProc) || (icBufferListLength(&conn->unackQueue) == 0 && @@ -6217,24 +6917,60 @@ getCurrentTime(void) static void putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime, uint64 now) { + MotionConnUDP *buffConn = NULL; + buffConn = CONTAINER_OF(buf->conn, MotionConnUDP, mConn); uint64 diff = 0; int idx = 0; - - /* The first packet, currentTime is not initialized */ - if (uqr->currentTime == 0) - uqr->currentTime = now - (now % TIMER_SPAN); - - diff = now + expTime - uqr->currentTime; - if (diff >= UNACK_QUEUE_RING_LENGTH) + + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) { + /* The first packet, currentTime is not initialized */ +#ifndef TIMEOUT_Z + if (uqr->currentTime == 0) + uqr->currentTime = now - (now % TIMER_SPAN_LOSS); +#else + if (uqr->currentTime == 0 && buffConn->rttvar.rto == 0) + uqr->currentTime = now - (now % TIMER_SPAN_LOSS); + else + uqr->currentTime = now + buffConn->rttvar.rto; + +#endif + diff = expTime; + if (diff >= UNACK_QUEUE_RING_LENGTH_LOSS) + { #ifdef AMS_VERBOSE_LOGGING - write_log("putIntoUnackQueueRing:" "now " UINT64_FORMAT "expTime " UINT64_FORMAT "diff " UINT64_FORMAT "uqr-currentTime " UINT64_FORMAT, 
now, expTime, diff, uqr->currentTime); + write_log("putIntoUnackQueueRing:" "now " UINT64_FORMAT "expTime " UINT64_FORMAT "diff " UINT64_FORMAT "uqr-currentTime " UINT64_FORMAT, now, expTime, diff, uqr->currentTime); #endif - diff = UNACK_QUEUE_RING_LENGTH - 1; + diff = UNACK_QUEUE_RING_LENGTH_LOSS - 1; + } + else if (diff < TIMER_SPAN_LOSS) + { + diff = diff < TIMER_SPAN_LOSS ? TIMER_SPAN_LOSS : diff; + } } - else if (diff < TIMER_SPAN) + else { - diff = TIMER_SPAN; + if (uqr->currentTime == 0) + uqr->currentTime = now - (now % TIMER_SPAN_LOSS); + + diff = now + expTime - uqr->currentTime; + if (diff >= UNACK_QUEUE_RING_LENGTH) + { +#ifdef AMS_VERBOSE_LOGGING + write_log("putIntoUnackQueueRing:" "now " UINT64_FORMAT "expTime " UINT64_FORMAT "diff " UINT64_FORMAT "uqr-currentTime " UINT64_FORMAT, now, expTime, diff, uqr->currentTime); +#endif + diff = UNACK_QUEUE_RING_LENGTH - 1; + } + else if (diff < TIMER_SPAN) + { + diff = TIMER_SPAN; + } + + idx = (uqr->idx + diff / TIMER_SPAN) % UNACK_QUEUE_RING_SLOTS_NUM; + +#ifdef AMS_VERBOSE_LOGGING + write_log("PUTTW: curtime " UINT64_FORMAT " now " UINT64_FORMAT " (diff " UINT64_FORMAT ") expTime " UINT64_FORMAT " previdx %d, nowidx %d, nextidx %d", uqr->currentTime, now, diff, expTime, buf->unackQueueRingSlot, uqr->idx, idx); +#endif } idx = (uqr->idx + diff / TIMER_SPAN) % UNACK_QUEUE_RING_SLOTS_NUM; @@ -6681,9 +7417,25 @@ rxThreadFunc(void *arg) if (conn != NULL) { + uint64 now = getCurrentTime(); + uint64 send_time = pkt->send_time; + uint64 recv_time = now; + uint64 retry_times = pkt->retry_times; + MotionConnUDP *connUdp = NULL; + + connUdp = CONTAINER_OF(conn, MotionConnUDP, mConn); + bool drop_ack = pkt->seq < connUdp->conn_info.seq ? true : false; /* Handling a regular packet */ if (handleDataPacket(conn, pkt, &peer, &peerlen, ¶m, &wakeup_mainthread)) pkt = NULL; + if (!pkt) + { + param.msg.send_time = send_time; + param.msg.recv_time = recv_time; + param.msg.retry_times = retry_times; + } + if (drop_ack) + param.msg.retry_times = Gp_interconnect_min_retries_before_timeout; ic_statistics.recvPktNum++; } else diff --git a/contrib/interconnect/udp/ic_udpifc.h b/contrib/interconnect/udp/ic_udpifc.h index 76403abb3f3..af3ca72ba3b 100644 --- a/contrib/interconnect/udp/ic_udpifc.h +++ b/contrib/interconnect/udp/ic_udpifc.h @@ -90,6 +90,9 @@ typedef struct icpkthdr */ uint32 seq; uint32 extraSeq; + uint64_t send_time; + uint64_t recv_time; + uint8_t retry_times; } icpkthdr; typedef struct ICBuffer ICBuffer; diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c index 9db3389e0bf..1606adc0dfd 100644 --- a/src/backend/cdb/cdbvars.c +++ b/src/backend/cdb/cdbvars.c @@ -198,6 +198,7 @@ int Gp_interconnect_queue_depth = 4; /* max number of messages * waiting in rx-queue before * we drop. 
*/
 int		Gp_interconnect_snd_queue_depth = 2;
+int		Gp_interconnect_mem_size = 10;
 int		Gp_interconnect_timer_period = 5;
 int		Gp_interconnect_timer_checking_period = 20;
 int		Gp_interconnect_default_rtt = 20;
diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c
index e342d762705..3c60c588a3a 100644
--- a/src/backend/utils/misc/guc_gp.c
+++ b/src/backend/utils/misc/guc_gp.c
@@ -552,6 +552,8 @@ static const struct config_enum_entry gp_autostats_modes[] = {
 static const struct config_enum_entry gp_interconnect_fc_methods[] = {
 	{"loss", INTERCONNECT_FC_METHOD_LOSS},
 	{"capacity", INTERCONNECT_FC_METHOD_CAPACITY},
+	{"loss_advance", INTERCONNECT_FC_METHOD_LOSS_ADVANCE},
+	{"loss_timer", INTERCONNECT_FC_METHOD_LOSS_TIMER},
 	{NULL, 0}
 };
@@ -3708,6 +3710,16 @@ struct config_int ConfigureNamesInt_gp[] =
 		NULL, NULL, NULL
 	},
+	{
+		{"gp_interconnect_mem_size", PGC_USERSET, GP_ARRAY_TUNING,
+			gettext_noop("Sets the maximum size (in MB) of the send/recv queue memory for all connections in the UDP interconnect"),
+			NULL
+		},
+		&Gp_interconnect_mem_size,
+		10, 1, 1024,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"gp_interconnect_timer_period", PGC_USERSET, GP_ARRAY_TUNING,
 			gettext_noop("Sets the timer period (in ms) for UDP interconnect"),
diff --git a/src/include/cdb/cdbvars.h b/src/include/cdb/cdbvars.h
index 90af5177ce0..745f9cd3013 100644
--- a/src/include/cdb/cdbvars.h
+++ b/src/include/cdb/cdbvars.h
@@ -334,6 +334,8 @@ typedef enum GpVars_Interconnect_Method
 {
 	INTERCONNECT_FC_METHOD_CAPACITY = 0,
 	INTERCONNECT_FC_METHOD_LOSS = 2,
+	INTERCONNECT_FC_METHOD_LOSS_ADVANCE = 3,
+	INTERCONNECT_FC_METHOD_LOSS_TIMER = 4,
 } GpVars_Interconnect_Method;

 extern int	Gp_interconnect_fc_method;
@@ -367,6 +369,7 @@ extern int	Gp_interconnect_min_rto;
 extern int	Gp_interconnect_transmit_timeout;
 extern int	Gp_interconnect_min_retries_before_timeout;
 extern int	Gp_interconnect_debug_retry_interval;
+extern int	Gp_interconnect_mem_size;

 /* UDP recv buf size in KB.
For testing */ extern int Gp_udp_bufsize_k; diff --git a/src/include/utils/sync_guc_name.h b/src/include/utils/sync_guc_name.h index 286762e6d6f..532bf2b8638 100644 --- a/src/include/utils/sync_guc_name.h +++ b/src/include/utils/sync_guc_name.h @@ -82,6 +82,7 @@ "gp_interconnect_queue_depth", "gp_interconnect_setup_timeout", "gp_interconnect_snd_queue_depth", + "gp_interconnect_mem_size", "gp_interconnect_tcp_listener_backlog", "gp_interconnect_timer_checking_period", "gp_interconnect_timer_period", diff --git a/src/test/regress/expected/icudp/gp_interconnect_fc_method.out b/src/test/regress/expected/icudp/gp_interconnect_fc_method.out index b115c95a393..4dcda3b8124 100644 --- a/src/test/regress/expected/icudp/gp_interconnect_fc_method.out +++ b/src/test/regress/expected/icudp/gp_interconnect_fc_method.out @@ -96,3 +96,49 @@ SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(len 29 | 100 | 2600 (30 rows) +SET gp_interconnect_fc_method = "loss_advance"; +SHOW gp_interconnect_fc_method; + gp_interconnect_fc_method +--------------------------- + loss_advance +(1 row) + +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + diff --git a/src/test/regress/expected/icudp/queue_depth_combination_capacity.out b/src/test/regress/expected/icudp/queue_depth_combination_capacity.out index ec8ea9594d6..64c177e36d5 100644 --- a/src/test/regress/expected/icudp/queue_depth_combination_capacity.out +++ b/src/test/regress/expected/icudp/queue_depth_combination_capacity.out @@ -266,3 +266,260 @@ SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(len 29 | 100 | 2600 (30 rows) +-- Skew with gather+redistribute +SET gp_interconnect_fc_method = "loss_advance"; +SHOW gp_interconnect_fc_method; + gp_interconnect_fc_method +--------------------------- + loss_advance +(1 row) + +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET 
gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 1; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 1; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 
| 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1024; +SET gp_interconnect_queue_depth = 1024; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + diff --git a/src/test/regress/expected/icudp/queue_depth_combination_loss_advance.out b/src/test/regress/expected/icudp/queue_depth_combination_loss_advance.out new file mode 100644 index 00000000000..c509dd84ef7 --- /dev/null +++ b/src/test/regress/expected/icudp/queue_depth_combination_loss_advance.out @@ -0,0 +1,260 @@ +-- +-- @description Interconnect flow control test case: combination guc value +-- @created 2025-09-12 +-- Set mode +SET gp_interconnect_fc_method = "loss_advance"; +-- Create a table +CREATE TEMP TABLE small_table(dkey INT, jkey INT, rval REAL, tval TEXT default 'abcdefghijklmnopqrstuvwxyz') DISTRIBUTED BY (dkey); +-- Generate some data +INSERT INTO small_table VALUES(generate_series(1, 5000), generate_series(5001, 10000), sqrt(generate_series(5001, 10000))); +-- Functional tests +-- Skew with gather+redistribute +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 1; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 
7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 1; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1024; +SET gp_interconnect_queue_depth = 1024; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS 
sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + rval2 | count | sum_len_tval +-------+-------+-------------- + 0 | 100 | 2600 + 1 | 100 | 2600 + 2 | 100 | 2600 + 3 | 100 | 2600 + 4 | 100 | 2600 + 5 | 100 | 2600 + 6 | 100 | 2600 + 7 | 100 | 2600 + 8 | 100 | 2600 + 9 | 100 | 2600 + 10 | 100 | 2600 + 11 | 100 | 2600 + 12 | 100 | 2600 + 13 | 100 | 2600 + 14 | 100 | 2600 + 15 | 100 | 2600 + 16 | 100 | 2600 + 17 | 100 | 2600 + 18 | 100 | 2600 + 19 | 100 | 2600 + 20 | 100 | 2600 + 21 | 100 | 2600 + 22 | 100 | 2600 + 23 | 100 | 2600 + 24 | 100 | 2600 + 25 | 100 | 2600 + 26 | 100 | 2600 + 27 | 100 | 2600 + 28 | 100 | 2600 + 29 | 100 | 2600 +(30 rows) + diff --git a/src/test/regress/greenplum_schedule b/src/test/regress/greenplum_schedule index 09331d8d4a8..83be12085b5 100755 --- a/src/test/regress/greenplum_schedule +++ b/src/test/regress/greenplum_schedule @@ -73,7 +73,7 @@ test: gp_dump_query_oids analyze gp_owner_permission incremental_analyze truncat test: indexjoin as_alias regex_gp gpparams with_clause transient_types gp_rules dispatch_encoding motion_gp gp_pullup_expr # interconnect tests -test: icudp/gp_interconnect_queue_depth icudp/gp_interconnect_queue_depth_longtime icudp/gp_interconnect_snd_queue_depth icudp/gp_interconnect_snd_queue_depth_longtime icudp/gp_interconnect_min_retries_before_timeout icudp/gp_interconnect_transmit_timeout icudp/gp_interconnect_cache_future_packets icudp/gp_interconnect_default_rtt icudp/gp_interconnect_fc_method icudp/gp_interconnect_min_rto icudp/gp_interconnect_timer_checking_period icudp/gp_interconnect_timer_period icudp/queue_depth_combination_loss icudp/queue_depth_combination_capacity +test: icudp/gp_interconnect_queue_depth icudp/gp_interconnect_queue_depth_longtime icudp/gp_interconnect_snd_queue_depth icudp/gp_interconnect_snd_queue_depth_longtime icudp/gp_interconnect_min_retries_before_timeout icudp/gp_interconnect_transmit_timeout icudp/gp_interconnect_cache_future_packets icudp/gp_interconnect_default_rtt icudp/gp_interconnect_fc_method icudp/gp_interconnect_min_rto icudp/gp_interconnect_timer_checking_period icudp/gp_interconnect_timer_period icudp/queue_depth_combination_loss icudp/queue_depth_combination_capacity icudp/queue_depth_combination_loss_advance # event triggers cannot run concurrently with any test that runs DDL test: event_trigger_gp diff --git a/src/test/regress/icudp_schedule b/src/test/regress/icudp_schedule index 6a2e3d44a3f..f0332edb7d3 100644 --- a/src/test/regress/icudp_schedule +++ b/src/test/regress/icudp_schedule @@ -4,7 +4,7 @@ # Below cases are also in greenplum_schedule, but as they are fast enough # we duplicate them here to make this pipeline cover more on icudp. 
-test: icudp/gp_interconnect_queue_depth icudp/gp_interconnect_queue_depth_longtime icudp/gp_interconnect_snd_queue_depth icudp/gp_interconnect_snd_queue_depth_longtime icudp/gp_interconnect_min_retries_before_timeout icudp/gp_interconnect_transmit_timeout icudp/gp_interconnect_cache_future_packets icudp/gp_interconnect_default_rtt icudp/gp_interconnect_fc_method icudp/gp_interconnect_min_rto icudp/gp_interconnect_timer_checking_period icudp/gp_interconnect_timer_period icudp/queue_depth_combination_loss icudp/queue_depth_combination_capacity icudp/icudp_regression +test: icudp/gp_interconnect_queue_depth icudp/gp_interconnect_queue_depth_longtime icudp/gp_interconnect_snd_queue_depth icudp/gp_interconnect_snd_queue_depth_longtime icudp/gp_interconnect_min_retries_before_timeout icudp/gp_interconnect_transmit_timeout icudp/gp_interconnect_cache_future_packets icudp/gp_interconnect_default_rtt icudp/gp_interconnect_fc_method icudp/gp_interconnect_min_rto icudp/gp_interconnect_timer_checking_period icudp/gp_interconnect_timer_period icudp/queue_depth_combination_loss icudp/queue_depth_combination_capacity icudp/icudp_regression icudp/queue_depth_combination_loss_advance # Below case is very slow, do not add it in greenplum_schedule. test: icudp/icudp_full diff --git a/src/test/regress/sql/icudp/gp_interconnect_fc_method.sql b/src/test/regress/sql/icudp/gp_interconnect_fc_method.sql index 52af4a220a1..929cd9c5d42 100644 --- a/src/test/regress/sql/icudp/gp_interconnect_fc_method.sql +++ b/src/test/regress/sql/icudp/gp_interconnect_fc_method.sql @@ -27,3 +27,11 @@ SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(len JOIN small_table USING(jkey) GROUP BY rval2 ORDER BY rval2; + +SET gp_interconnect_fc_method = "loss_advance"; +SHOW gp_interconnect_fc_method; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; diff --git a/src/test/regress/sql/icudp/queue_depth_combination_capacity.sql b/src/test/regress/sql/icudp/queue_depth_combination_capacity.sql index 7e7348d09ca..43aed3fd59e 100644 --- a/src/test/regress/sql/icudp/queue_depth_combination_capacity.sql +++ b/src/test/regress/sql/icudp/queue_depth_combination_capacity.sql @@ -66,3 +66,58 @@ SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(len JOIN small_table USING(jkey) GROUP BY rval2 ORDER BY rval2; + +-- Skew with gather+redistribute +SET gp_interconnect_fc_method = "loss_advance"; +SHOW gp_interconnect_fc_method; + +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 1; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS 
jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 1; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1024; +SET gp_interconnect_queue_depth = 1024; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; diff --git a/src/test/regress/sql/icudp/queue_depth_combination_loss_advance.sql b/src/test/regress/sql/icudp/queue_depth_combination_loss_advance.sql new file mode 100644 index 00000000000..a214e2a48ba --- /dev/null +++ b/src/test/regress/sql/icudp/queue_depth_combination_loss_advance.sql @@ -0,0 +1,65 @@ +-- +-- @description Interconnect flow control test case: combination guc value +-- @created 2025-09-12 + +-- Set mode +SET gp_interconnect_fc_method = "loss_advance"; + +-- Create a table +CREATE TEMP TABLE small_table(dkey INT, jkey INT, rval REAL, tval TEXT default 'abcdefghijklmnopqrstuvwxyz') DISTRIBUTED BY (dkey); + +-- Generate some data +INSERT INTO small_table VALUES(generate_series(1, 5000), generate_series(5001, 10000), sqrt(generate_series(5001, 10000))); + +-- Functional tests +-- Skew with gather+redistribute +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 1; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1; +SET gp_interconnect_queue_depth = 4096; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 4096; +SET gp_interconnect_queue_depth = 1;
+SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; + +-- Set GUC values +SET gp_interconnect_snd_queue_depth = 1024; +SET gp_interconnect_queue_depth = 1024; +SELECT ROUND(foo.rval * foo.rval)::INT % 30 AS rval2, COUNT(*) AS count, SUM(length(foo.tval)) AS sum_len_tval + FROM (SELECT 5001 AS jkey, rval, tval FROM small_table ORDER BY dkey LIMIT 3000) foo + JOIN small_table USING(jkey) + GROUP BY rval2 + ORDER BY rval2; From a980c43d8ed1ceafe11ddaabe30c5b13c5a1b9f8 Mon Sep 17 00:00:00 2001 From: Zhao Xi Date: Thu, 13 Nov 2025 10:54:40 +0800 Subject: [PATCH 2/3] In resource-constrained environments, 10TB-scale 8-parallel processing in Cloudberry may encounter specific anomalies related to Motion layer UDP communication. Below are four key scenarios and how the code modifications address them. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit

Four Anomaly Scenarios

1. Capacity Mismatch: The receiving end's buffer becomes full, but the sender is unaware. As a result, the sender's unacknowledged packet queue keeps transmitting, leading to unnecessary retransmissions and packet drops.

2. False Deadlock Detection: The peer node processes heartbeat packets but fails to free up buffer capacity. This triggers a false deadlock judgment, incorrectly flagging a network anomaly.

3. Unprocessed Packets Require Main Thread Wakeup: When the receive queue is full, incoming data packets are discarded. However, the main thread still needs to be awakened to process the backlogged packets already in the queue, preventing a permanent stall.

4. Execution Time Mismatch Across Nodes: Issues like data skew, computational performance gaps, or I/O bottlenecks cause significant differences in execution time between nodes. For example, in a hash join, if the inner table's hash build is not yet ready, the node cannot consume data from other nodes, so packets sent to it through its Motion time out while the receiving slice remains unready, blocking packet processing.

Code Modifications and Their Impact

The code changes target the above scenarios by enhancing UDP communication feedback, adjusting the deadlock checks, and ensuring proper thread wakeup. Key modifications:

1. Addressing Capacity Mismatch:
- Added UDPIC_FLAGS_FULL (256) to flag when the receive buffer is full.
- When the receive queue is full (handleDataPacket), a response carrying UDPIC_FLAGS_FULL is sent to the sender (setAckSendParam). This notifies the sender to pause or adjust transmission, preventing blind retransmissions (a standalone sketch of this feedback loop follows the patches below).

2. Fixing False Deadlock Detection:
- Modified checkDeadlock to accept transportStates as a parameter, enabling ACK polling (pollAcks/handleAcks) during deadlock checks.
- Extended the initial timeout for deadlock suspicion to 600 seconds, reducing premature network error reports.
- If no response is received after 600 seconds, the buffer capacity is incrementally increased (conn->capacity += 1) to alleviate false bottlenecks, with detailed logging before triggering an error.

3. Ensuring Main Thread Wakeup on Full Queue:
- In handleDataPacket, even when packets are dropped due to a full queue, the main thread is awakened (*wakeup_mainthread = true) if the packet matches the waiting query/node/route. This ensures backlogged packets in the queue are processed.

4. Mitigating Node Execution Mismatches:
- Added logging for retransmissions once retries exceed Gp_interconnect_min_retries_before_timeout / 5, providing visibility into prolonged packet delays (e.g., caused by an unready hash-join inner side).
- Reset nRetry to 0 after a successful ACK poll in checkExpiration, preventing accumulated retry counts from triggering false timeouts. --- contrib/interconnect/udp/ic_udpifc.c | 79 +++++++++++++++++++++++----- 1 file changed, 67 insertions(+), 12 deletions(-) diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c index eeda2c678bc..0a5d9bdfd08 100644 --- a/contrib/interconnect/udp/ic_udpifc.c +++ b/contrib/interconnect/udp/ic_udpifc.c @@ -209,6 +209,7 @@ int #define UDPIC_FLAGS_DISORDER (32) #define UDPIC_FLAGS_DUPLICATE (64) #define UDPIC_FLAGS_CAPACITY (128) +#define UDPIC_FLAGS_FULL (256) /* * ConnHtabBin @@ -835,7 +836,7 @@ static void initUdpManager(mudp_manager_t mptr); static inline void checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged); static void checkExpiration(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *triggerConn, uint64 now); -static void checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn *conn); +static void checkDeadlock(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, MotionConn *conn); static bool cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len); static void cleanupStartupCache(void); @@ -5217,6 +5218,12 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChun shouldSendBuffers |= (handleAckForDisorderPkt(transportStates, &pEntry->entry, &ackConn->mConn, pkt)); break; } + else if (pkt->flags & UDPIC_FLAGS_FULL) + { + if (DEBUG1 >= log_min_messages) + write_log("Recv buff is full [seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d", pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, ackConn->conn_info.seq); + break; + } /* * don't get out of the loop if pkt->seq equals to @@ -6096,6 +6103,7 @@ checkExpiration(ChunkTransportState *transportStates, if (pollAcks(transportStates, pEntryUdp->txfd, wait_time)) { handleAcks(transportStates, pEntry, false); + curBuf->nRetry = 0; break; } @@ -6117,6 +6125,12 @@ checkExpiration(ChunkTransportState *transportStates, }; } + if (loop_ack > Gp_interconnect_min_retries_before_timeout / 5) + write_log("Resending packet (seq %d) to %s (pid %d cid %d) with %d retries in %lu seconds", + curBuf->pkt->seq, curBuf->conn->remoteHostAndPort, + curBuf->pkt->dstPid, curBuf->pkt->dstContentId, curBuf->nRetry, + (now - curBuf->sentTime) / 1000 / 1000); + currBuffConn = CONTAINER_OF(curBuf->conn, MotionConnUDP, mConn); retransmits++; @@ -6211,7 +6225,7 @@ checkExpiration(ChunkTransportState *transportStates, * */ static void -checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn *mConn) +checkDeadlock(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, MotionConn *mConn) { uint64 deadlockCheckTime; ChunkTransportStateEntryUDP *pEntry = NULL; @@ -6248,17 +6262,31 @@ checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn *mConn) ic_control_info.lastDeadlockCheckTime = now; ic_statistics.statusQueryMsgNum++; + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE && pollAcks(transportStates, pEntry->txfd, 50)) + { + handleAcks(transportStates, pChunkEntry, false); + conn->deadlockCheckBeginTime = now; + } + /* check network error.
*/ - if ((now - conn->deadlockCheckBeginTime) > ((uint64) Gp_interconnect_transmit_timeout * 1000 * 1000)) + if ((now - conn->deadlockCheckBeginTime) > ((uint64) Gp_interconnect_transmit_timeout * 100 * 1000)) { - ereport(ERROR, - (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), - errmsg("interconnect encountered a network error, please check your network"), - errdetail("Did not get any response from %s (pid %d cid %d) in %d seconds.", - conn->mConn.remoteHostAndPort, - conn->conn_info.dstPid, - conn->conn_info.dstContentId, - Gp_interconnect_transmit_timeout))); + write_log("Did not get any response from %s (pid %d cid %d) in 600 seconds.",conn->mConn.remoteHostAndPort, + conn->conn_info.dstPid, + conn->conn_info.dstContentId); + + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER) + conn->capacity += 1; + + if ((now - conn->deadlockCheckBeginTime) > ((uint64) Gp_interconnect_transmit_timeout * 1000 * 1000)) + ereport(ERROR, + (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), + errmsg("interconnect encountered a network error, please check your network"), + errdetail("Did not get any response from %s (pid %d cid %d) in %d seconds.", + conn->mConn.remoteHostAndPort, + conn->conn_info.dstPid, + conn->conn_info.dstContentId, + Gp_interconnect_transmit_timeout))); } } } @@ -6390,7 +6418,7 @@ checkExceptions(ChunkTransportState *transportStates, if ((retry & 0x3) == 2) { - checkDeadlock(pEntry, conn); + checkDeadlock(transportStates, pEntry, conn); checkRxThreadError(); ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive); } @@ -6540,6 +6568,9 @@ SendChunkUDPIFC(ChunkTransportState *transportStates, } checkExceptions(transportStates, &pEntry->entry, &conn->mConn, retry++, timeout); doCheckExpiration = false; + + if (!doCheckExpiration && icBufferListLength(&conn->unackQueue) == 0 && conn->capacity > 0 && icBufferListLength(&conn->sndQueue) > 0) + sendBuffers(transportStates, &pEntry->entry, &conn->mConn); } conn->mConn.pBuff = (uint8 *) conn->curBuff->pkt; @@ -7133,6 +7164,30 @@ handleDataPacket(MotionConn *mConn, icpkthdr *pkt, struct sockaddr_storage *peer logPkt("Interconnect error: received a packet when the queue is full ", pkt); ic_statistics.disorderedPktNum++; conn->stat_count_dropped++; + + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_TIMER && rx_control_info.mainWaitingState.waiting && + rx_control_info.mainWaitingState.waitingNode == pkt->motNodeId && + rx_control_info.mainWaitingState.waitingQuery == pkt->icId) + { + if (rx_control_info.mainWaitingState.waitingRoute == ANY_ROUTE) + { + if (rx_control_info.mainWaitingState.reachRoute == ANY_ROUTE) + rx_control_info.mainWaitingState.reachRoute = conn->route; + } + else if (rx_control_info.mainWaitingState.waitingRoute == conn->route) + { + if (DEBUG2 >= log_min_messages) + write_log("rx thread: main_waiting waking it route %d", rx_control_info.mainWaitingState.waitingRoute); + rx_control_info.mainWaitingState.reachRoute = conn->route; + } + /* WAKE MAIN THREAD HERE */ + *wakeup_mainthread = true; + } + + if (Gp_interconnect_fc_method == INTERCONNECT_FC_METHOD_LOSS_ADVANCE) + { + setAckSendParam(param, &conn->mConn, UDPIC_FLAGS_FULL, conn->conn_info.seq - 1, conn->conn_info.extraSeq); + } return false; } From db84720cd8ede496cb6e972460dc5e9a9fd6c776 Mon Sep 17 00:00:00 2001 From: zhoujiaqi Date: Wed, 9 Jul 2025 16:35:17 +0800 Subject: [PATCH 3/3] Fix: invalid write in MotionConn when motion type is explicit redistribute motion

In the `GetMotionSentRecordTypmod` method, the `MotionConn` was not obtained via the CAST function but was accessed directly by array index, which causes an invalid read inside the array (a standalone sketch of the stride bug follows the diff below). --- contrib/interconnect/ic_common.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/contrib/interconnect/ic_common.c b/contrib/interconnect/ic_common.c index d629eb09698..7e266b69efb 100644 --- a/contrib/interconnect/ic_common.c +++ b/contrib/interconnect/ic_common.c @@ -541,14 +541,15 @@ GetMotionSentRecordTypmod(ChunkTransportState * transportStates, int16 motNodeID, int16 targetRoute) { - MotionConn *conn; + MotionConn *conn = NULL; ChunkTransportStateEntry *pEntry = NULL; getChunkTransportState(transportStates, motNodeID, &pEntry); - if (targetRoute == BROADCAST_SEGIDX) - conn = &pEntry->conns[0]; - else - conn = &pEntry->conns[targetRoute]; + if (targetRoute == BROADCAST_SEGIDX) { + targetRoute = 0; + } + + getMotionConn(pEntry, targetRoute, &conn); return &conn->sent_record_typmod; }
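
The invalid read fixed in PATCH 3/3 is a stride bug: the array elements are larger than the declared base struct, so indexing through a base-struct pointer lands inside the wrong element. Below is a minimal standalone C sketch of that failure mode; Base and Derived are hypothetical stand-ins for MotionConn and a larger implementation struct such as MotionConnUDP, and get_conn plays the role of getMotionConn. The concrete layout is an assumption chosen only to make the effect visible.

#include <stdio.h>
#include <string.h>

typedef struct { int typmod; } Base;                      /* stand-in for MotionConn */
typedef struct { Base base; char impl_state[64]; } Derived; /* stand-in for the UDP variant */

/* Correct accessor: indexing a Derived* advances by sizeof(Derived),
 * the array's real element size. */
static Base *get_conn(Derived *conns, int route)
{
    return &conns[route].base;
}

int main(void)
{
    Derived conns[3];
    memset(conns, 0, sizeof(conns));
    for (int i = 0; i < 3; i++)
        conns[i].base.typmod = 100 + i;

    /* Buggy pattern: treating the storage as a Base[] advances by
     * sizeof(Base) per element, so "element 2" actually points into
     * conns[0]'s trailing state, not at conns[2]. */
    Base *raw = (Base *) conns;
    printf("buggy route 2 -> typmod %d\n", raw[2].typmod);            /* prints 0 (garbage) */

    /* Fixed pattern: the accessor knows the element type, so route 2
     * really resolves to conns[2]. */
    printf("fixed route 2 -> typmod %d\n", get_conn(conns, 2)->typmod); /* prints 102 */
    return 0;
}

Writing through the mis-addressed pointer corrupts a neighboring connection's state in the same way, which is why the subject line calls it an invalid write.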
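
To make the capacity-mismatch feedback from PATCH 2/3 concrete, here is a minimal standalone C sketch of the feedback loop. The names RecvQueue, Ack, FLAG_FULL, on_data, and on_ack are hypothetical stand-ins, not the ic_udpifc.c API; only the handshake mirrors the patch: a full receiver drops the packet but acks with an explicit FULL flag and the last accepted sequence, and the sender pauses retransmission instead of retrying blindly.

#include <stdbool.h>
#include <stdio.h>

#define FLAG_ACK  0x01
#define FLAG_FULL 0x02                      /* plays the role of UDPIC_FLAGS_FULL */

typedef struct { unsigned flags; unsigned seq; } Ack;
typedef struct { unsigned pkts[2]; int n, cap; unsigned last_seq; } RecvQueue;

/* Receiver side: accept a packet, or report FULL instead of staying silent. */
static Ack on_data(RecvQueue *q, unsigned seq)
{
    Ack ack = { FLAG_ACK, seq };

    if (q->n == q->cap)
    {
        /* Queue full: the packet is dropped, but the sender is told why,
         * which avoids a blind retransmission storm. */
        ack.flags |= FLAG_FULL;
        ack.seq = q->last_seq;              /* highest sequence actually accepted */
        return ack;
    }
    q->pkts[q->n++] = seq;
    q->last_seq = seq;
    return ack;
}

/* Sender side: on a FULL ack, pause instead of resending the unack queue. */
static void on_ack(const Ack *ack, bool *paused)
{
    if (ack->flags & FLAG_FULL)
    {
        *paused = true;
        printf("peer full after seq %u: pausing retransmission\n", ack->seq);
    }
}

int main(void)
{
    RecvQueue q = { {0}, 0, 2, 0 };         /* capacity of two packets */
    bool paused = false;

    for (unsigned seq = 1; seq <= 3 && !paused; seq++)
    {
        Ack ack = on_data(&q, seq);
        on_ack(&ack, &paused);
    }
    return 0;
}

In the real patch the pause is implicit in the sender's state machine (the sender stops draining its unack queue for that connection); the boolean here is just the smallest stand-in for that behavior.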