-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.go
More file actions
253 lines (219 loc) · 6.58 KB
/
utils.go
File metadata and controls
253 lines (219 loc) · 6.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
package anet
import (
"encoding/binary"
"errors"
"fmt"
"io"
"net"
)
// Message framing constants.
const (
// LENGTHSIZE defines the size of the length header in bytes.
// The length header is a big-endian uint16 value.
LENGTHSIZE = 2
)
// Common framing errors.
var (
// ErrInvalidMsgLength indicates that a message length header is invalid.
ErrInvalidMsgLength = errors.New("invalid message length")
// ErrMaxLenExceeded indicates the message length exceeds the maximum allowed.
// This is determined by the maximum value that can be stored in LENGTHSIZE bytes.
ErrMaxLenExceeded = errors.New("maximum message length exceeded")
)
// RingBuffer is a fixed-size circular buffer for items of any type.
type RingBuffer[T any] struct {
buf []T // underlying buffer array.
mask uint64 // mask for index wrapping.
head uint64 // next position to read from.
tail uint64 // next position to write to.
}
// nextPow2Uint64 returns the smallest power of two >= v with a minimum of 1.
func nextPow2Uint64(v uint64) uint64 {
if v == 0 {
return 1
}
v--
v |= v >> 1
v |= v >> 2
v |= v >> 4
v |= v >> 8
v |= v >> 16
v |= v >> 32
return v + 1
}
// NewRingBuffer creates a new RingBuffer with capacity rounded up to a power of two.
func NewRingBuffer[T any](size uint64) *RingBuffer[T] {
capacity := nextPow2Uint64(size)
return &RingBuffer[T]{
buf: make([]T, capacity),
mask: capacity - 1,
}
}
// Enqueue adds an item to the buffer. It returns false if the buffer is full.
func (r *RingBuffer[T]) Enqueue(item T) bool {
if (r.tail - r.head) == uint64(len(r.buf)) {
return false // buffer is full.
}
r.buf[r.tail&r.mask] = item
r.tail++
return true
}
// Dequeue removes and returns an item. It returns false if the buffer is empty.
func (r *RingBuffer[T]) Dequeue() (T, bool) {
var zero T
if r.tail == r.head {
return zero, false // buffer is empty.
}
item := r.buf[r.head&r.mask]
r.head++
return item, true
}
// Len returns the number of items in the buffer.
func (r *RingBuffer[T]) Len() uint64 {
return r.tail - r.head
}
// Cap returns the capacity of the buffer.
func (r *RingBuffer[T]) Cap() uint64 {
return uint64(len(r.buf))
}
// Write sends data over a connection using the message framing protocol.
// It prepends a big-endian length header of LENGTHSIZE bytes to the data.
// The maximum message size is determined by LENGTHSIZE (65535 bytes for uint16).
//
// The complete frame format is:
//
// [length header (2 bytes)][payload (length bytes)]
//
// Returns ErrMaxLenExceeded if the message is too large for the length header.
func Write(w io.Writer, in []byte) error {
// Calculate maximum allowed length based on header size.
maxLen := uint64(1<<(8*LENGTHSIZE)) - 1
if uint64(len(in)) > maxLen {
return ErrMaxLenExceeded
}
// Optimization: For small messages, copy to a single buffer to avoid
// the allocation overhead of net.Buffers (which allocates a slice).
// Threshold chosen empirically; copying < 512 bytes is cheaper than slice alloc + GC.
if len(in) < 512 {
totalLen := LENGTHSIZE + len(in)
buf := GetBuffer(totalLen)
// Ensure we return the buffer to the pool
defer PutBuffer(buf)
// Reslice to exact length
if cap(buf) >= totalLen {
buf = buf[:totalLen]
} else {
// Should not happen with GetBuffer, but safe fallback
buf = make([]byte, totalLen)
}
switch LENGTHSIZE {
case 2:
binary.BigEndian.PutUint16(buf[0:2], uint16(len(in)))
case 4:
binary.BigEndian.PutUint32(buf[0:4], uint32(len(in)))
}
copy(buf[LENGTHSIZE:], in)
_, err := w.Write(buf)
return err
}
// Build header in a small stack buffer to avoid allocations.
var hdr [LENGTHSIZE]byte
switch LENGTHSIZE {
case 2:
binary.BigEndian.PutUint16(hdr[:], uint16(len(in)))
case 4:
binary.BigEndian.PutUint32(hdr[:], uint32(len(in)))
default:
return fmt.Errorf("unsupported header size: %d", LENGTHSIZE)
}
// Attempt a vectorized write using net.Buffers to avoid copying payload.
// Falls back internally to sequential writes if writev is unavailable.
bufs := net.Buffers{hdr[:], in}
n, err := bufs.WriteTo(w)
if err != nil {
return err
}
expected := int64(len(hdr) + len(in))
if n != expected {
return io.ErrShortWrite
}
return nil
}
// Read receives data from a connection using the message framing protocol.
// It first reads a big-endian length header of LENGTHSIZE bytes, then reads
// the specified number of payload bytes.
//
// The complete frame format is:
//
// [length header (2 bytes)][payload (length bytes)]
//
// Returns:
// - The payload data.
// - io.EOF if the connection was closed cleanly.
// - Other errors for network or protocol issues.
func Read(r io.Reader) ([]byte, error) {
// Read the length header into a small stack buffer.
var hdr [LENGTHSIZE]byte
if _, err := io.ReadFull(r, hdr[:]); err != nil {
return nil, err
}
// Parse the length value.
var length uint64
switch LENGTHSIZE {
case 2:
length = uint64(binary.BigEndian.Uint16(hdr[:]))
case 4:
length = uint64(binary.BigEndian.Uint32(hdr[:]))
default:
return nil, fmt.Errorf("unsupported header size: %d", LENGTHSIZE)
}
// Allocate the exact-sized payload buffer and read directly into it.
if length == 0 {
return []byte{}, nil
}
payload := make([]byte, length)
if _, err := io.ReadFull(r, payload); err != nil {
return nil, err
}
return payload, nil
}
// ReadPooled receives data using the message framing protocol, allocating the
// payload buffer from the global buffer pool. The caller is responsible for
// returning the buffer to the pool using PutBuffer.
func ReadPooled(r io.Reader) ([]byte, error) {
// Read the length header into a small stack buffer.
var hdr [LENGTHSIZE]byte
if _, err := io.ReadFull(r, hdr[:]); err != nil {
return nil, err
}
// Parse the length value.
var length uint64
switch LENGTHSIZE {
case 2:
length = uint64(binary.BigEndian.Uint16(hdr[:]))
case 4:
length = uint64(binary.BigEndian.Uint32(hdr[:]))
default:
return nil, fmt.Errorf("unsupported header size: %d", LENGTHSIZE)
}
if length == 0 {
return []byte{}, nil
}
// Get a buffer from the pool.
// Note: GetBuffer returns a slice with len=size if allocated new,
// or len=poolSize if from pool. We need to ensure we read exactly 'length'.
// But GetBuffer guarantees cap >= size.
buf := GetBuffer(int(length))
// Reslice to exact length needed for ReadFull
if cap(buf) < int(length) {
// Should not happen if GetBuffer works correctly
buf = make([]byte, length)
}
payload := buf[:length]
if _, err := io.ReadFull(r, payload); err != nil {
// Return buffer to pool on error if we managed to get one
PutBuffer(buf)
return nil, err
}
return payload, nil
}