Skip to content

Commit fe3bb6f

Browse files
committed
Fixes & Enhancements -
1) Mutex implementation changed 2) Module name changed 3) More test cases added 4) Default arg in get function added 5) Comments updated
1 parent 34e4dce commit fe3bb6f

6 files changed

Lines changed: 240 additions & 114 deletions

File tree

cache.go

Lines changed: 70 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,26 @@ package txncache
33
import (
44
"context"
55
"fmt"
6-
"github.com/saurav534/txn-cache/internal"
6+
"github.com/gocollection/txn-cache/internal/pkg/lock"
77
"sync"
88
"time"
99
)
1010

1111
type txnCache struct {
12-
store *sync.Map
13-
lock *sync.Map
14-
fetch Fetch
15-
multiFetch MultiFetch
16-
defVal Value
17-
cacheDef bool
18-
args []interface{}
19-
batchSize int
20-
multiKeyChan chan Key
21-
execMultiFetch chan bool
22-
closeChan chan bool
23-
cleanup *sync.Once
24-
calls *sync.WaitGroup
25-
closed bool
12+
store *sync.Map // cache entries to be stored here
13+
lock *sync.Map // read-write lock per key
14+
fetch Fetch // fetch function
15+
multiFetch MultiFetch // multi fetch function or bulk fetch
16+
defVal Value // default value
17+
cacheDef bool // caching of default value
18+
args []interface{} // common args to call fetch functions
19+
batchSize int // batch size for per batch call to multiFetch function
20+
attemptKey chan Key // keys to be pushed on channel to make fetch attempt
21+
execMultiFetch chan bool // force cache to fetch values for current keys in line
22+
closeChan chan bool // close the cache process and clean the remaining
23+
cleanup *sync.Once // ensures the close-up routine runs only once
24+
calls *sync.WaitGroup // active number of cache calls in progress
25+
closed bool // if the cache has been closed
2626
}
2727

2828
// NewCache gives an instance of txnCache for given fetch & multiFetch functions
@@ -71,20 +71,24 @@ func NewCache(fetch Fetch, multiFetch MultiFetch, batchSize int) (Cache, error)
7171
lock: &sync.Map{},
7272
fetch: fetch,
7373
multiFetch: multiFetch,
74-
multiKeyChan: make(chan Key),
74+
attemptKey: make(chan Key),
7575
defVal: struct{}{},
7676
batchSize: batchSize,
7777
execMultiFetch: make(chan bool),
7878
closeChan: make(chan bool),
7979
cleanup: &sync.Once{},
8080
calls: &sync.WaitGroup{},
8181
}
82+
// starting cache setup
8283
cache.setup()
8384
return cache, nil
8485
}
8586

8687
func (tc *txnCache) setup() {
88+
// unique keys to make attempt for
8789
uniqueKeys := make([]Key, 0)
90+
91+
// execute function to fetch values for given keys
8892
execute := func(keys []Key) {
8993
mf := tc.multiFetch(keys, tc.args...)
9094
for _, k := range keys {
@@ -99,85 +103,80 @@ func (tc *txnCache) setup() {
99103
tc.store.Store(k.ID(), v)
100104
}
101105
if rw, ok := tc.lock.Load(k.ID()); ok {
102-
(rw.(*internal.RWMutex)).Unlock()
106+
(rw.(*lock.RWMutex)).Unlock()
103107
}
104108
}
105109
}
106-
setup := make(chan bool)
107110
go func() {
108111
for {
109112
select {
110-
case key := <-tc.multiKeyChan:
113+
case <-tc.execMultiFetch: // force fetch execution
114+
{
115+
go execute(uniqueKeys)
116+
uniqueKeys = nil
117+
}
118+
case key := <-tc.attemptKey: // a new key arrives to attempt for
111119
{
112120
uniqueKeys = append(uniqueKeys, key)
121+
// if the key count is equal to the batch size, call execute
113122
if tc.batchSize > 0 && tc.batchSize == len(uniqueKeys) {
114123
go execute(uniqueKeys)
115124
uniqueKeys = nil
116125
}
117126
}
118-
case <-tc.execMultiFetch:
119-
{
120-
go execute(uniqueKeys)
121-
uniqueKeys = nil
122-
}
123-
case <-tc.closeChan:
127+
case <-tc.closeChan: // cache close signal
124128
{
125129
finalClose := make(chan bool)
130+
// handle calls in progress
126131
go func() {
127132
for {
128133
select {
129-
case key := <-tc.multiKeyChan:
130-
{
131-
uniqueKeys = append(uniqueKeys, key)
132-
}
133-
case <-tc.execMultiFetch:
134+
case <-tc.execMultiFetch: // post closeup no op on exec
134135
{
135136
for _, k := range uniqueKeys {
136137
if rw, ok := tc.lock.Load(k.ID()); ok {
137-
(rw.(*internal.RWMutex)).Unlock()
138+
(rw.(*lock.RWMutex)).Unlock()
138139
}
139140
}
140141
uniqueKeys = nil
141142
}
142-
case <-finalClose:
143+
case key := <-tc.attemptKey: // batching ignored post closeup
144+
{
145+
uniqueKeys = append(uniqueKeys, key)
146+
}
147+
case <-finalClose: // returns from for loop after all the calls are done
143148
{
144149
return
145150
}
146151
}
147152
}
148153
}()
154+
// close call waits for all the running get calls to get completed
155+
// all the new unique keys are digested and no op is performed on exec
149156
tc.calls.Wait()
150157
finalClose <- true
151158
return
152159
}
153-
case <-setup:
154-
{
155-
continue
156-
}
157160
}
158161
}
159162
}()
160-
setup <- true
161-
return
162163
}
163164

