Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion drpcmanager/active_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func testMuxWriter(t *testing.T) *drpcwire.MuxWriter {
}

func testStream(t *testing.T, id uint64) *drpcstream.Stream {
return drpcstream.New(context.Background(), id, testMuxWriter(t))
return drpcstream.New(context.Background(), id, testMuxWriter(t), drpcstream.NewBufferPool())
}

func TestActiveStreams_AddAndGet(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type Manager struct {
wg sync.WaitGroup // tracks active manageStream goroutines

// streams tracks active streams.
streams *activeStreams
streams *activeStreams
recvPool *drpcstream.BufferPool

pdone drpcsignal.Chan // signals when NewServerStream has registered the new stream
invokes chan invokeInfo // completed invoke info from manageReader to NewServerStream
Expand Down Expand Up @@ -130,6 +131,7 @@ func NewWithOptions(tr drpc.Transport, kind ManagerKind, opts Options) *Manager
m.pendingStreams = make(map[uint64]*pendingStream)

m.streams = newActiveStreams()
m.recvPool = drpcstream.NewBufferPool()

// set the internal stream options
drpcopts.SetStreamTransport(&m.opts.Stream.Internal, m.tr)
Expand Down Expand Up @@ -268,7 +270,7 @@ func (m *Manager) newStream(ctx context.Context, sid uint64, kind drpc.StreamKin
drpcopts.SetStreamStats(&opts.Internal, cb(rpc))
}

stream := drpcstream.NewWithOptions(ctx, sid, m.wr, opts)
stream := drpcstream.NewWithOptions(ctx, sid, m.wr, m.recvPool, opts)

if err := m.streams.Add(sid, stream); err != nil {
return nil, err
Expand Down
42 changes: 42 additions & 0 deletions drpcstream/buffer_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (C) 2026 Cockroach Labs.
// See LICENSE for copying information.

package drpcstream

import "sync"

// BufferPool wraps sync.Pool to provide reusable byte slices for the
// stream receive path. Buffers obtained via Get should be returned via
// Put when no longer needed. Forgetting to Put is safe (GC reclaims)
// but reduces reuse.
type BufferPool struct {
pool sync.Pool
}

// NewBufferPool returns a new buffer pool.
func NewBufferPool() *BufferPool {
return &BufferPool{
pool: sync.Pool{
New: func() interface{} {
b := make([]byte, 0, 4096)
return &b
},
},
}
}

// Get returns a zero-length byte slice from the pool, retaining its
// backing array for reuse.
func (bp *BufferPool) Get() *[]byte {
p := bp.pool.Get().(*[]byte)
*p = (*p)[:0]
return p
}

// Put returns a buffer to the pool. Nil is safe to pass.
func (bp *BufferPool) Put(b *[]byte) {
if b == nil {
return
}
bp.pool.Put(b)
}
71 changes: 30 additions & 41 deletions drpcstream/ring_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ const defaultRingBufferCapacity = 256

// ringBuffer is a bounded single-producer / single-consumer FIFO queue for
// assembled packet data. It sits between manageReader (producer, calls
// Enqueue) and the application goroutine (consumer, calls Dequeue/Done).
// Enqueue) and the application goroutine (consumer, calls Dequeue).
//
// Slots are pre-allocated and reused: each slot's backing array grows via
// append to fit incoming data, then stays at its high-water mark, avoiding
// per-message allocation in steady state.
// Buffers are obtained from a shared BufferPool. Enqueue copies data into a
// pooled buffer; Dequeue returns ownership of that buffer to the caller and
// advances the tail immediately. The caller is responsible for returning the
// buffer to the pool via BufferPool.Put.
//
// After Close, Dequeue drains any queued messages before returning the close
// error. This ensures graceful shutdown (KindClose/KindCloseSend) delivers
Expand All @@ -28,23 +29,24 @@ type ringBuffer struct {
mu sync.Mutex
cond sync.Cond

buf [][]byte // ring of byte slices
head int // next write position (producer)
tail int // next read position (consumer)
count int // number of occupied slots
pool *BufferPool // shared pool; nil means allocate fresh each time
buf []*[]byte // ring of pooled buffer pointers
head int // next write position (producer)
tail int // next read position (consumer)
count int // number of occupied slots

held bool // true between Dequeue and Done
err error // terminal error, set by Close
err error // terminal error, set by Close
}

func (rb *ringBuffer) init() {
func (rb *ringBuffer) init(pool *BufferPool) {
rb.cond.L = &rb.mu
rb.buf = make([][]byte, defaultRingBufferCapacity)
rb.pool = pool
rb.buf = make([]*[]byte, defaultRingBufferCapacity)
}

// Enqueue copies data into the next write slot. If the buffer is full, it
// blocks until a slot is freed or the buffer is closed. If the buffer is
// closed, Enqueue returns silently without enqueuing.
// Enqueue copies data into a pooled buffer and places it in the next write
// slot. If the buffer is full, it blocks until a slot is freed or the buffer
// is closed. If the buffer is closed, Enqueue returns silently.
func (rb *ringBuffer) Enqueue(data []byte) {
rb.mu.Lock()
defer rb.mu.Unlock()
Expand All @@ -56,16 +58,19 @@ func (rb *ringBuffer) Enqueue(data []byte) {
return
}

rb.buf[rb.head] = append(rb.buf[rb.head][:0], data...)
b := rb.pool.Get()
*b = append(*b, data...)

rb.buf[rb.head] = b
rb.head = (rb.head + 1) % len(rb.buf)
rb.count++
rb.cond.Broadcast()
}

// Dequeue returns the data from the next read slot. If the buffer is empty,
// it blocks until data is available or the buffer is closed. The returned
// slice is valid until Done is called.
func (rb *ringBuffer) Dequeue() ([]byte, error) {
// Dequeue returns the next buffered message. The returned *[]byte is owned
// by the caller; the tail is advanced immediately. If the ring buffer has a
// pool, the caller should return the buffer via BufferPool.Put when done.
func (rb *ringBuffer) Dequeue() (*[]byte, error) {
rb.mu.Lock()
defer rb.mu.Unlock()

Expand All @@ -76,37 +81,21 @@ func (rb *ringBuffer) Dequeue() ([]byte, error) {
return nil, rb.err
}

rb.held = true
return rb.buf[rb.tail], nil
}

// Done advances the read pointer, making the slot available for reuse.
// It must be called exactly once after each successful Dequeue.
//
// TODO(shubham): remove this method once a shared buffer pool is introduced.
// With a pool, Dequeue will advance the tail immediately and the caller will
// return the buffer to the pool directly.
func (rb *ringBuffer) Done() {
rb.mu.Lock()
defer rb.mu.Unlock()

b := rb.buf[rb.tail]
rb.buf[rb.tail] = nil
rb.tail = (rb.tail + 1) % len(rb.buf)
rb.count--
rb.held = false
rb.cond.Broadcast()

return b, nil
}

// Close marks the buffer as closed with the given error. All blocked Enqueue
// and Dequeue calls are woken and will return. Close waits for any in-progress
// Dequeue/Done pair to complete before setting the error. Subsequent calls are
// no-ops.
// and Dequeue calls are woken and will return. Subsequent calls are no-ops.
func (rb *ringBuffer) Close(err error) {
rb.mu.Lock()
defer rb.mu.Unlock()

for rb.held {
rb.cond.Wait()
}
if rb.err != nil {
return
}
Expand Down
Loading