-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask.go
More file actions
510 lines (459 loc) · 11.9 KB
/
task.go
File metadata and controls
510 lines (459 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
package coco
import (
"database/sql/driver"
"encoding/json"
"fmt"
"strconv"
"strings"
"github.com/imagvfx/coco/service"
)
// TaskStatus is the status of a task.
type TaskStatus int

// The known task statuses. TaskWaiting is the zero value.
const (
	TaskWaiting = TaskStatus(iota)
	TaskRunning
	TaskFailed
	TaskDone
)

// String returns the lowercase name of the status: "waiting", "running",
// "failed" or "done". It returns an empty string for a value that is not
// one of the declared statuses.
func (s TaskStatus) String() string {
	// A switch avoids building a fresh map on every call.
	switch s {
	case TaskWaiting:
		return "waiting"
	case TaskRunning:
		return "running"
	case TaskFailed:
		return "failed"
	case TaskDone:
		return "done"
	}
	return ""
}
// branchStat counts the leaves of a branch per status, so that the branch's
// own status can be derived from them (see Status).
type branchStat struct {
	// n is the total number of leaves in this branch.
	// It should not be changed after creation.
	n int

	// Number of leaves currently in each status.
	nFailed  int
	nRunning int
	nWaiting int
	nDone    int
}

// newBranchStat creates a branchStat for a branch with n leaves.
// All n leaves start out counted as waiting.
func newBranchStat(n int) *branchStat {
	st := new(branchStat)
	st.n = n
	st.nWaiting = n
	return st
}

// N returns the total number of leaves in the branch.
func (st *branchStat) N() int {
	total := st.n
	return total
}
// Change records that one leaf of the branch moved from status from to
// status to: the from counter is decremented and the to counter incremented.
//
// It panics when from or to is not one of the declared TaskStatus values,
// or when a counter is negative after the decrement. These indicate
// a bookkeeping bug in the caller.
func (st *branchStat) Change(from, to TaskStatus) {
	switch from {
	case TaskFailed:
		st.nFailed--
	case TaskRunning:
		st.nRunning--
	case TaskWaiting:
		st.nWaiting--
	case TaskDone:
		st.nDone--
	default:
		// %d is used because String() of an unknown status is empty.
		panic(fmt.Sprintf("unknown TaskStatus: from: %d", int(from)))
	}
	// The counters hold numbers of leaves, so none of them may go negative.
	if st.nFailed < 0 {
		panic("nFailed shouldn't be a nagative value")
	}
	if st.nRunning < 0 {
		panic("nRunning shouldn't be a nagative value")
	}
	if st.nWaiting < 0 {
		panic("nWaiting shouldn't be a nagative value")
	}
	if st.nDone < 0 {
		panic("nDone shouldn't be a nagative value")
	}
	switch to {
	case TaskFailed:
		st.nFailed++
	case TaskRunning:
		st.nRunning++
	case TaskWaiting:
		st.nWaiting++
	case TaskDone:
		st.nDone++
	default:
		panic(fmt.Sprintf("unknown TaskStatus: to: %d", int(to)))
	}
}
// Status derives the branch's status from the per-status leaf counters.
// Precedence is failed, then running, then waiting; when none of those
// counters is positive the branch is reported as done.
func (st *branchStat) Status() TaskStatus {
	switch {
	case st.nFailed > 0:
		return TaskFailed
	case st.nRunning > 0:
		return TaskRunning
	case st.nWaiting > 0:
		return TaskWaiting
	}
	return TaskDone
}
// TaskID identifies a task by a pair of integers.
// The String form joins the two with a dash, e.g. "3-12".
type TaskID [2]int

// String returns the id as "<first>-<second>" using decimal numbers.
func (id TaskID) String() string {
	buf := make([]byte, 0, 16)
	buf = strconv.AppendInt(buf, int64(id[0]), 10)
	buf = append(buf, '-')
	buf = strconv.AppendInt(buf, int64(id[1]), 10)
	return string(buf)
}
// Task has commands and/or subtasks that will be run by workers.
//
// A Task having subtasks is called a Branch.
// Others are called Leaf, which can have commands.
// A Leaf without commands is valid, but barely useful.
// A Task is either a Branch or a Leaf. It cannot be both at the same time.
type Task struct {
// ID is the id of the task: ID[0] is the job number and ID[1] is the
// task number (see ToSQL and TaskIDFromString).
ID TaskID
// Job is the job the task belongs to.
Job *Job
// parent is the parent task of the task.
// It will be nil, if the task is a root task.
parent *Task
// nthChild indicates this task is the nth child of the parent task.
nthChild int
// next is the next task for walking a job's tasks.
// When it is nil, the task is the last task of the job.
next *Task
// Title is a human readable title for the task.
// An empty Title is allowed.
Title string
// Priority is a priority hint for the task.
// Higher values take precedence over lower values.
// Priority set to 0 makes it inherit from the nearest parent that has a non-zero priority.
// If there isn't a non-zero priority parent, the task has priority 0.
// Negative values will be considered invalid, and will be changed to 0.
//
// NOTE: Use CalcPriority to get the real priority of the task.
Priority int
// Subtasks contains subtasks to be run.
// Subtasks could be nil or empty.
Subtasks []*Task
// When true, a subtask will be launched after the prior task finished.
// When false, a subtask will be launched right after the prior task started.
SerialSubtasks bool
// isLeaf indicates whether the task is a leaf task.
isLeaf bool
// update is the hook Update calls with the updater before it applies the
// change to the in-memory task.
// NOTE(review): presumably this persists the change to the db; confirm
// where it is assigned.
update func(service.TaskUpdater) error
// Commands are guaranteed to be run serially from the same worker.
Commands Commands
//
// NOTE:
//
// Fields below should be used/changed only while holding the Job's lock.
//
//
// status indicates the task's status in the farm.
// It should not be set by the user.
// For a branch, use Status(), which derives the status from Stat.
status TaskStatus
// Stat aggregates the status of the task's leaves.
// It is only meaningful when the task is a branch.
Stat *branchStat
// popIdx shows which child of the task should be popped next.
// It is -1, if the task is a leaf that has already been popped, or a branch that popped all its tasks.
popIdx int
// retry represents how many times the task was retried automatically after it failed.
// It will be reset, when the user retries the job of the task.
// It is only meaningful to a leaf task.
retry int
// Assignee is the worker who is currently running the task's commands.
// It is meant to be empty unless the task is running.
// It is only meaningful to a leaf task.
Assignee string
}
// TaskIDFromString parses a task id of the form "<job>-<task>" into a TaskID.
// Both parts must be non-negative decimal integers.
// If the given string isn't valid, it returns TaskID{-1, -1} and an error
// as the second return value.
func TaskIDFromString(id string) (TaskID, error) {
	// invalid builds the single failure result shared by every check below.
	invalid := func() (TaskID, error) {
		return TaskID{-1, -1}, fmt.Errorf("invalid task id: %v", id)
	}
	parts := strings.Split(id, "-")
	if len(parts) != 2 {
		return invalid()
	}
	var nums [2]int
	for i, part := range parts {
		n, err := strconv.Atoi(part)
		if err != nil || n < 0 {
			return invalid()
		}
		nums[i] = n
	}
	return TaskID{nums[0], nums[1]}, nil
}
// Commands are commands that are guaranteed to be run from a single worker.
// They are stored in the database as a JSON array of string arrays.
type Commands []Command

// Value implements driver.Valuer.
// It returns the JSON encoding of the commands as a []byte.
func (cs Commands) Value() (driver.Value, error) {
	b, err := json.Marshal(cs)
	if err != nil {
		return nil, err
	}
	return b, nil
}

// Scan implements sql.Scanner.
// It accepts the JSON encoding of the commands as either []byte or string,
// and replaces the receiver's content with the decoded value.
// It returns an error for a nil source, for any other source type,
// and for invalid JSON.
func (cs *Commands) Scan(v interface{}) error {
	var data []byte
	switch src := v.(type) {
	case nil:
		return fmt.Errorf("scan commands: nil")
	case []byte:
		data = src
	case string:
		// Drivers may hand text columns over as string instead of []byte.
		data = []byte(src)
	default:
		return fmt.Errorf("scan commands: unsupported type %T", v)
	}
	// Decode into cs itself (not &cs) so that a JSON null resets the
	// destination slice instead of only clearing a local pointer.
	return json.Unmarshal(data, cs)
}

// Command is a command to be run in a worker.
// First string is the executable and others are arguments.
// When a Command is nil or empty, the command will be skipped.
type Command []string
// MarshalJSON implements json.Marshaler.
// It encodes the exported view of the task: the ID is written in its string
// form and the status as the name returned by Status().String().
func (t *Task) MarshalJSON() ([]byte, error) {
	// taskJSON fixes the set and order of the encoded fields.
	type taskJSON struct {
		Title          string
		ID             string
		Status         string
		Priority       int
		Subtasks       []*Task
		SerialSubtasks bool
		Commands       Commands
	}
	var out taskJSON
	out.Title = t.Title
	out.ID = t.ID.String()
	out.Status = t.Status().String()
	out.Priority = t.Priority
	out.Subtasks = t.Subtasks
	out.SerialSubtasks = t.SerialSubtasks
	out.Commands = t.Commands
	return json.Marshal(out)
}
// ToSQL converts the Task into a service.Task.
// Commands are stored as their JSON encoding, and ParentNum is the parent's
// task number, or -1 when the task has no parent.
func (t *Task) ToSQL() *service.Task {
	encoded, err := json.Marshal(t.Commands)
	if err != nil {
		// should not happen
		panic(err)
	}
	parentNum := -1
	if t.parent != nil {
		parentNum = t.parent.ID[1]
	}
	return &service.Task{
		Job:            t.ID[0],
		Task:           t.ID[1],
		ParentNum:      parentNum,
		Title:          t.Title,
		Status:         int(t.status),
		SerialSubtasks: t.SerialSubtasks,
		Commands:       string(encoded),
	}
}
// FromSQL fills the Task's ID, Title, status, SerialSubtasks and Commands
// from a service.Task. Commands are decoded from their JSON encoding;
// it panics when that encoding is invalid.
func (t *Task) FromSQL(st *service.Task) {
	t.ID = TaskID{st.Job, st.Task}
	t.Title, t.SerialSubtasks = st.Title, st.SerialSubtasks
	t.status = TaskStatus(st.Status)

	decoded := Commands{}
	if err := json.Unmarshal([]byte(st.Commands), &decoded); err != nil {
		// should not happen
		panic(err)
	}
	t.Commands = decoded
}
// Blocking reports whether this leaf task holds back the tasks after it.
// A leaf blocks when its parent runs its subtasks serially and the leaf has
// not reached TaskDone yet. A leaf of a parallel parent never blocks.
// It panics when called on a non-leaf task.
func (t *Task) Blocking() bool {
	if !t.isLeaf {
		panic("shouldn't call Task.Blocking on non-leaf task")
	}
	if !t.parent.SerialSubtasks {
		// parallel siblings do not wait for each other.
		return false
	}
	return t.status != TaskDone
}
// Pop pops the first child task that isn't popped yet.
// The second return value (done) tells the caller whether it may move past t.
//
// For a leaf, Pop returns the leaf itself on the first call and nil on later
// calls; done is the negation of t.Blocking() either way.
//
// For a branch:
// It returns (nil, true) if no more child task is left.
// It returns (nil, false) if there are remaining tasks, but
// no task can be popped because one of the prior subtasks is blocking the process.
// When a task is returned, done is true only if every subtask through the
// last one has reported done.
func (t *Task) Pop() (*Task, bool) {
if t.isLeaf {
block := t.Blocking()
if t.popIdx != -1 {
// first pop of this leaf: mark it as popped and hand it out.
t.popIdx = -1
return t, !block
}
// already popped: nothing to hand out, only report whether it still blocks.
return nil, !block
}
// branch
if t.popIdx < 0 {
// every subtask has been popped already.
return nil, true
}
// start from the cached position instead of the first subtask.
i := t.popIdx
var popt *Task
alldone := true // stays true while every subtask visited so far reported done
for i < len(t.Subtasks) {
subt := t.Subtasks[i]
p, done := subt.Pop()
popt = p
if done {
if alldone {
// every subtask up to and including i is done,
// so cache i + 1 as the starting point for the next pop.
t.popIdx = i + 1
}
// otherwise this subtask is done, but one of the prior subtasks isn't yet,
// so popIdx cannot jump past that one.
} else {
alldone = false
if t.SerialSubtasks {
// serial branch: must wait until the subtask is done
break
}
}
if p != nil {
// found a task to hand out
break
}
i++
}
if t.popIdx == len(t.Subtasks) {
// every subtask reported done: mark the branch as fully popped.
t.popIdx = -1
}
return popt, t.popIdx == -1
}
// restorePopIdx rebuilds popIdx for the task and all of its subtasks
// from the status of the leaves.
// It is used for a job which was just resurrected from the db.
func (t *Task) restorePopIdx() {
	if t.isLeaf {
		// Only a waiting leaf still has to be popped.
		t.popIdx = -1
		if t.status == TaskWaiting {
			t.popIdx = 0
		}
		return
	}
	// branch: the children must be restored before they are inspected below.
	for i := range t.Subtasks {
		t.Subtasks[i].restorePopIdx()
	}
	// Point at the first child that is either a blocking leaf or still has
	// something to pop; stay at -1 when there is no such child.
	t.popIdx = -1
	for i, child := range t.Subtasks {
		if (child.isLeaf && child.Blocking()) || child.popIdx != -1 {
			t.popIdx = i
			return
		}
	}
}
// Peek returns the leaf found by following popIdx from t down the tree,
// without popping it.
// It returns nil if all the tasks are popped (popIdx == -1), or if the leaf
// it reaches is blocking.
//
// NOTE(review): Blocking() is also true for a leaf under a serial parent
// that has not been popped yet, so Peek returns nil in a case where Pop
// would hand out that leaf. Confirm this is intended.
func (t *Task) Peek() *Task {
	if t.popIdx == -1 {
		// t and its subtasks have all been popped.
		return nil
	}
	// Descend along popIdx; a branch that isn't fully popped is expected
	// to have a valid popIdx.
	for cur := t; ; cur = cur.Subtasks[cur.popIdx] {
		if !cur.isLeaf {
			continue
		}
		if cur.Blocking() {
			return nil
		}
		return cur
	}
}
// Push makes the leaf task poppable again by resetting its popIdx and
// rewinding the popIdx of every ancestor so that it does not point past
// the child on the path to this leaf.
// Before pushing a task, change its status to TaskWaiting,
// or it will just be skipped when popped.
// It panics when called on a branch task.
func (t *Task) Push() {
	if !t.isLeaf {
		panic("cannot push a branch task")
	}
	t.popIdx = 0
	// Walk up the tree; at each level child is the node we came from.
	for child, anc := t, t.parent; anc != nil; child, anc = anc, anc.parent {
		idx := child.nthChild
		// A fully popped ancestor (-1) or one pointing past the child
		// has to be moved back to the child.
		if anc.popIdx == -1 || idx < anc.popIdx {
			anc.popIdx = idx
		}
	}
}
// CanRetry reports whether the task may still be retried automatically,
// i.e. its retry count is below the job's AutoRetry limit.
func (t *Task) CanRetry() bool {
	return t.retry < t.Job.AutoRetry
}
// Status returns the task's status.
// A leaf reports its own status; a branch reports the status calculated
// from its children's status through Stat.
func (t *Task) Status() TaskStatus {
	if !t.isLeaf {
		return t.Stat.Status()
	}
	return t.status
}
// CalcPriority calculates the task's effective priority: the first
// non-zero Priority found when walking from the task up through its
// parents, or 0 when every task on that path has Priority 0.
func (t *Task) CalcPriority() int {
	for cur := t; cur != nil; cur = cur.parent {
		if cur.Priority != 0 {
			return cur.Priority
		}
	}
	// no task up to the root had a non-zero Priority.
	return 0
}
// Update applies the updater to the task.
// It stamps the updater with the task's job and task number and passes it to
// the task's update hook first. Only when the hook succeeds is the in-memory
// task changed: the status (when u.UpdateStatus is set) and the assignee
// (when u.UpdateAssignee is set).
// It returns the hook's error unchanged.
func (t *Task) Update(u service.TaskUpdater) error {
	u.Job, u.Task = t.ID[0], t.ID[1]
	if err := t.update(u); err != nil {
		return err
	}
	if u.UpdateStatus {
		t.setStatus(TaskStatus(u.Status))
	}
	if u.UpdateAssignee {
		t.Assignee = u.Assignee
	}
	return nil
}
// setStatus sets the leaf task's status and reports the change to the Stat
// of every ancestor, so the branch statuses stay in sync.
// Don't use this directly; use Task.Update unless the status has to be
// changed without a db update.
// It panics when called on a non-leaf task.
func (t *Task) setStatus(s TaskStatus) {
	if !t.isLeaf {
		panic("setStatus applied to a leaf task only")
	}
	prev := t.status
	t.status = s
	for anc := t.parent; anc != nil; anc = anc.parent {
		anc.Stat.Change(prev, s)
	}
}