/*
Copyright 2026 The ARCORIS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bufferpool

import (
	"sync"
	"sync/atomic"
)

const (
	// errNilPoolGroup reports nil or zero-value PoolGroup receivers.
	errNilPoolGroup = "bufferpool.PoolGroup: receiver must not be nil"

	// errGroupPartitionMissing reports lookup failure for a group-owned partition.
	errGroupPartitionMissing = "bufferpool.PoolGroup: partition not found"

	// errGroupPoolMissing reports lookup failure for a group-owned Pool.
	errGroupPoolMissing = "bufferpool.PoolGroup: pool not found"

	// errGroupLeasePoolMissing reports a lease that does not belong to the group.
	errGroupLeasePoolMissing = "bufferpool.PoolGroup: lease pool not found"
)

// PoolGroup owns a deterministic set of PoolPartitions, a group-global Pool
// directory, and a manual group-level coordinator boundary.
//
// PoolGroup is the first owner above PoolPartition. It routes managed
// acquisition and release by Pool name through the owning partition while
// aggregating partition samples, bounded windows, rates, metrics, snapshots,
// and foreground coordinator reports. TickInto may publish partition retained
// budgets into owned PoolPartitions, and SetPressure propagates pressure
// signals to partitions. The optional coordinator scheduler is owner-local and
// calls the same TickInto path as manual callers; it does not tick partitions
// automatically, duplicate coordinator logic, retain full reports, or involve
// Pool.Get/Pool.Put. PoolGroup deliberately does not execute physical trim,
// scan shards directly, or compute class EWMA.
//
// Responsibility boundary:
//
//   - group.go owns the PoolGroup type, construction, metadata accessors, and
//     receiver validation;
//   - group_config.go owns construction config normalization and validation;
//   - group_policy.go owns group control policy;
//   - group_lifecycle.go owns hard close behavior;
//   - group_registry.go owns the immutable partition registry;
//   - group_runtime.go owns group runtime policy snapshots;
//   - group_sample.go owns group sampling and aggregation;
//   - group_window.go owns bounded group windows;
//   - group_rate.go owns aggregate rate projection;
//   - group_score*.go owns score value projection;
//   - group_snapshot.go and group_metrics.go own diagnostics;
//   - group_coordinator*.go and group_coordinator_report.go own foreground
//     coordinator state, budget publication, and reports.
//
// Copying:
//
// PoolGroup MUST NOT be copied after first use. It embeds atomic lifecycle and
// generation state and owns partition pointers through an immutable registry.
type PoolGroup struct {
	// lifecycle gates group-level foreground work and hard close.
	lifecycle AtomicLifecycle

	// runtimeMu serializes group-routed foreground operations with hard close:
	// Acquire, AcquireSize, Release, and TickInto take the read side; Close takes
	// the write side. It is not a Pool hot-path lock, is not taken by direct
	// PoolPartition or Pool users, and does not make diagnostics transactional.
	// The lock protects only the group ownership boundary so Close cannot close
	// child partitions while group-routed work is inside that boundary.
	runtimeMu sync.RWMutex

	// generation tracks group-visible state and explicit coordinator events.
	generation AtomicGeneration

	// name is diagnostic group metadata.
	name string

	// config is the normalized construction config kept for diagnostics.
	config PoolGroupConfig

	// runtimeSnapshot publishes immutable group policy views.
	runtimeSnapshot atomic.Pointer[groupRuntimeSnapshot]

	// registry owns the deterministic set of group partitions.
	registry groupRegistry

	// poolDirectory maps group-global Pool names and owned Pool pointers to
	// immutable partition locations.
	poolDirectory groupPoolDirectory

	// scoreEvaluator owns prepared score adapters for group evaluation.
	scoreEvaluator PoolGroupScoreEvaluator

	// coordinator owns applied group-local state mutated only by TickInto.
	coordinator groupCoordinator

	// coordinatorScheduler owns the optional background dispatch loop for
	// PoolGroupPolicy.Coordinator.Enabled. It is separate from groupCoordinator
	// because Close owns group lifecycle and must stop scheduled ticks before
	// child partitions are closed.
	coordinatorScheduler controllerSchedulerRuntime

	// schedulerReconfigMu serializes PublishPolicy scheduler enable, disable,
	// retime, and Close shutdown. TickInto never takes this lock, so waiting for
	// scheduler Stop cannot block behind itself through runtimeMu.RLock.
	schedulerReconfigMu sync.Mutex

	// coordinatorSchedulerTickerFactory is construction/test wiring reused by
	// live PublishPolicy reconfiguration. A nil factory means production
	// time.NewTicker-backed scheduling.
	coordinatorSchedulerTickerFactory controllerSchedulerTickerFactory
}

// NewPoolGroup constructs and activates a PoolGroup.
//
// NewPoolGroup constructs owned PoolPartitions from explicit partitions or from
// group-level Pool assignments. It starts background work only when the group
// policy explicitly enables the opt-in coordinator scheduler. Runtime partition
// budget publication otherwise happens only when callers explicitly invoke
// TickInto/Tick.
func NewPoolGroup(config PoolGroupConfig) (*PoolGroup, error) {
	return newPoolGroup(config, nil)
}
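
// Construction sketch (illustrative only, not part of this file's API surface):
// the snippet below shows the intended NewPoolGroup call pattern described
// above. The group name and error handling are placeholders; a real
// PoolGroupConfig must also carry partition or group-level Pool assignments
// and a policy, whose concrete shapes live in group_config.go and
// group_policy.go rather than here.
//
//	cfg := PoolGroupConfig{Name: "example"} // plus partition/Pool assignments and policy
//	group, err := NewPoolGroup(cfg)
//	if err != nil {
//		return err // construction fails closed; nothing is left running
//	}
//	defer group.Close()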

// newPoolGroupWithCoordinatorSchedulerTickerFactory constructs a group with a
// test-supplied scheduler ticker factory.
//
// The helper is intentionally unexported so production callers cannot bypass
// policy validation or own scheduler wiring. Tests use it to drive opt-in group
// scheduling deterministically without real timers or sleeps.
func newPoolGroupWithCoordinatorSchedulerTickerFactory(
	config PoolGroupConfig,
	tickerFactory controllerSchedulerTickerFactory,
) (*PoolGroup, error) {
	return newPoolGroup(config, tickerFactory)
}

// newPoolGroup contains the shared construction path for public and
// deterministic-test constructors.
//
// Scheduler startup is deliberately last: partition registry, pool directory,
// runtime snapshot, coordinator state, and lifecycle activation must all be
// complete before a scheduled TickInto can observe the group. If startup fails,
// the already-created group is closed through the normal hard-close path.
func newPoolGroup(config PoolGroupConfig, tickerFactory controllerSchedulerTickerFactory) (*PoolGroup, error) {
	normalized := config.Normalize()
	if err := normalized.Validate(); err != nil {
		return nil, err
	}
	partitions, directory, err := newGroupPartitionAssignments(normalized)
	if err != nil {
		return nil, err
	}
	registry, err := newGroupRegistry(partitions)
	if err != nil {
		return nil, err
	}
	directory, err = directory.bindRegistry(registry)
	if err != nil {
		_ = registry.closeAll()
		return nil, err
	}
	normalized.Partitions = partitions
	normalized.Pools = nil
	group := &PoolGroup{
		name: normalized.Name,
		config: cloneGroupConfig(normalized),
		registry: registry,
		poolDirectory: directory,
		scoreEvaluator: NewPoolGroupScoreEvaluator(normalized.Policy.Score),
		coordinatorSchedulerTickerFactory: tickerFactory,
	}
	group.coordinator.init(normalized.Policy)
	group.generation.Store(InitialGeneration)
	group.publishRuntimeSnapshot(newGroupRuntimeSnapshot(InitialGeneration, normalized.Policy))
	group.lifecycle.Activate()
	if err := group.startCoordinatorScheduler(tickerFactory); err != nil {
		_ = group.Close()
		return nil, err
	}
	return group, nil
}

// MustNewPoolGroup constructs a PoolGroup and panics on failure.
func MustNewPoolGroup(config PoolGroupConfig) *PoolGroup {
	group, err := NewPoolGroup(config)
	if err != nil {
		panic(err)
	}
	return group
}

// Name returns the diagnostic group name.
func (g *PoolGroup) Name() string { g.mustBeInitialized(); return g.name }

// Config returns a defensive copy of the normalized construction config.
func (g *PoolGroup) Config() PoolGroupConfig {
	g.mustBeInitialized()
	return cloneGroupConfig(g.config)
}

// Policy returns the currently published group policy.
func (g *PoolGroup) Policy() PoolGroupPolicy {
	g.mustBeInitialized()
	return g.currentRuntimeSnapshot().Policy
}

// PartitionNames returns the deterministic group-local partition names.
func (g *PoolGroup) PartitionNames() []string {
	g.mustBeInitialized()
	return g.registry.namesCopy()
}

// PoolNames returns the deterministic group-global Pool names.
func (g *PoolGroup) PoolNames() []string {
	g.mustBeInitialized()
	return g.poolDirectory.namesCopy()
}

// PartitionSnapshot returns a diagnostic snapshot for one group-owned partition.
//
// PartitionSnapshot is diagnostic and remains available after group close. It
// does not expose the raw partition pointer and does not authorize data-plane
// work after shutdown begins.
func (g *PoolGroup) PartitionSnapshot(name string) (PoolPartitionSnapshot, bool) {
	g.mustBeInitialized()
	partition, ok := g.registry.partition(name)
	if !ok {
		return PoolPartitionSnapshot{}, false
	}
	return partition.Snapshot(), true
}

// PartitionMetrics returns diagnostic metrics for one group-owned partition.
//
// PartitionMetrics is diagnostic and remains available after group close. It
// reads through the owned PoolPartition without exposing it.
func (g *PoolGroup) PartitionMetrics(name string) (PoolPartitionMetrics, bool) {
	g.mustBeInitialized()
	partition, ok := g.registry.partition(name)
	if !ok {
		return PoolPartitionMetrics{}, false
	}
	return partition.Metrics(), true
}
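
// Diagnostics sketch (illustrative only): partition snapshots and metrics stay
// readable after Close for post-mortem inspection, as the doc comments above
// state. The loop below enumerates the deterministic partition names and reads
// each diagnostic view through the group without touching the raw partitions;
// what callers do with the values (logging, metrics export) is their concern.
//
//	for _, name := range group.PartitionNames() {
//		if snapshot, ok := group.PartitionSnapshot(name); ok {
//			_ = snapshot // export as needed
//		}
//		if metrics, ok := group.PartitionMetrics(name); ok {
//			_ = metrics
//		}
//	}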

// Acquire obtains an ownership lease through the group pool directory.
//
// Acquire validates the group lifecycle, resolves the group-global Pool name to
// its owning PoolPartition, and delegates to PoolPartition.Acquire so
// checked-out ownership remains in the partition-owned LeaseRegistry. It does
// not expose raw *PoolPartition or *Pool access, and it returns ErrClosed once
// group close has started.
func (g *PoolGroup) Acquire(poolName string, size int) (Lease, error) {
	g.mustBeInitialized()
	g.runtimeMu.RLock()
	defer g.runtimeMu.RUnlock()
	if !g.lifecycle.AllowsWork() {
		return Lease{}, newError(ErrClosed, errGroupClosed)
	}
	location, ok := g.poolDirectory.location(poolName)
	if !ok {
		return Lease{}, newError(ErrInvalidOptions, errGroupPoolMissing+": "+poolName)
	}
	partition, _ := g.registry.partition(location.PartitionName)
	return partition.Acquire(location.PoolName, size)
}

// AcquireSize is the Size-typed form of Acquire.
//
// The lifecycle, pool-directory lookup, and LeaseRegistry ownership semantics
// are identical to Acquire.
func (g *PoolGroup) AcquireSize(poolName string, size Size) (Lease, error) {
	g.mustBeInitialized()
	g.runtimeMu.RLock()
	defer g.runtimeMu.RUnlock()
	if !g.lifecycle.AllowsWork() {
		return Lease{}, newError(ErrClosed, errGroupClosed)
	}
	location, ok := g.poolDirectory.location(poolName)
	if !ok {
		return Lease{}, newError(ErrInvalidOptions, errGroupPoolMissing+": "+poolName)
	}
	partition, _ := g.registry.partition(location.PartitionName)
	return partition.AcquireSize(location.PoolName, size)
}

// Release releases a group-acquired lease through its owning partition.
//
// Release intentionally routes to PoolPartition.Release instead of calling
// Lease.Release directly. That preserves partition dirty marking and keeps the
// LeaseRegistry ownership boundary partition-local. Release remains available
// after group close starts so checked-out ownership can complete diagnostically,
// matching PoolPartition release semantics.
func (g *PoolGroup) Release(lease Lease, buffer []byte) error {
	g.mustBeInitialized()
	g.runtimeMu.RLock()
	defer g.runtimeMu.RUnlock()
	location, ok := g.poolDirectory.locationForLease(lease)
	if !ok {
		return newError(ErrInvalidLease, errGroupLeasePoolMissing)
	}
	partition, _ := g.registry.partition(location.PartitionName)
	return partition.Release(lease, buffer)
}
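
// Routing sketch (illustrative only): a group-routed acquire/release pair. The
// pool name "ingest", the size, and the buffer handling are placeholders; how
// a caller obtains the leased bytes is a Lease concern defined elsewhere in
// this package, so the sketch only shows the ownership round trip through the
// owning partition.
//
//	lease, err := group.Acquire("ingest", 64<<10)
//	if err != nil {
//		return err // ErrClosed after close starts, ErrInvalidOptions for unknown pools
//	}
//	// ... fill and consume the leased buffer ...
//	if err := group.Release(lease, buffer); err != nil {
//		return err // ErrInvalidLease when the lease is not group-owned
//	}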

// AcquireFromPartition obtains a lease from a specific group-local partition.
//
// This is an advanced routing method for explicit partition-aware callers.
// Ordinary managed callers should use Acquire with a group-global Pool name.
func (g *PoolGroup) AcquireFromPartition(partitionName string, poolName string, size int) (Lease, error) {
	g.mustBeInitialized()
	g.runtimeMu.RLock()
	defer g.runtimeMu.RUnlock()
	if !g.lifecycle.AllowsWork() {
		return Lease{}, newError(ErrClosed, errGroupClosed)
	}
	partition, ok := g.registry.partition(partitionName)
	if !ok {
		return Lease{}, newError(ErrInvalidOptions, errGroupPartitionMissing+": "+partitionName)
	}
	return partition.Acquire(poolName, size)
}

// AcquireSizeFromPartition is the Size-typed form of AcquireFromPartition.
func (g *PoolGroup) AcquireSizeFromPartition(partitionName string, poolName string, size Size) (Lease, error) {
	g.mustBeInitialized()
	g.runtimeMu.RLock()
	defer g.runtimeMu.RUnlock()
	if !g.lifecycle.AllowsWork() {
		return Lease{}, newError(ErrClosed, errGroupClosed)
	}
	partition, ok := g.registry.partition(partitionName)
	if !ok {
		return Lease{}, newError(ErrInvalidOptions, errGroupPartitionMissing+": "+partitionName)
	}
	return partition.AcquireSize(poolName, size)
}

// ReleaseToPartition releases a lease through a specific group-local partition.
//
// This advanced method is useful for tests and partition-aware integrations.
// It still delegates to PoolPartition.Release and does not duplicate lease
// ownership validation.
func (g *PoolGroup) ReleaseToPartition(partitionName string, lease Lease, buffer []byte) error {
	g.mustBeInitialized()
	g.runtimeMu.RLock()
	defer g.runtimeMu.RUnlock()
	partition, ok := g.registry.partition(partitionName)
	if !ok {
		return newError(ErrInvalidOptions, errGroupPartitionMissing+": "+partitionName)
	}
	return partition.Release(lease, buffer)
}
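
// Partition-aware sketch (illustrative only): the FromPartition/ToPartition
// methods skip the group-global pool directory and address a partition by its
// group-local name, as the doc comments above describe. The names "hot" and
// "ingest" and the buffer are placeholders for illustration.
//
//	lease, err := group.AcquireFromPartition("hot", "ingest", 4096)
//	if err != nil {
//		return err
//	}
//	// ... use the leased buffer, then hand ownership back through the same partition
//	if err := group.ReleaseToPartition("hot", lease, buffer); err != nil {
//		return err
//	}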

// partition returns a raw group-owned partition for internal group code only.
//
// This helper must stay unexported. Public callers route data-plane work
// through PoolGroup Acquire/Release and diagnostics through snapshot/metrics
// methods so partition ownership and lease accounting remain intact.
func (g *PoolGroup) partition(name string) (*PoolPartition, bool) {
	return g.registry.partition(name)
}

// mustBeInitialized verifies that g was constructed by NewPoolGroup.
func (g *PoolGroup) mustBeInitialized() {
	if g == nil || g.runtimeSnapshot.Load() == nil || g.scoreEvaluator.isZero() {
		panic(errNilPoolGroup)
	}
}
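
// Receiver-contract sketch (illustrative only): mustBeInitialized makes misuse
// loud rather than silent. A nil receiver, or a zero-value PoolGroup that was
// not built by NewPoolGroup, panics with errNilPoolGroup instead of operating
// on half-initialized state.
//
//	var zero PoolGroup
//	_ = zero.Name() // panics: "bufferpool.PoolGroup: receiver must not be nil"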