164165
func (tc *txnCache) Get(key Key) Value {
165166
tc.calls.Add(1)
166167
defer tc.calls.Done()
167-
if value, ok := tc.store.Load(key.ID()); ok {
168+
if value, ok := tc.store.Load(key.ID()); ok { // cache hit
168169
return value
169170
}
170-
keyRWMutex, _ := tc.lock.LoadOrStore(key.ID(), internal.NewRWMutex())
171-
rwMutex := keyRWMutex.(*internal.RWMutex)
172-
rwMutex.RLock()
173-
if value, ok := tc.store.Load(key.ID()); ok {
174-
defer rwMutex.RUnlock()
175-
return value
176-
} else {
177-
rwMutex.RUnlock()
178-
rwMutex.Lock()
171+
keyRWMutex, _ := tc.lock.LoadOrStore(key.ID(), lock.NewRWMutex())
172+
rwMutex := keyRWMutex.(*lock.RWMutex)
173+
locked := rwMutex.TryLock()
174+
if locked {
179175
defer rwMutex.Unlock()
180-
value := tc.fetch(key)
176+
if value, ok := tc.store.Load(key.ID()); ok { // cache hit after lock acquired
177+
return value
178+
}
179+
value := tc.fetch(key, tc.args...)
181180
if value == nil {
182181
if tc.cacheDef {
183182
value = tc.defVal
@@ -187,52 +186,62 @@ func (tc *txnCache) Get(key Key) Value {
187186
}
188187
tc.store.Store(key.ID(), value)
189188
return value
189+
} else {
190+
rwMutex.RLock()
191+
defer rwMutex.RUnlock()
192+
if value, ok := tc.store.Load(key.ID()); ok {
193+
return value
194+
}
195+
return tc.defVal
190196
}
191197
}
192198

193199
func (tc *txnCache) MultiGet(keys []Key) map[Key]Value {
194-
if tc.closed {
200+
if tc.closed { // if cache is already closed
195201
return map[Key]Value{}
196202
}
197203
tc.calls.Add(1)
198204
defer tc.calls.Done()
199205
var data sync.Map
200206
var wg sync.WaitGroup
201207
var keyPush sync.WaitGroup
208+
// key will either be a hit in the cache entry or
209+
// will first in queue to attempt for fetch or
210+
// will wait for an existing fetch attempt to get completed
202211
for _, key := range keys {
203-
if value, ok := tc.store.Load(key.ID()); ok {
212+
if value, ok := tc.store.Load(key.ID()); ok { // cache hit
204213
data.Store(key, value)
205214
continue
206215
}
207-
keyRWMutex, _ := tc.lock.LoadOrStore(key.ID(), internal.NewRWMutex())
208-
rwMutex := keyRWMutex.(*internal.RWMutex)
209-
locked := rwMutex.TryLock()
216+
keyRWMutex, _ := tc.lock.LoadOrStore(key.ID(), lock.NewRWMutex())
217+
rwMutex := keyRWMutex.(*lock.RWMutex)
218+
locked := rwMutex.TryLock() // non-blocking; if the key is already locked, this caller will instead wait to read the fetched value
210219
if locked {
211-
if value, ok := tc.store.Load(key.ID()); ok {
220+
if value, ok := tc.store.Load(key.ID()); ok { // cache hit after lock acquired
212221
data.Store(key, value)
213222
rwMutex.Unlock()
214223
continue
215224
}
216225
}
217226
wg.Add(1)
218227
keyPush.Add(1)
219-
go func(k Key, kwg *internal.RWMutex, locked bool) {
228+
go func(k Key, kwg *lock.RWMutex, locked bool) {
220229
defer wg.Done()
221230
if locked {
222-
tc.multiKeyChan <- k
231+
tc.attemptKey <- k // adding key for fetch call
223232
}
224233
keyPush.Done()
225-
kwg.RLock()
234+
kwg.RLock() // waiting to read the set value
226235
defer kwg.RUnlock()
227236
if value, ok := tc.store.Load(k.ID()); ok {
228237
data.Store(k, value)
229238
} else {
230-
data.Store(k, tc.defVal)
239+
data.Store(k, tc.defVal) // default value is returned if no value found in cache
231240
}
232241
}(key, rwMutex, locked)
233242
}
234-
keyPush.Wait()
235-
tc.execMultiFetch <- true
243+
keyPush.Wait() // waiting until every cache-miss key has been queued for a fetch attempt
244+
tc.execMultiFetch <- true // forcing cache to execute fetch to unblock this call asap
236245
res := make(map[Key]Value)
237246
wg.Wait()
238247
data.Range(func(key, value interface{}) bool {

0 commit comments

Comments
 (0)