-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpool.go
More file actions
69 lines (59 loc) · 1.01 KB
/
pool.go
File metadata and controls
69 lines (59 loc) · 1.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package pool
// Goroutine pool.
import (
"fmt"
"sync"
"sync/atomic"
"unsafe"
)
// Pool is a goroutine pool with a task cache; according to the original
// author's note, the cache is meant to be reclaimed by the garbage collector.
//
// A Pool embeds a mutex, a WaitGroup and Once values, so it must be handled
// through a pointer and never copied.
type Pool struct {
	// size is the number of worker goroutines Start launches.
	size int
	// count is incremented by a worker each time a task's Execute returns.
	count atomic.Uint32
	// pc is the channel the workers receive tasks from.
	pc chan Task
	// cache is the task cache handed to addCache by AddTask.
	cache *poolCache
	// mux is the mutex whose address AddTask hands to addCache.
	mux sync.Mutex
	// closed is set by Close; AddTask rejects tasks once it is true.
	closed bool
	// wg tracks submitted tasks: AddTask adds one, a worker marks one done
	// after running a task, and Wait blocks on it.
	wg sync.WaitGroup
	// startOnce and closeOnce make Start and Close take effect only once.
	startOnce, closeOnce sync.Once
}
// NewPool builds a pool that runs gSize worker goroutines once Start is
// called. The task channel is buffered with the same capacity.
//
// A gSize below 1 is raised to 1: a negative value would make the channel
// allocation panic, and zero would produce a pool with no workers, so
// submitted tasks would never be executed and Wait would never return.
//
// The cache field is left at its zero value here.
func NewPool(gSize int) *Pool {
	if gSize < 1 {
		gSize = 1
	}
	return &Pool{
		size: gSize,
		pc:   make(chan Task, gSize),
	}
}
// Start launches the pool's worker goroutines. Only the first call has any
// effect; later calls return immediately.
//
// Each worker keeps receiving tasks from the pool channel until that channel
// is closed and drained. For every task it calls Execute, then bumps the
// completed-task counter, then marks the task done on the WaitGroup. A panic
// raised by Execute is not recovered here.
func (p *Pool) Start() {
	p.startOnce.Do(func() {
		worker := func() {
			for {
				task, open := <-p.pc
				if !open {
					return
				}
				task.Execute()
				p.count.Add(1)
				p.wg.Done()
			}
		}

		for started := 0; started < p.size; started++ {
			go worker()
		}
	})
}
// AddTask submits t to the pool.
//
// If the pool has already been closed, it returns an error and t is dropped.
// Otherwise it bumps the WaitGroup counter so Wait accounts for t, hands t to
// addCache together with the task channel, the cache and the address of the
// pool mutex, and returns nil.
//
// NOTE(review): closed is read here without synchronization while Close
// writes it, so calling AddTask concurrently with Close looks racy — confirm
// whether callers are expected to serialize the two.
func (p *Pool) AddTask(t Task) error {
	if closed := p.closed; closed {
		return fmt.Errorf("Pool is closed")
	}

	p.wg.Add(1)
	lock := unsafe.Pointer(&p.mux)
	addCache(p.pc, t, p.cache, lock)
	return nil
}
// Wait blocks until the pool's WaitGroup counter drops to zero, i.e. until
// every task counted by AddTask has been marked done by a worker.
//
// The returned error is always nil.
func (p *Pool) Wait() error {
	var err error // never assigned: Wait has no failure path
	p.wg.Wait()
	return err
}
// Close marks the pool as closed and closes the task channel, which lets the
// workers started by Start exit once the channel has been drained. Only the
// first call does anything; later calls are no-ops.
//
// Close does not wait for tasks to finish; use Wait for that.
func (p *Pool) Close() {
	shutdown := func() {
		p.closed = true
		close(p.pc)
	}
	p.closeOnce.Do(shutdown)
}