-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun.go
More file actions
168 lines (144 loc) · 4.75 KB
/
run.go
File metadata and controls
168 lines (144 loc) · 4.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package workers
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/go-coldbrew/log"
"github.com/go-coldbrew/tracing"
"github.com/thejerf/suture/v4"
)
// RunOption configures the behavior of Run.
type RunOption func(*runConfig)

// runConfig accumulates the settings applied by RunOptions before the
// supervisor tree is built.
type runConfig struct {
	metrics Metrics // metrics sink inherited by all workers; defaults to BaseMetrics{}
}
// WithMetrics sets the metrics implementation for all workers started by Run.
// Workers inherit this unless they override via Worker.WithMetrics.
// A nil value is ignored, so the default (BaseMetrics{}) stays in place.
func WithMetrics(m Metrics) RunOption {
	return func(c *runConfig) {
		if m == nil {
			return
		}
		c.metrics = m
	}
}
// workerRunService wraps the actual Run func as a suture.Service
// that runs inside the worker's own child supervisor.
type workerRunService struct {
	w        *Worker             // worker whose run func this service executes
	childSup *suture.Supervisor  // per-worker child supervisor, handed to the worker context
	metrics  Metrics             // resolved metrics sink (worker override or Run-level default)
	active   *atomic.Int32       // shared count of running workers, feeds SetActiveWorkers
	mu       sync.Mutex          // guards attempt; suture may re-enter Serve after restarts
	attempt  int                 // number of prior Serve runs; 0 on the first start
}
// Serve implements suture.Service. It runs the worker's run func once,
// recording start/stop/duration metrics and the active-worker gauge around
// the call, then tells suture whether the worker should be restarted.
func (ws *workerRunService) Serve(ctx context.Context) error {
	// Snapshot and bump the attempt counter under the lock; suture calls
	// Serve again on restart, so attempt counts completed prior runs.
	ws.mu.Lock()
	attempt := ws.attempt
	ws.attempt++
	ws.mu.Unlock()
	m := ws.metrics
	m.WorkerStarted(ws.w.name)
	ws.active.Add(1)
	m.SetActiveWorkers(int(ws.active.Load()))
	start := time.Now()
	defer func() {
		// Mirror the start-side bookkeeping on every exit path.
		m.WorkerStopped(ws.w.name)
		ws.active.Add(-1)
		m.SetActiveWorkers(int(ws.active.Load()))
		m.ObserveRunDuration(ws.w.name, time.Since(start))
	}()
	if attempt > 0 {
		m.WorkerRestarted(ws.w.name, attempt)
	}
	span, ctx := tracing.NewInternalSpan(ctx, "worker:"+ws.w.name)
	defer span.Finish()
	// Inject worker name and attempt into log context so all log calls
	// inside the worker automatically include them.
	ctx = log.AddToContext(ctx, "worker", ws.w.name)
	ctx = log.AddToContext(ctx, "attempt", attempt)
	wctx := newWorkerContext(ctx, ws.w.name, attempt, ws.childSup, m, ws.active)
	err := ws.w.run(wctx)
	// Only count failures not caused by context cancellation.
	if err != nil && ctx.Err() == nil {
		m.WorkerFailed(ws.w.name, err)
	}
	// Stop for good when restart-on-fail is off and the worker either
	// finished cleanly or was cancelled; otherwise return the error and let
	// suture apply its restart/backoff policy.
	if !ws.w.restartOnFail && (err == nil || ctx.Err() != nil) {
		return suture.ErrDoNotRestart
	}
	return err
}
// String implements fmt.Stringer so suture's event log identifies this
// service by its worker name.
func (ws *workerRunService) String() string {
	return ws.w.name
}
// resolveMetrics picks the metrics sink for a worker: the worker's own
// override wins, then the parent's, then the BaseMetrics{} no-op fallback.
func resolveMetrics(w *Worker, parent Metrics) Metrics {
	switch {
	case w.metrics != nil:
		return w.metrics
	case parent != nil:
		return parent
	default:
		return BaseMetrics{}
	}
}
// addWorkerToSupervisor creates a child supervisor for the worker,
// adds the worker's run func as a service inside it, and adds the
// child supervisor to the parent. Returns the service token for removal.
func addWorkerToSupervisor(parent *suture.Supervisor, w *Worker, metrics Metrics, active *atomic.Int32) suture.ServiceToken {
	resolved := resolveMetrics(w, metrics)
	child := suture.New("worker:"+w.name, w.sutureSpec(makeEventHook(resolved)))
	svc := &workerRunService{
		w:        w,
		childSup: child,
		metrics:  resolved,
		active:   active,
	}
	child.Add(svc)
	return parent.Add(child)
}
// Run starts all workers under a suture supervisor and blocks until ctx is
// cancelled and all workers have exited. Each worker gets its own child
// supervisor — when a worker stops, its children stop too.
// A worker exiting early (without restart) does not stop other workers.
// Returns nil on clean shutdown.
func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error {
	cfg := runConfig{metrics: BaseMetrics{}}
	for _, apply := range opts {
		apply(&cfg)
	}
	var active atomic.Int32
	root := suture.New("workers", suture.Spec{
		EventHook: makeEventHook(cfg.metrics),
	})
	for _, w := range workers {
		addWorkerToSupervisor(root, w, cfg.metrics, &active)
	}
	// An error caused purely by context cancellation is a clean shutdown.
	if err := root.Serve(ctx); err != nil && ctx.Err() == nil {
		return err
	}
	return nil
}
// RunWorker runs a single worker with panic recovery and optional restart.
// Blocks until ctx is cancelled or the worker exits without RestartOnFail.
func RunWorker(ctx context.Context, w *Worker, opts ...RunOption) {
	// Run only returns non-nil on a worker error after a non-cancelled ctx;
	// this convenience wrapper deliberately discards it.
	_ = Run(ctx, []*Worker{w}, opts...)
}
// makeEventHook returns a suture event hook that logs events and records
// panic metrics.
func makeEventHook(m Metrics) suture.EventHook {
	return func(ev suture.Event) {
		ctx := context.Background()
		fields := ev.Map()
		switch ev.Type() {
		case suture.EventTypeServicePanic:
			// A failed assertion yields "", which BaseMetrics tolerates.
			name, _ := fields["service_name"].(string)
			m.WorkerPanicked(name)
			log.Error(ctx, "msg", "worker panicked", "worker", fields["service_name"], "event", ev.String())
		case suture.EventTypeServiceTerminate:
			log.Warn(ctx, "msg", "worker terminated", "worker", fields["service_name"], "event", ev.String())
		case suture.EventTypeBackoff:
			log.Warn(ctx, "msg", "worker backoff", "event", ev.String())
		case suture.EventTypeResume:
			log.Info(ctx, "msg", "worker resumed", "event", ev.String())
		case suture.EventTypeStopTimeout:
			log.Error(ctx, "msg", "worker stop timeout", "worker", fields["service_name"], "event", ev.String())
		}
	}
}