-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathfuture_executor.go
More file actions
89 lines (79 loc) · 2.37 KB
/
future_executor.go
File metadata and controls
89 lines (79 loc) · 2.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package grpool
import "time"
// Future 任务执行器, 可返回执行结果.
// 非协程安全.
type FutureExecutor struct {
p *Pool
tasks []*FutureTask
}
func NewFutureExecutor(p *Pool) *FutureExecutor {
return &FutureExecutor{
p: p,
tasks: make([]*FutureTask, 0),
}
}
// 添加任务
func (e *FutureExecutor) AddTask(f FutureFunc) {
task := NewFutureTask(f)
e.tasks = append(e.tasks, task)
ok := e.p.AddTask(task)
if !ok {
task.Abort("pool is full")
}
}
// 获取结果.
// @return
// []interface{} 为 FutureFunc 返回的 interface{}, 顺序和 AddTask 顺序完全一致
// []error 包括 Task 执行的错误(FutureTaskError)以及 FutureFunc 返回的错误(FutureFuncError), 顺序和 AddTask 顺序完全一致
func (e *FutureExecutor) Wait() ([]interface{}, []error) {
taskNum := len(e.tasks)
resultList := make([]interface{}, taskNum)
errorList := make([]error, taskNum)
for index, task := range e.tasks {
result, err := task.GetResult()
if err != nil {
errorList[index] = err
} else {
resultList[index] = result
}
}
return resultList, errorList
}
// 获取结果.
// 超时后, 未开始执行的任务将会被取消, 未完成执行的任务结果将被丢弃. 已经执行完成的任务结果正常返回
// @return
// []interface{} 为 FutureFunc 返回的 interface{}, 顺序和 AddTask 顺序完全一致
// []error 包括 Task 执行的错误(FutureTaskError)以及 FutureFunc 返回的错误(FutureFuncError), 顺序和 AddTask 顺序完全一致
func (e *FutureExecutor) WaitWithTimeout(d time.Duration) ([]interface{}, []error) {
taskNum := len(e.tasks)
resultList := make([]interface{}, taskNum)
errorList := make([]error, taskNum)
timer := time.NewTimer(d)
hasTimeout := false
for index, task := range e.tasks {
if !hasTimeout {
result, timeout, err := task.GetResultWithTimeout(timer)
if err != nil {
errorList[index] = err
} else {
resultList[index] = result
}
if timeout {
hasTimeout = true
// 发生超时, 终止剩余的、未执行的任务
taskNum := len(e.tasks)
for i := index + 1; i < taskNum; i++ {
e.tasks[i].Abort("time out abort")
}
}
} else { // 超时后, 无等待结果获取
result, err := task.GetResultNoWait()
if err != nil {
errorList[index] = err
} else {
resultList[index] = result
}
}
}
return resultList, errorList
}