From 14db7084c2aae679a79af600ddb9b66daa453797 Mon Sep 17 00:00:00 2001 From: Yoann Ghigoff Date: Thu, 2 Apr 2026 16:26:58 +0200 Subject: [PATCH] Add RT scheduling support for ring buffer reader goroutine Add SchedPolicy and SchedPriority fields to RingBufferOptions that allow callers to request a realtime scheduling policy for the ring buffer reader goroutine's OS thread. When SchedPolicy is set, the reader goroutine pins itself to an OS thread via LockOSThread and calls sched_setattr(2) to apply the requested realtime policy and priority. On failure, the error is sent to ErrChan and the reader continues with default scheduling (graceful degradation). This reduces eBPF ring buffer event loss under heavy syscall load by giving the reader thread priority over normal workloads. --- ringbuffer.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/ringbuffer.go b/ringbuffer.go index 89713f3..801c3e1 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -3,11 +3,13 @@ package manager import ( "errors" "fmt" + "runtime" "sync" "sync/atomic" "github.com/cilium/ebpf" "github.com/cilium/ebpf/ringbuf" + "golang.org/x/sys/unix" ) type RingBufferOptions struct { @@ -27,6 +29,14 @@ type RingBufferOptions struct { // TelemetryEnabled turns on telemetry about the usage of the ring buffer TelemetryEnabled bool + + // SchedPolicy - If set, the reader goroutine's OS thread will be pinned + // (LockOSThread) and given this scheduling policy via sched_setattr(2). + // Supported values: unix.SCHED_FIFO, unix.SCHED_RR. Zero means no change. + SchedPolicy int + + // SchedPriority - RT scheduling priority (1-99). Required when SchedPolicy is set. + SchedPriority int } type RingBuffer struct { @@ -74,6 +84,15 @@ func (rb *RingBuffer) init(manager *Manager) error { return fmt.Errorf("no DataHandler/RecordHandler set for %s", rb.Name) } + if rb.SchedPolicy != 0 { + if rb.SchedPolicy != unix.SCHED_FIFO && rb.SchedPolicy != unix.SCHED_RR { + return fmt.Errorf("unsupported SchedPolicy %d for %s: must be SCHED_FIFO or SCHED_RR", rb.SchedPolicy, rb.Name) + } + if rb.SchedPriority < 1 || rb.SchedPriority > 99 { + return fmt.Errorf("SchedPriority must be between 1 and 99 for %s, got %d", rb.Name, rb.SchedPriority) + } + } + if rb.TelemetryEnabled { rb.usageTelemetry = &atomic.Uint64{} } @@ -106,6 +125,24 @@ func (rb *RingBuffer) Start() error { rb.wgReader.Add(1) go func() { + if rb.SchedPolicy != 0 { + runtime.LockOSThread() + // No UnlockOSThread — the goroutine runs for the ring buffer's + // lifetime and the thread is destroyed when it exits. + attr, err := unix.SchedGetAttr(0, 0) + if err == nil { + attr.Policy = uint32(rb.SchedPolicy) + attr.Priority = uint32(rb.SchedPriority) + err = unix.SchedSetAttr(0, attr, 0) + } + if err != nil { + if rb.ErrChan != nil { + rb.ErrChan <- fmt.Errorf("failed to set RT scheduling policy on ring buffer reader: %w", err) + } + // Continue with normal scheduling (graceful degradation) + } + } + var record *ringbuf.Record var err error