-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathfuture_task.go
More file actions
139 lines (122 loc) · 3.36 KB
/
future_task.go
File metadata and controls
139 lines (122 loc) · 3.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package grpool
import (
"fmt"
"time"
"sync/atomic"
)
// FutureFunc, error != nil 时将会丢弃 interface{}
type FutureFunc func() (interface{}, error)
type FutureTask struct {
f FutureFunc
resultChan chan interface{}
aborted uint32
}
func NewFutureTask(f FutureFunc) *FutureTask {
return &FutureTask{
f: f,
resultChan: make(chan interface{}, 1),
aborted: 0,
}
}
func (t *FutureTask) Run() {
defer func() {
if err := recover(); err != nil {
t.Abort(fmt.Sprintf("task.Run panic: %v", err))
}
}()
if t.IsAborted() {
return
}
result, err := t.f()
if err != nil {
t.tReturn(NewFutureFuncError(err))
} else {
t.tReturn(result)
}
}
func (t *FutureTask) PoolCloseCallBack() {
t.Abort("pool closed")
}
// 终止任务. 首次调用有效.
// 未执行的任务将不会执行. 正在执行的任务将会丢弃结果. 已经执行完成的任务不受影响.
// 返回 tReturn FutureTaskError, 已经执行的结果不受影响
func (t *FutureTask) Abort(msg string) {
if t.IsAborted() { // 加这个性能稍有提升
return
}
if !atomic.CompareAndSwapUint32(&t.aborted, 0, 1) {
return
}
t.tReturn(NewFutureTaskError(msg))
}
// 是否终止
func (t *FutureTask) IsAborted() bool {
return atomic.LoadUint32(&t.aborted) == 1
}
// 获取结果, 会阻塞.
//
// @return
// interface{} 为 FutureFunc 返回的 interface{},
// error 为 Task 执行的错误(FutureTaskError) 或者 FutureFunc 返回的错误(FutureFuncError).
func (t *FutureTask) GetResult() (interface{}, error) {
select {
case result, ok := <-t.resultChan:
if !ok {
return nil, NewFutureTaskError("result chan closed")
}
return t.pickResult(result)
}
}
// 获取结果, 可以设置超时. 参数 timer 可以用于限制一组 task 的超时时间
//
// @return
// interface{} 为 FutureFunc 返回的 interface{}.
// bool 是否超时, true-超时, false-未超时.
// error 为 Task 执行的错误(FutureTaskError) 或者 FutureFunc 返回的错误(FutureFuncError).
func (t *FutureTask) GetResultWithTimeout(timer *time.Timer) (interface{}, bool, error) {
select {
case <-timer.C:
return nil, true, NewFutureTaskError("time out here")
case result, ok := <-t.resultChan:
if !ok {
return nil, false, NewFutureTaskError("result chan closed")
}
result, err := t.pickResult(result)
return result, false, err
}
}
// 获取结果, 非阻塞. 立刻返回, 无论是否得到结果
//
// @return
// interface{} 为 FutureFunc 返回的 interface{}.
// error 为 Task 执行的错误(FutureTaskError) 或者 FutureFunc 返回的错误(FutureFuncError).
func (t *FutureTask) GetResultNoWait() (interface{}, error) {
select {
case result, ok := <-t.resultChan:
if !ok {
return nil, NewFutureTaskError("result chan closed")
}
return t.pickResult(result)
default:
return nil, NewFutureTaskError("no result yet")
}
}
// 返回结果. 成功返回 true, 失败返回 false. 如果已经有结果了会 false
func (t *FutureTask) tReturn(result interface{}) bool {
select {
case t.resultChan <- result:
return true
default:
return false
}
}
// 分离 resultChan 中的正常结果和 error
func (t *FutureTask) pickResult(result interface{}) (interface{}, error) {
if err, ok := result.(*FutureTaskError); ok {
return nil, err
}
if err, ok := result.(*FutureFuncError); ok {
return nil, err
}
return result, nil
}