-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathactor.go
More file actions
144 lines (114 loc) · 2.46 KB
/
actor.go
File metadata and controls
144 lines (114 loc) · 2.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package simple_actor
import (
"context"
"fmt"
"io"
"sync"
"time"
)
// Event type of a event
type Event int
// Arg an argument of an event
type Arg interface{}
type eventEntity struct {
event Event
args []Arg
}
// EventHandler handler of a type of event
type EventHandler func(args ...Arg)
// Actor interface of an actor
type Actor interface {
io.Closer
// Register register an event and its handler
Register(event Event, handler EventHandler) error
// Cast casts an event and its arguments
Cast(event Event, args ...Arg) error
}
type actor struct {
sync.RWMutex
closeOnce sync.Once
eventMap map[Event]EventHandler
eventCh chan eventEntity
quit chan struct{}
done chan struct{}
}
// New creates an actor
func New() Actor {
a := &actor{
eventMap: map[Event]EventHandler{},
eventCh: make(chan eventEntity),
quit: make(chan struct{}),
done: make(chan struct{}),
}
go a.process()
return a
}
// Register register an event and its handler
func (a *actor) Register(event Event, handler EventHandler) error {
a.Lock()
defer a.Unlock()
if handler == nil {
return fmt.Errorf("handler is required")
}
if _, ok := a.eventMap[event]; ok {
return fmt.Errorf("eventEntity %v has already registered", event)
}
a.eventMap[event] = handler
return nil
}
// Cast casts an event along with its argument to the actor
func (a *actor) Cast(event Event, args ...Arg) error {
a.RLock()
defer a.RUnlock()
if _, ok := a.eventMap[event]; !ok {
return fmt.Errorf("eventEntity %v hasn't registered yet", event)
}
a.eventCh <- eventEntity{event: event, args: args}
return nil
}
// Close closes the actor
func (a *actor) Close() error {
a.closeOnce.Do(func() {
close(a.quit)
<-a.done
})
return nil
}
// process events one by one
func (a *actor) process() {
defer close(a.done)
for {
select {
case <-a.quit:
return
case evt := <-a.eventCh:
h := a.eventMap[evt.event]
h(evt.args...)
}
}
}
// Wait for the event channel drain. Just for testing purpose
func (a *actor) waitForEmptyChan(timeout time.Duration) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
done := make(chan struct{})
go func() {
for {
if ctx.Err() != nil {
return
}
if len(a.eventCh) == 0 {
close(done)
return
}
time.Sleep(time.Microsecond)
}
}()
select {
case <-time.After(timeout):
cancel()
return fmt.Errorf("event channel did not drain within %v", timeout)
case <-done:
return nil
}
}