From bb7389e0231d596b07b2f74ddc92464fee9a4466 Mon Sep 17 00:00:00 2001 From: "red-hat-konflux[bot]" <126015336+red-hat-konflux[bot]@users.noreply.github.com> Date: Thu, 25 Jun 2026 23:29:19 +0000 Subject: [PATCH] chore(deps): update module github.com/redis/go-redis/v9 to v9.21.0 Signed-off-by: red-hat-konflux <126015336+red-hat-konflux[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 +- vendor/github.com/redis/go-redis/v9/Makefile | 2 +- .../redis/go-redis/v9/RELEASE-NOTES.md | 46 ++++++++ .../github.com/redis/go-redis/v9/command.go | 109 ++++++++++++++++++ .../redis/go-redis/v9/internal/pool/conn.go | 16 +++ .../redis/go-redis/v9/internal/pool/pool.go | 55 +++++++++ .../go-redis/v9/internal/proto/reader.go | 78 +++++++++++++ .../go-redis/v9/maintnotifications/manager.go | 51 ++++++++ .../v9/maintnotifications/pool_hook.go | 10 +- vendor/github.com/redis/go-redis/v9/pubsub.go | 51 ++++++-- vendor/github.com/redis/go-redis/v9/redis.go | 24 ++-- .../redis/go-redis/v9/stream_commands.go | 66 +++++++++-- .../redis/go-redis/v9/string_commands.go | 41 +++++++ vendor/github.com/redis/go-redis/v9/tx.go | 51 +++++++- .../github.com/redis/go-redis/v9/version.go | 2 +- vendor/modules.txt | 2 +- 17 files changed, 571 insertions(+), 39 deletions(-) diff --git a/go.mod b/go.mod index 79b3de73e..a3e8b4279 100644 --- a/go.mod +++ b/go.mod @@ -150,7 +150,7 @@ require ( github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.68.1 // indirect github.com/prometheus/procfs v0.20.1 // indirect - github.com/redis/go-redis/v9 v9.20.1 // indirect + github.com/redis/go-redis/v9 v9.21.0 // indirect github.com/robfig/cron/v3 v3.0.2-0.20210106135023-bc59245fe10e // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 // indirect diff --git a/go.sum b/go.sum index 22bb33413..19559f8a5 100644 --- a/go.sum +++ b/go.sum @@ -409,8 +409,8 @@ github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4Ul github.com/r3labs/diff/v3 v3.0.2 h1:yVuxAY1V6MeM4+HNur92xkS39kB/N+cFi2hMkY06BbA= github.com/r3labs/diff/v3 v3.0.2/go.mod h1:Cy542hv0BAEmhDYWtGxXRQ4kqRsVIcEjG9gChUlTmkw= github.com/redis/go-redis/v9 v9.0.0-rc.4/go.mod h1:Vo3EsyWnicKnSKCA7HhgnvnyA74wOA69Cd2Meli5mmA= -github.com/redis/go-redis/v9 v9.20.1 h1:sfCU6A8P3dXbKyWes02uxA2baehGux9dZHfEKtsTB1w= -github.com/redis/go-redis/v9 v9.20.1/go.mod h1:v/M13XI1PVCDcm01VtPFOADfZtHf8YW3baQf57KlIkA= +github.com/redis/go-redis/v9 v9.21.0 h1:FPBE4hhbAke+TLmcY3WkpbDffJEomdqPn3HYiqAtL9E= +github.com/redis/go-redis/v9 v9.21.0/go.mod h1:v/M13XI1PVCDcm01VtPFOADfZtHf8YW3baQf57KlIkA= github.com/robfig/cron/v3 v3.0.2-0.20210106135023-bc59245fe10e h1:0xChnl3lhHiXbgSJKgChye0D+DvoItkOdkGcwelDXH0= github.com/robfig/cron/v3 v3.0.2-0.20210106135023-bc59245fe10e/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= diff --git a/vendor/github.com/redis/go-redis/v9/Makefile b/vendor/github.com/redis/go-redis/v9/Makefile index 90f03b57e..1322bd4f3 100644 --- a/vendor/github.com/redis/go-redis/v9/Makefile +++ b/vendor/github.com/redis/go-redis/v9/Makefile @@ -75,7 +75,7 @@ bench: export RE_CLUSTER=$(RE_CLUSTER) && \ export RCE_DOCKER=$(RCE_DOCKER) && \ export REDIS_VERSION=$(REDIS_VERSION) && \ - go test ./... -test.run=NONE -test.bench=. -test.benchmem -skip Example + go test ./... -test.run=NONE -test.bench=. -test.benchmem -skip Example -timeout 11m test.e2e: @echo "Running E2E tests with auto-start proxy..." diff --git a/vendor/github.com/redis/go-redis/v9/RELEASE-NOTES.md b/vendor/github.com/redis/go-redis/v9/RELEASE-NOTES.md index de24456c3..a125344da 100644 --- a/vendor/github.com/redis/go-redis/v9/RELEASE-NOTES.md +++ b/vendor/github.com/redis/go-redis/v9/RELEASE-NOTES.md @@ -1,5 +1,51 @@ # Release Notes +# 9.21.0 (2026-06-18) + +This is a minor release adding new features and bug fixes. There are no breaking changes; upgrading from 9.20.x is a drop-in replacement. + +## 🚀 Highlights + +### Zero-copy `GetToBuffer` / `SetFromBuffer` + +Two new `StringCmdable` methods let callers read and write Redis string values directly into and from pre-allocated byte buffers, eliminating the per-call payload allocation that `Get`/`Set` incur: + +```go +GetToBuffer(ctx, key, buf) *ZeroCopyStringCmd // reads into buf; ZeroCopyStringCmd { Val() int; Bytes() []byte; Result() (int, error) } +SetFromBuffer(ctx, key, buf) *StatusCmd +``` + +`GetToBuffer` decodes the bulk reply straight into the caller-owned `buf` (no intermediate allocation); a buffer that is too small returns an error after draining the payload, so the connection stays aligned for the next reply. `SetFromBuffer` is provided for API symmetry — it dispatches to the same `[]byte` writer path as `Set(ctx, key, buf, 0)` and produces byte-identical output on the wire. Available on `*Client`, `*ClusterClient`, `*Ring`, `*Conn` and `Pipeliner`. + +([#3834](https://github.com/redis/go-redis/pull/3834)) by [@ndyakov](https://github.com/ndyakov) + +### Explicit `LIMIT 0` for stream trimming + +Redis treats `XTRIM`/`XADD` approximate-trim (`~`) `LIMIT 0` as "disable the trimming effort cap entirely", which differs from omitting `LIMIT` (the implicit `100 * stream-node-max-entries` default). The command builders previously only emitted `LIMIT` when `limit > 0`, so callers could never send an explicit `LIMIT 0`. Following the `KeepTTL = -1` precedent, the new `XTrimLimitDisabled = -1` sentinel now emits an explicit `LIMIT 0`; `limit == 0` keeps the historical no-`LIMIT` behavior, so existing callers produce byte-identical commands. + +([#3848](https://github.com/redis/go-redis/pull/3848)) by [@TheRealMal](https://github.com/TheRealMal) + +## ✨ New Features + +- **Zero-copy buffer string commands**: new `GetToBuffer` / `SetFromBuffer` on `StringCmdable` and the `ZeroCopyStringCmd` result type, reading/writing string values into caller-owned buffers without per-call payload allocation ([#3834](https://github.com/redis/go-redis/pull/3834)) by [@ndyakov](https://github.com/ndyakov) +- **`XTrimLimitDisabled` sentinel**: `XTRIM`/`XADD` approximate trimming can now send an explicit `LIMIT 0` to disable the trim effort cap, via the new `XTrimLimitDisabled = -1` sentinel ([#3848](https://github.com/redis/go-redis/pull/3848)) by [@TheRealMal](https://github.com/TheRealMal) +- **PubSub health-check timeouts**: `channel.initHealthCheck` now bounds the `Ping` it issues with a fresh per-check timeout context (the exported `pingTimeout` / `reconnectTimeout`) instead of `context.TODO()`, so a stuck health-check Ping can no longer block indefinitely ([#3819](https://github.com/redis/go-redis/pull/3819)) by [@abdellani](https://github.com/abdellani) +- **Skip redundant `UNWATCH` in `Tx.Close`**: a transaction now tracks whether a `WATCH` is still active (`watchArmed`) and only issues `UNWATCH` on `Close` when it is, removing an extra round trip on the common `WATCH`/.../`EXEC` and no-key `Watch` paths while never returning a connection to the pool with an active watch ([#3854](https://github.com/redis/go-redis/pull/3854)) by [@fcostaoliveira](https://github.com/fcostaoliveira) + +## 🐛 Bug Fixes + +- **`maintnotifications` `ModeAuto` fail-open**: `ModeAuto` now stays fail-open when the server does not support maintenance notifications — connections are retired and tracking is guarded during downgrade so the client keeps working instead of erroring ([#3853](https://github.com/redis/go-redis/pull/3853)) by [@terrorobe](https://github.com/terrorobe) + +## 👥 Contributors + +We'd like to thank all the contributors who worked on this release! + +[@abdellani](https://github.com/abdellani), [@fcostaoliveira](https://github.com/fcostaoliveira), [@ndyakov](https://github.com/ndyakov), [@terrorobe](https://github.com/terrorobe), [@TheRealMal](https://github.com/TheRealMal) + +--- + +**Full Changelog**: https://github.com/redis/go-redis/compare/v9.20.1...v9.21.0 + # 9.20.1 (2026-06-11) This is a patch release containing bug fixes only. There are no new features or breaking changes; upgrading from 9.20.0 is a drop-in replacement. diff --git a/vendor/github.com/redis/go-redis/v9/command.go b/vendor/github.com/redis/go-redis/v9/command.go index 57a26c8ba..ae0158b3a 100644 --- a/vendor/github.com/redis/go-redis/v9/command.go +++ b/vendor/github.com/redis/go-redis/v9/command.go @@ -880,6 +880,115 @@ func (cmd *RawWriteToCmd) Clone() Cmder { //------------------------------------------------------------------------------ +// ZeroCopyStringCmd reads a bulk string response directly into a user-provided +// buffer, avoiding intermediate allocations. The RESP header is parsed through +// the buffered reader, then bulk data is read straight into the caller's buffer +// via proto.Reader.ReadStringInto — for values larger than the bufio buffer, +// this is effectively zero-copy from the socket to the user buffer. +// +// The buffer must be sized to fit the value; if it is too small an error is +// returned and the payload plus trailing CRLF are drained from the reader so +// the connection stays aligned for subsequent commands. +type ZeroCopyStringCmd struct { + baseCmd + buf []byte // user-provided buffer to read into + n int // number of bytes read into buf + cloned bool // set by Clone(); causes readReply to drain + error +} + +var _ Cmder = (*ZeroCopyStringCmd)(nil) + +func NewZeroCopyStringCmd(ctx context.Context, buf []byte, args ...interface{}) *ZeroCopyStringCmd { + return &ZeroCopyStringCmd{ + baseCmd: baseCmd{ + ctx: ctx, + args: args, + cmdType: CmdTypeString, + }, + buf: buf, + } +} + +func (cmd *ZeroCopyStringCmd) SetVal(n int) { + cmd.n = n +} + +func (cmd *ZeroCopyStringCmd) Val() int { + return cmd.n +} + +// Result returns the number of bytes read and any error. +func (cmd *ZeroCopyStringCmd) Result() (int, error) { + return cmd.n, cmd.err +} + +// Bytes returns the slice of the user-provided buffer containing the read data. +func (cmd *ZeroCopyStringCmd) Bytes() []byte { + return cmd.buf[:cmd.n] +} + +func (cmd *ZeroCopyStringCmd) String() string { + return cmdString(cmd, cmd.n) +} + +func (cmd *ZeroCopyStringCmd) readReply(rd *proto.Reader) error { + // Reset the byte count before reading so a previous successful run + // can't leak its data through Bytes() if this call errors out before + // updating cmd.n. + cmd.n = 0 + if cmd.cloned { + // A cloned ZeroCopyStringCmd has no usable destination buffer + // (see Clone for the rationale). Drain the network reply so the + // connection stays aligned for the next command, then surface a + // clear error rather than silently producing a wrong result. + if err := rd.DiscardNext(); err != nil { + return err + } + return fmt.Errorf("redis: ZeroCopyStringCmd cannot be cloned (cmd writes into caller-owned memory)") + } + n, err := rd.ReadStringInto(cmd.buf) + if err != nil { + return err + } + cmd.n = n + return nil +} + +// NoRetry returns true because the response is written directly into the +// caller's buffer. A retry could leave partial data from a failed attempt in +// the buffer, so the caller must handle retries explicitly if needed. +func (cmd *ZeroCopyStringCmd) NoRetry() bool { + return true +} + +// Clone returns a clone that is intentionally non-functional. Cloning a +// ZeroCopyStringCmd has no well-defined semantics: the cmd writes into +// caller-owned memory (the buf passed to GetToBuffer), and a clone can +// neither safely share that buf (concurrent writes from sibling clones +// would race, last-writer wins) nor allocate its own buf (the result +// would be invisible to whoever asked for the original cmd's reply). +// +// The Cmder interface requires Clone, so we return a clone marked so +// that its readReply drains the network reply (keeping the connection +// aligned) and fails the cmd with a clear error. Whoever processes the +// clone gets an explicit error instead of silently-wrong bytes. +// +// In practice this path is unreachable through normal client flows: +// Clone is only called from cluster fan-out routing +// (osscluster_router.go) for multi-shard commands like DBSIZE / KEYS / +// FLUSHDB, and ZeroCopyStringCmd is only produced by GetToBuffer which +// issues GET — a single-key command routed to one shard, never fanned +// out. Combined with NoRetry() returning true, the retry path also will +// not clone this cmd. +func (cmd *ZeroCopyStringCmd) Clone() Cmder { + return &ZeroCopyStringCmd{ + baseCmd: cmd.cloneBaseCmd(), + cloned: true, + } +} + +//------------------------------------------------------------------------------ + type SliceCmd struct { baseCmd diff --git a/vendor/github.com/redis/go-redis/v9/internal/pool/conn.go b/vendor/github.com/redis/go-redis/v9/internal/pool/conn.go index fab54654a..1e836ad44 100644 --- a/vendor/github.com/redis/go-redis/v9/internal/pool/conn.go +++ b/vendor/github.com/redis/go-redis/v9/internal/pool/conn.go @@ -114,6 +114,11 @@ type Conn struct { // to inform the goroutine using the connection why the connection was closed. closeReason uberatomic.String + // closeOnPutReason marks an in-use connection for removal when it is returned + // to the pool. The socket is left open for the in-flight command and closed + // by ConnPool.Put. + closeOnPutReason uberatomic.String + // maintenanceNotifications upgrade support: relaxed timeouts during migrations/failovers // Using atomic operations for lock-free access to avoid mutex contention @@ -437,6 +442,17 @@ func (cn *Conn) IsPooled() bool { return cn.pooled } +// MarkCloseOnPut marks the connection for removal when it is returned to the pool. +func (cn *Conn) MarkCloseOnPut(reason string) { + cn.closeOnPutReason.Store(reason) +} + +// CloseOnPutReason returns a non-empty reason when the connection should be +// removed instead of pooled on Put. +func (cn *Conn) CloseOnPutReason() string { + return cn.closeOnPutReason.Load() +} + // IsPubSub returns true if the connection is used for PubSub. func (cn *Conn) IsPubSub() bool { return cn.pubsub diff --git a/vendor/github.com/redis/go-redis/v9/internal/pool/pool.go b/vendor/github.com/redis/go-redis/v9/internal/pool/pool.go index d551fbb17..8f648ffbf 100644 --- a/vendor/github.com/redis/go-redis/v9/internal/pool/pool.go +++ b/vendor/github.com/redis/go-redis/v9/internal/pool/pool.go @@ -33,6 +33,11 @@ const ( // CloseReasonFailover indicates the connection was closed due to a failover event. CloseReasonFailover = "failover" + + // CloseReasonMaintNotificationsDisabled indicates the connection enabled + // maintenance notifications, but the client later downgraded and disabled + // maintenance notification handling for the pool. + CloseReasonMaintNotificationsDisabled = "maintnotifications_disabled" ) // Metric state constants for connection state tracking. @@ -315,6 +320,10 @@ type Stats struct { PubSubStats PubSubStats } +type ConnRetirer interface { + RetireConns(ctx context.Context, conns []*Conn, reason string) +} + type Pooler interface { NewConn(context.Context) (*Conn, error) CloseConn(ctx context.Context, cn *Conn, reason string, fromState string) error @@ -1217,6 +1226,11 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) { shouldRemove := false var err error + if reason := cn.CloseOnPutReason(); reason != "" { + p.removeConnInternal(ctx, cn, errors.New(reason), freeTurn) + return + } + if cn.HasBufferedData() { // Peek at the reply type to check if it's a push notification if replyType, err := cn.PeekReplyTypeSafe(); err != nil || replyType != proto.RespPush { @@ -1562,6 +1576,47 @@ func (p *ConnPool) closed() bool { return atomic.LoadUint32(&p._closed) == 1 } +func (p *ConnPool) RetireConns(ctx context.Context, conns []*Conn, reason string) { + if len(conns) == 0 { + return + } + + idleConnSet := make(map[*Conn]struct{}) + toClose := make([]*Conn, 0, len(conns)) + + p.connsMu.Lock() + for _, ic := range p.idleConns { + idleConnSet[ic] = struct{}{} + } + for _, cn := range conns { + if cn == nil { + continue + } + if _, ok := p.conns[cn.GetID()]; !ok { + continue + } + if _, isIdle := idleConnSet[cn]; isIdle { + if p.removeConn(cn) { + toClose = append(toClose, cn) + } + continue + } + cn.MarkCloseOnPut(reason) + } + p.connsMu.Unlock() + + if hookManager := p.hookManager.Load(); hookManager != nil { + for _, cn := range toClose { + hookManager.ProcessOnRemove(ctx, cn, errors.New(reason)) + } + } + for _, cn := range toClose { + p.recordConnectionMetrics(ctx, cn, reason, MetricStateIdle) + _ = p.closeConn(cn) + } + p.checkMinIdleConns() +} + func (p *ConnPool) Filter(fn func(*Conn) bool) error { ctx := context.Background() diff --git a/vendor/github.com/redis/go-redis/v9/internal/proto/reader.go b/vendor/github.com/redis/go-redis/v9/internal/proto/reader.go index 83f28e4da..33b027f79 100644 --- a/vendor/github.com/redis/go-redis/v9/internal/proto/reader.go +++ b/vendor/github.com/redis/go-redis/v9/internal/proto/reader.go @@ -533,6 +533,84 @@ func (r *Reader) ReadFloat() (float64, error) { return 0, fmt.Errorf("redis: can't parse float reply: %.100q", line) } +// ReadStringInto reads a string-typed reply directly into buf, avoiding the +// per-call allocation that ReadString incurs. It returns the number of bytes +// written to buf. +// +// Supported reply types: +// - $\r\n\r\n bulk string (the GET path; payload is read +// straight into buf via bufio.Reader — for payloads larger than the +// bufio buffer this is effectively zero-copy from the socket) +// - +\r\n simple string, copied from the header line +// - :\r\n integer, copied as its ASCII representation +// - ,\r\n float, copied as its ASCII representation +// +// Errors, nil, push notifications, and RESP3 attributes are intercepted +// by ReadLine and surfaced through err. RESP3 verbatim strings +// (=\r\n\r\n) are intentionally not handled — they are +// never returned by GET-family commands, and including them re-introduces +// a hazard class where the response-type byte read from a stale `line[0]` +// after a bufio refill can be misinterpreted as the verbatim format tag. +// +// If the bulk payload does not fit in buf, an error is returned and the +// payload plus the trailing CRLF are drained from the reader so the +// connection stays aligned for the next reply. For simple-string / integer +// / float responses the payload lives in the (already-consumed) header +// line, so no drain is needed. +func (r *Reader) ReadStringInto(buf []byte) (int, error) { + line, err := r.ReadLine() + if err != nil { + return 0, err + } + + switch line[0] { + case RespStatus: + // Simple string — data is in the line itself. + s := line[1:] + if len(s) > len(buf) { + return 0, fmt.Errorf("redis: buffer too small: need %d bytes, have %d", len(s), len(buf)) + } + return copy(buf, s), nil + + case RespString: + n, err := replyLen(line) + if err != nil { + return 0, err + } + if n > len(buf) { + // Drain the payload + trailing \r\n so the next read on this + // connection sees the start of the next reply rather than the + // tail of this one. Otherwise the unread bytes corrupt the + // stream and the bad connection gets handed back to the pool. + if _, derr := r.rd.Discard(n + 2); derr != nil { + return 0, derr + } + return 0, fmt.Errorf("redis: buffer too small: need %d bytes, have %d", n, len(buf)) + } + // Read data directly into the user's buffer through the bufio.Reader. + // bufio.Reader.Read first drains its internal buffer, then for + // remaining data larger than its buffer size reads directly from the + // underlying reader (socket) — effectively zero-copy. + if _, err := io.ReadFull(r.rd, buf[:n]); err != nil { + return 0, err + } + // Discard trailing \r\n. + if _, err := r.rd.Discard(2); err != nil { + return 0, err + } + return n, nil + + case RespInt, RespFloat: + s := line[1:] + if len(s) > len(buf) { + return 0, fmt.Errorf("redis: buffer too small: need %d bytes, have %d", len(s), len(buf)) + } + return copy(buf, s), nil + } + + return 0, fmt.Errorf("redis: can't parse reply=%.100q reading string into buffer", line) +} + func (r *Reader) ReadString() (string, error) { line, err := r.ReadLine() if err != nil { diff --git a/vendor/github.com/redis/go-redis/v9/maintnotifications/manager.go b/vendor/github.com/redis/go-redis/v9/maintnotifications/manager.go index 3f9478e1b..ff54c717d 100644 --- a/vendor/github.com/redis/go-redis/v9/maintnotifications/manager.go +++ b/vendor/github.com/redis/go-redis/v9/maintnotifications/manager.go @@ -82,6 +82,10 @@ type Manager struct { hooksMu sync.RWMutex // Protects hooks slice poolHooksRef *PoolHook + // Connections that successfully enabled maintnotifications. These need to be + // retired before the pool-level listeners are removed. + maintNotificationsConns sync.Map // connID -> *pool.Conn + // Cluster state reload callback for SMIGRATED notifications clusterStateReloadCallback ClusterStateReloadCallback } @@ -251,6 +255,49 @@ func (hm *Manager) MarkSMigratedSeqIDProcessed(seqID int64) bool { return !alreadyProcessed // Return true if NOT already processed } +// TrackMaintNotificationsConn records a connection that successfully enabled +// maintnotifications so it can be retired if the feature is later disabled for +// the pool. +func (hm *Manager) TrackMaintNotificationsConn(cn *pool.Conn) { + if cn == nil { + return + } + hm.maintNotificationsConns.Store(cn.GetID(), cn) +} + +// UntrackMaintNotificationsConn removes a connection from the enabled +// maintnotifications set. +func (hm *Manager) UntrackMaintNotificationsConn(connID uint64) { + hm.maintNotificationsConns.Delete(connID) +} + +func (hm *Manager) maintNotificationsConnSnapshot() []*pool.Conn { + var conns []*pool.Conn + hm.maintNotificationsConns.Range(func(_, value interface{}) bool { + if cn, ok := value.(*pool.Conn); ok { + conns = append(conns, cn) + } + return true + }) + return conns +} + +func (hm *Manager) retireMaintNotificationsConns(ctx context.Context) { + conns := hm.maintNotificationsConnSnapshot() + if len(conns) == 0 || hm.pool == nil { + return + } + + if retirer, ok := hm.pool.(pool.ConnRetirer); ok { + retirer.RetireConns(ctx, conns, pool.CloseReasonMaintNotificationsDisabled) + return + } + + for _, cn := range conns { + _ = hm.pool.CloseConn(ctx, cn, pool.CloseReasonMaintNotificationsDisabled, pool.MetricStateIdle) + } +} + // Close closes the manager. func (hm *Manager) Close() error { // Use atomic operation for thread-safe close check @@ -258,6 +305,10 @@ func (hm *Manager) Close() error { return nil // Already closed } + // Retire connections that enabled maintnotifications before removing the + // pool-level listeners that process those push notifications. + hm.retireMaintNotificationsConns(context.Background()) + // Shutdown the pool hook if it exists if hm.poolHooksRef != nil { // Use a timeout to prevent hanging indefinitely diff --git a/vendor/github.com/redis/go-redis/v9/maintnotifications/pool_hook.go b/vendor/github.com/redis/go-redis/v9/maintnotifications/pool_hook.go index 9ea0558bf..752abc711 100644 --- a/vendor/github.com/redis/go-redis/v9/maintnotifications/pool_hook.go +++ b/vendor/github.com/redis/go-redis/v9/maintnotifications/pool_hook.go @@ -17,6 +17,10 @@ type OperationsManagerInterface interface { UntrackOperationWithConnID(seqID int64, connID uint64) } +type maintNotificationsConnTracker interface { + UntrackMaintNotificationsConn(connID uint64) +} + // HandoffRequest represents a request to handoff a connection to a new endpoint type HandoffRequest struct { Conn *pool.Conn @@ -172,8 +176,10 @@ func (ph *PoolHook) OnPut(ctx context.Context, conn *pool.Conn) (shouldPool bool return true, false, nil } -func (ph *PoolHook) OnRemove(_ context.Context, _ *pool.Conn, _ error) { - // Not used +func (ph *PoolHook) OnRemove(_ context.Context, conn *pool.Conn, _ error) { + if tracker, ok := ph.operationsManager.(maintNotificationsConnTracker); ok && conn != nil { + tracker.UntrackMaintNotificationsConn(conn.GetID()) + } } // Shutdown gracefully shuts down the processor, waiting for workers to complete diff --git a/vendor/github.com/redis/go-redis/v9/pubsub.go b/vendor/github.com/redis/go-redis/v9/pubsub.go index 9d6961059..f9c30a639 100644 --- a/vendor/github.com/redis/go-redis/v9/pubsub.go +++ b/vendor/github.com/redis/go-redis/v9/pubsub.go @@ -659,6 +659,25 @@ func WithChannelSendTimeout(d time.Duration) ChannelOption { } } +// WithChannelPingTimeout specifies the timeout for the health-check ping. +// +// The default is 5 seconds. +func WithChannelPingTimeout(d time.Duration) ChannelOption { + return func(c *channel) { + c.pingTimeout = d + } +} + +// WithChannelReconnectTimeout specifies the timeout for reconnecting after +// a failed health-check ping. +// +// The default is 10 seconds. +func WithChannelReconnectTimeout(d time.Duration) ChannelOption { + return func(c *channel) { + c.reconnectTimeout = d + } +} + type channel struct { pubSub *PubSub @@ -666,18 +685,22 @@ type channel struct { allCh chan interface{} ping chan struct{} - chanSize int - chanSendTimeout time.Duration - checkInterval time.Duration + chanSize int + chanSendTimeout time.Duration + checkInterval time.Duration + pingTimeout time.Duration + reconnectTimeout time.Duration } func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel { c := &channel{ pubSub: pubSub, - chanSize: 100, - chanSendTimeout: time.Minute, - checkInterval: 3 * time.Second, + chanSize: 100, + chanSendTimeout: time.Minute, + checkInterval: 3 * time.Second, + pingTimeout: 5 * time.Second, + reconnectTimeout: 10 * time.Second, } for _, opt := range opts { opt(c) @@ -689,7 +712,6 @@ func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel { } func (c *channel) initHealthCheck() { - ctx := context.TODO() c.ping = make(chan struct{}, 1) go func() { @@ -700,13 +722,20 @@ func (c *channel) initHealthCheck() { timer.Reset(c.checkInterval) select { case <-c.ping: - if !timer.Stop() { - <-timer.C + select { + case <-timer.C: + default: } case <-timer.C: - if pingErr := c.pubSub.Ping(ctx); pingErr != nil { + ctx, cancel := context.WithTimeout(context.Background(), c.pingTimeout) + pingErr := c.pubSub.Ping(ctx) + cancel() + + if pingErr != nil { c.pubSub.mu.Lock() - c.pubSub.reconnect(ctx, pingErr) + reconnectCtx, reconnectCancel := context.WithTimeout(context.Background(), c.reconnectTimeout) + c.pubSub.reconnect(reconnectCtx, pingErr) + reconnectCancel() c.pubSub.mu.Unlock() } case <-c.pubSub.exit: diff --git a/vendor/github.com/redis/go-redis/v9/redis.go b/vendor/github.com/redis/go-redis/v9/redis.go index dd3451890..3bb759c34 100644 --- a/vendor/github.com/redis/go-redis/v9/redis.go +++ b/vendor/github.com/redis/go-redis/v9/redis.go @@ -701,11 +701,24 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error { var maintNotifHandshakeErr error if maintNotifEnabled && protocol == 3 { + // Hold the manager read lock across the handshake and tracking so a + // concurrent downgrade cannot remove pool-level listeners before a + // successfully enabled connection is tracked for retirement. + c.maintNotificationsManagerLock.RLock() + manager := c.maintNotificationsManager maintNotifHandshakeErr = conn.ClientMaintNotifications( ctx, true, endpointType.String(), ).Err() + // A successful handshake enables maintnotifications for this connection, + // but must not promote ModeAuto to ModeEnabled. ModeEnabled is the + // explicit fail-closed policy; ModeAuto must remain able to downgrade if a + // later reconnect/failover reaches an endpoint that rejects the command. + if maintNotifHandshakeErr == nil && manager != nil { + manager.TrackMaintNotificationsConn(cn) + } + c.maintNotificationsManagerLock.RUnlock() if maintNotifHandshakeErr != nil { if !isRedisError(maintNotifHandshakeErr) { // if not redis error, fail the connection @@ -738,13 +751,6 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error { internal.Logger.Printf(ctx, "failed to disable maintnotifications in auto mode: %v", initErr) } } - } else { - // handshake was executed successfully - // to make sure that the handshake will be executed on other connections as well if it was successfully - // executed on this connection, we will force the handshake to be executed on all connections - c.optLock.Lock() - c.opt.MaintNotificationsConfig.Mode = maintnotifications.ModeEnabled - c.optLock.Unlock() } } @@ -1069,7 +1075,9 @@ func (c *baseClient) disableMaintNotificationsUpgrades() error { if c.maintNotificationsManager != nil { // Closing the manager will also shutdown the pool hook // and remove it from the pool - c.maintNotificationsManager.Close() + if err := c.maintNotificationsManager.Close(); err != nil { + return err + } c.maintNotificationsManager = nil } return nil diff --git a/vendor/github.com/redis/go-redis/v9/stream_commands.go b/vendor/github.com/redis/go-redis/v9/stream_commands.go index 71191aec4..e2b2a9e2f 100644 --- a/vendor/github.com/redis/go-redis/v9/stream_commands.go +++ b/vendor/github.com/redis/go-redis/v9/stream_commands.go @@ -9,6 +9,33 @@ import ( "github.com/redis/go-redis/v9/internal/otel" ) +// XTrimLimitDisabled is a sentinel value for the LIMIT argument of stream +// trimming (XAddArgs.Limit and the XTrim*Approx* commands). Passing it emits +// an explicit "LIMIT 0", which tells Redis to disable the trimming effort cap +// entirely. This differs from passing 0, which keeps the historical behavior: +// no LIMIT clause is sent and Redis applies its implicit default +// (100 * stream-node-max-entries examined entries). +// +// LIMIT is only valid together with the "~" (approximate) trimming flag; +// Redis rejects LIMIT used with exact ("=") trimming. +const XTrimLimitDisabled = -1 + +// appendXTrimLimit appends the LIMIT clause used by XADD and XTRIM trimming: +// - limit > 0 emits "LIMIT "; +// - limit < 0 (see XTrimLimitDisabled) emits "LIMIT 0", disabling the +// trimming effort cap; +// - limit == 0 omits the clause, so Redis applies its implicit default. +func appendXTrimLimit(args []interface{}, limit int64) []interface{} { + switch { + case limit > 0: + return append(args, "limit", limit) + case limit < 0: + return append(args, "limit", int64(0)) + default: + return args + } +} + type StreamCmdable interface { XAdd(ctx context.Context, a *XAddArgs) *StringCmd XAckDel(ctx context.Context, stream string, group string, mode string, ids ...string) *SliceCmd @@ -73,7 +100,14 @@ type XAddArgs struct { MaxLen int64 // MAXLEN N MinID string // Approx causes MaxLen and MinID to use "~" matcher (instead of "="). - Approx bool + Approx bool + // Limit caps the trimming effort: + // - 0 omits the LIMIT clause (Redis applies its implicit default); + // - a positive value emits "LIMIT "; + // - a negative value (see XTrimLimitDisabled) emits "LIMIT 0", + // disabling the effort cap entirely. + // LIMIT requires Approx to be true; Redis rejects LIMIT together with + // exact ("=") trimming. Limit int64 Mode string ID string @@ -118,9 +152,7 @@ func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd { args = append(args, "minid", "=", a.MinID) } } - if a.Limit > 0 { - args = append(args, "limit", a.Limit) - } + args = appendXTrimLimit(args, a.Limit) if a.ID != "" { args = append(args, a.ID) @@ -542,6 +574,10 @@ func xClaimArgs(a *XClaimArgs) []interface{} { // XTRIM key MAXLEN/MINID ~ threshold LIMIT limit. // // The redis-server version is lower than 6.2, please set limit to 0. +// +// limit == 0 omits the LIMIT clause, a positive limit emits "LIMIT ", +// and a negative limit (see XTrimLimitDisabled) emits "LIMIT 0" to disable +// the trimming effort cap. LIMIT requires approx; Redis rejects it otherwise. func (c cmdable) xTrim( ctx context.Context, key, strategy string, approx bool, threshold interface{}, limit int64, @@ -554,9 +590,7 @@ func (c cmdable) xTrim( args = append(args, "=") } args = append(args, threshold) - if limit > 0 { - args = append(args, "limit", limit) - } + args = appendXTrimLimit(args, limit) cmd := NewIntCmd(ctx, args...) _ = c(ctx, cmd) return cmd @@ -568,6 +602,12 @@ func (c cmdable) XTrimMaxLen(ctx context.Context, key string, maxLen int64) *Int return c.xTrim(ctx, key, "maxlen", false, maxLen, 0) } +// XTrimMaxLenApprox trims the stream using the `~` rule. +// cmd: XTRIM key MAXLEN ~ maxLen [LIMIT limit] +// +// limit == 0 omits the LIMIT clause, limit > 0 emits "LIMIT ", and a +// negative limit (see XTrimLimitDisabled) emits "LIMIT 0" to disable the +// trimming effort cap. func (c cmdable) XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd { return c.xTrim(ctx, key, "maxlen", true, maxLen, limit) } @@ -576,10 +616,18 @@ func (c cmdable) XTrimMinID(ctx context.Context, key string, minID string) *IntC return c.xTrim(ctx, key, "minid", false, minID, 0) } +// XTrimMinIDApprox trims the stream using the `~` rule. +// cmd: XTRIM key MINID ~ minID [LIMIT limit] +// +// limit == 0 omits the LIMIT clause, limit > 0 emits "LIMIT ", and a +// negative limit (see XTrimLimitDisabled) emits "LIMIT 0" to disable the +// trimming effort cap. func (c cmdable) XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd { return c.xTrim(ctx, key, "minid", true, minID, limit) } +// xTrimMode is xTrim with a trailing trimming mode argument (e.g. KEEPREF). +// The limit semantics are the same as xTrim's. func (c cmdable) xTrimMode( ctx context.Context, key, strategy string, approx bool, threshold interface{}, limit int64, @@ -593,9 +641,7 @@ func (c cmdable) xTrimMode( args = append(args, "=") } args = append(args, threshold) - if limit > 0 { - args = append(args, "limit", limit) - } + args = appendXTrimLimit(args, limit) args = append(args, mode) cmd := NewIntCmd(ctx, args...) _ = c(ctx, cmd) diff --git a/vendor/github.com/redis/go-redis/v9/string_commands.go b/vendor/github.com/redis/go-redis/v9/string_commands.go index 88a80844e..6731c09f5 100644 --- a/vendor/github.com/redis/go-redis/v9/string_commands.go +++ b/vendor/github.com/redis/go-redis/v9/string_commands.go @@ -18,6 +18,7 @@ type StringCmdable interface { GetSet(ctx context.Context, key string, value interface{}) *StringCmd GetEx(ctx context.Context, key string, expiration time.Duration) *StringCmd GetDel(ctx context.Context, key string) *StringCmd + GetToBuffer(ctx context.Context, key string, buf []byte) *ZeroCopyStringCmd Incr(ctx context.Context, key string) *IntCmd IncrBy(ctx context.Context, key string, value int64) *IntCmd IncrByFloat(ctx context.Context, key string, value float64) *FloatCmd @@ -31,6 +32,7 @@ type StringCmdable interface { Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd SetArgs(ctx context.Context, key string, value interface{}, a SetArgs) *StatusCmd SetEx(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd + SetFromBuffer(ctx context.Context, key string, buf []byte) *StatusCmd SetIFEQ(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StatusCmd SetIFEQGet(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StringCmd SetIFNE(ctx context.Context, key string, value interface{}, matchValue interface{}, expiration time.Duration) *StatusCmd @@ -182,6 +184,24 @@ func (c cmdable) GetDel(ctx context.Context, key string) *StringCmd { return cmd } +// GetToBuffer executes GET and reads the reply directly into buf, avoiding +// the intermediate string allocation that *StringCmd would produce. For values +// larger than the connection's read buffer, the payload is read straight from +// the socket into buf — effectively zero-copy on the receive path. +// +// The returned *ZeroCopyStringCmd reports the number of bytes read via Val(), +// the populated slice via Bytes() (which is buf[:Val()]), and any error +// (including redis.Nil when the key does not exist) via Err(). If buf is too +// small to hold the value, Err() returns a "buffer too small" error. +// +// This command opts out of automatic retries because partial data from a +// failed attempt would already be sitting in the caller's buffer. +func (c cmdable) GetToBuffer(ctx context.Context, key string, buf []byte) *ZeroCopyStringCmd { + cmd := NewZeroCopyStringCmd(ctx, buf, "get", key) + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) Incr(ctx context.Context, key string) *IntCmd { cmd := NewIntCmd(ctx, "incr", key) _ = c(ctx, cmd) @@ -555,6 +575,27 @@ func (c cmdable) SetEx(ctx context.Context, key string, value interface{}, expir return cmd } +// SetFromBuffer executes SET writing the value directly from buf. For values +// larger than the connection's write buffer, bufio.Writer.Write flushes its +// internal buffer (containing the RESP header) and then writes buf straight +// to the socket — effectively zero-copy on the send path. +// +// Expiration is not supported; use Expire separately if a TTL is required. +// +// Note: SetFromBuffer is exposed for API symmetry with GetToBuffer and is +// functionally equivalent to Set(ctx, key, buf, 0) — both dispatch to the +// same []byte case in the RESP writer and produce identical bytes on the +// wire. The zero-copy property on the send path comes from +// bufio.Writer.Write bypassing its internal buffer for large payloads, +// which Set([]byte) gets automatically. Prefer SetFromBuffer in code that +// also uses GetToBuffer so the buffer-based pattern reads coherently; +// otherwise Set(ctx, key, buf, 0) is equally efficient. +func (c cmdable) SetFromBuffer(ctx context.Context, key string, buf []byte) *StatusCmd { + cmd := NewStatusCmd(ctx, "set", key, buf) + _ = c(ctx, cmd) + return cmd +} + // SetNX sets the value of a key only if the key does not exist. // // Zero expiration means the key has no expiration time. diff --git a/vendor/github.com/redis/go-redis/v9/tx.go b/vendor/github.com/redis/go-redis/v9/tx.go index b433b4024..179230e30 100644 --- a/vendor/github.com/redis/go-redis/v9/tx.go +++ b/vendor/github.com/redis/go-redis/v9/tx.go @@ -2,6 +2,7 @@ package redis import ( "context" + "errors" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/proto" @@ -19,6 +20,24 @@ type Tx struct { baseClient cmdable statefulCmdable + + // watchArmed reports whether a WATCH issued through Tx.Watch may still be + // active on the connection. It is set on a successful WATCH and cleared once + // the watched keys are discarded server-side: on UNWATCH (Tx.Unwatch) and on + // EXEC, including an aborted EXEC (the TxPipeline closure). Close uses it to + // skip an otherwise redundant UNWATCH round trip. + // + // Only Tx.Watch, Tx.Unwatch and the TxPipeline EXEC closure maintain this + // flag. go-redis never issues a standalone DISCARD (Pipeline.Discard is a + // client-side buffer reset, not a server command). Issuing a WATCH directly + // via Process bypasses tracking, so Close would not release it and the watch + // would leak onto the pooled connection; that is the one case where skipping + // UNWATCH is unsafe, and using raw Process for WATCH/EXEC/UNWATCH is therefore + // unsupported. + // + // Tx is not safe for concurrent use (see above), so watchArmed is accessed + // only from the goroutine that owns the Tx and needs no synchronization. + watchArmed bool } func (c *Client) newTx() *Tx { @@ -70,7 +89,13 @@ func (c *Client) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) // Close closes the transaction, releasing any open resources. func (c *Tx) Close(ctx context.Context) error { - _ = c.Unwatch(ctx).Err() + // UNWATCH is only needed while a WATCH is still active. EXEC discards the + // watched keys server-side on both commit and abort, so the common + // WATCH/.../EXEC paths leave nothing to release and avoid the extra round + // trip. + if c.watchArmed { + _ = c.Unwatch(ctx).Err() + } return c.baseClient.Close() } @@ -84,6 +109,11 @@ func (c *Tx) Watch(ctx context.Context, keys ...string) *StatusCmd { } cmd := NewStatusCmd(ctx, args...) _ = c.Process(ctx, cmd) + // A successful WATCH leaves keys watched on the connection that Close must + // later release with UNWATCH. + if cmd.Err() == nil { + c.watchArmed = true + } return cmd } @@ -96,6 +126,10 @@ func (c *Tx) Unwatch(ctx context.Context, keys ...string) *StatusCmd { } cmd := NewStatusCmd(ctx, args...) _ = c.Process(ctx, cmd) + // The watched keys have been released, so Close need not UNWATCH again. + if cmd.Err() == nil { + c.watchArmed = false + } return cmd } @@ -133,7 +167,20 @@ func (c *Tx) TxPipeline() Pipeliner { pipe := Pipeline{ exec: func(ctx context.Context, cmds []Cmder) error { cmds = wrapMultiExec(ctx, cmds) - return c.processTxPipelineHook(ctx, cmds) + err := c.processTxPipelineHook(ctx, cmds) + // EXEC discards the watched keys server-side, so the watch is + // cleared only when EXEC actually ran: a nil error (committed), + // TxFailedErr (a watched key changed, EXEC returned nil), or an + // EXECABORT error (a queued command was rejected, so EXEC discarded + // the transaction). All three release the watched keys. Any other + // error may be reported before EXEC executes (for example -LOADING on + // the MULTI reply) or on a broken connection, so leave watchArmed set + // and let Close send UNWATCH rather than risk leaving a watch on a + // pooled connection. + if err == nil || errors.Is(err, TxFailedErr) || IsExecAbortError(err) { + c.watchArmed = false + } + return err }, } pipe.init() diff --git a/vendor/github.com/redis/go-redis/v9/version.go b/vendor/github.com/redis/go-redis/v9/version.go index d59381170..c6cabc694 100644 --- a/vendor/github.com/redis/go-redis/v9/version.go +++ b/vendor/github.com/redis/go-redis/v9/version.go @@ -2,5 +2,5 @@ package redis // Version is the current release version. func Version() string { - return "9.20.1" + return "9.21.0" } diff --git a/vendor/modules.txt b/vendor/modules.txt index 79b0d0341..20c5cf8a3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -733,7 +733,7 @@ github.com/prometheus/common/model github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/redis/go-redis/v9 v9.20.1 +# github.com/redis/go-redis/v9 v9.21.0 ## explicit; go 1.24 github.com/redis/go-redis/v9 github.com/redis/go-redis/v9/auth