
Commit e3deebf

Enhancements:
1) Handle a potential deadlock when the MultiFetch function does not return a response for all the keys.
2) Functionality to take a cache dump.
3) Preload the cache with boot-up key id/value pairs.
4) Explicit cache cleanup, and cleanup with context.

1 parent 4983ad8 commit e3deebf

4 files changed

Lines changed: 182 additions & 45 deletions

README.md

Lines changed: 16 additions & 2 deletions
@@ -40,7 +40,7 @@ the keys are overlapping as well.
 
 ```go
 func main() {
-    cache, _ := txncache.NewCache(context.Background(), GetValue, GetMultiValue, 10)
+    cache, _ := txncache.NewCache(GetValue, GetMultiValue, 10)
     var wg sync.WaitGroup
 
     wg.Add(1)
@@ -91,4 +91,18 @@ func (k ZKey) Id() string {
 }
 ```
 
-For robust example checkout the test file.
+For a robust example, check out the test file.
+
+## Dump & Preloading
+One might need to reuse the current cache store in the next transaction as well. **GetAll()** can be used to take
+a dump of the current cache storage. The client can store it somewhere and use it later to preload.
+**Preload(map[string]Value)** preloads a cache with the given key id/value map and does not call the fetch function for those keys.
+
+## Default Value
+Ideally the *Fetch* & *MultiFetch* functions should return a value for every provided key. However, if the implementation
+does not guarantee that, one should provide a default value to be used as the return value for such keys. This is compulsory to
+avoid a deadlock while waiting on those keys.
+
+## Cache cleanup
+The cache should be closed after use to avoid leaks. Cleanup can be done explicitly by calling the **Close()** function, or
+tied to ctx cancellation by using the **CloseWithCtx(context.Context)** function.
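
Taken together, the new README sections describe a dump/preload round trip between transactions. Below is a minimal sketch of that flow against the API added in this commit; ZKey, fetchOne, and the import path are illustrative stand-ins, not part of the repository:

```go
package main

import (
    "fmt"

    txncache "example.com/txncache" // hypothetical import path
)

// ZKey is a minimal Key implementation, mirroring the README's ZKey example.
type ZKey string

func (k ZKey) ID() string { return string(k) }

// fetchOne resolves a single key; it stands in for the README's GetValue.
func fetchOne(k txncache.Key) txncache.Value { return "value-" + k.ID() }

func main() {
    cache, _ := txncache.NewCache(fetchOne, nil, 0)
    defer cache.Close()

    _ = cache.Get(ZKey("k1")) // fills the store via fetchOne

    dump := cache.GetAll() // map of key id -> value, safe to persist elsewhere

    // A later transaction preloads the dump; preloaded keys never hit fetchOne.
    next, _ := txncache.NewCache(fetchOne, nil, 0)
    defer next.Close()
    next.Preload(dump)

    fmt.Println(next.Get(ZKey("k1"))) // served from the preload, no fetch call
}
```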

cache.go

Lines changed: 89 additions & 34 deletions
@@ -7,13 +7,16 @@ import (
     "sync"
 )
 
-type TxnCache struct {
+type txnCache struct {
     store          *sync.Map
     lock           *sync.Map
     fetch          Fetch
     multiFetch     MultiFetch
+    defVal         Value
+    batchSize      int
     multiKeyChan   chan *keyValChan
     execMultiFetch chan bool
+    closeChan      chan bool
 }
 
 type keyWrap struct {
@@ -26,11 +29,11 @@ type keyValChan struct {
     locked bool
 }
 
-// NewCache gives an instance of TxnCache for given fetch & multiFetch functions
+// NewCache gives an instance of txnCache for given fetch & multiFetch functions
 // Fetch is the function to be used for value fetch of a single key
 // MultiFetch is the batched version of fetch function
 // batchSize should be passed greater than 0 if MultiFetch function is expected to be called in batch
-func NewCache(ctx context.Context, fetch Fetch, multiFetch MultiFetch, batchSize int) (*TxnCache, error) {
+func NewCache(fetch Fetch, multiFetch MultiFetch, batchSize int) (Cache, error) {
     if fetch == nil && multiFetch == nil {
         return nil, fmt.Errorf("both fetch and multifetch can not be nil")
     }
@@ -66,22 +69,49 @@ func NewCache(ctx context.Context, fetch Fetch, multiFetch MultiFetch, batchSize
 
     multiKeyChan := make(chan *keyValChan)
     execMultiFetch := make(chan bool)
+    closeChan := make(chan bool)
 
     store := &sync.Map{}
     lock := &sync.Map{}
 
+    cache := &txnCache{
+        store:          store,
+        lock:           lock,
+        fetch:          fetch,
+        multiFetch:     multiFetch,
+        multiKeyChan:   multiKeyChan,
+        defVal:         struct{}{},
+        batchSize:      batchSize,
+        execMultiFetch: execMultiFetch,
+        closeChan:      closeChan,
+    }
+    cache.setup()
+    return cache, nil
+}
+
+func (tc *txnCache) setup() {
     uniqueKeys := make([]Key, 0)
     keysMap := make(map[string][]*keyWrap)
     resMap := make(map[*keyWrap]chan Value)
     execute := func() {
-        for k, v := range multiFetch(uniqueKeys) {
-            store.Store(k.Id(), v)
-            if kws, ok := keysMap[k.Id()]; ok {
+        mf := tc.multiFetch(uniqueKeys)
+        for _, k := range uniqueKeys {
+            v := mf[k]
+            // if the fetch function does not return a value for a key,
+            // fall back to the default; ideally the fetch function should
+            // handle this itself and return a suitable value for such keys.
+            // default values are not cached
+            if v == nil {
+                v = tc.defVal
+            } else {
+                tc.store.Store(k.ID(), v)
+            }
+            if kws, ok := keysMap[k.ID()]; ok {
                 for _, kw := range kws {
                     resMap[kw] <- v
                     delete(resMap, kw)
                 }
-                delete(keysMap, k.Id())
+                delete(keysMap, k.ID())
             }
         }
         uniqueKeys = nil
@@ -90,80 +120,75 @@ func NewCache(ctx context.Context, fetch Fetch, multiFetch MultiFetch, batchSize
     go func() {
         for {
             select {
-            case kr := <-multiKeyChan:
+            case kr := <-tc.multiKeyChan:
                 {
                     mutex.Lock()
-                    if value, ok := store.Load(kr.key.Id()); ok {
+                    if value, ok := tc.store.Load(kr.key.ID()); ok {
                         kr.val <- value
                         mutex.Unlock()
                         continue
                     }
                     kw := &keyWrap{kr.key}
-                    if keys, ok := keysMap[kr.key.Id()]; ok {
-                        keysMap[kr.key.Id()] = append(keys, kw)
+                    if keys, ok := keysMap[kr.key.ID()]; ok {
+                        keysMap[kr.key.ID()] = append(keys, kw)
                     } else {
                         uniqueKeys = append(uniqueKeys, kr.key)
-                        keysMap[kr.key.Id()] = []*keyWrap{kw}
+                        keysMap[kr.key.ID()] = []*keyWrap{kw}
                     }
                     resMap[kw] = kr.val
-                    if batchSize > 0 && batchSize == len(uniqueKeys) {
+                    if tc.batchSize > 0 && tc.batchSize == len(uniqueKeys) {
                         execute()
                     }
                     mutex.Unlock()
                 }
-            case <-execMultiFetch:
+            case <-tc.execMultiFetch:
                 {
                     mutex.Lock()
                     execute()
                     mutex.Unlock()
                 }
-            case <-ctx.Done():
+            case <-tc.closeChan:
                 {
-                    close(multiKeyChan)
-                    close(execMultiFetch)
+                    close(tc.multiKeyChan)
+                    close(tc.execMultiFetch)
                     return
                 }
             }
         }
     }()
-    return &TxnCache{
-        store:          store,
-        lock:           lock,
-        fetch:          fetch,
-        multiFetch:     multiFetch,
-        multiKeyChan:   multiKeyChan,
-        execMultiFetch: execMultiFetch,
-    }, nil
 }
 
-func (tc *TxnCache) Get(key Key) Value {
-    if value, ok := tc.store.Load(key.Id()); ok {
+func (tc *txnCache) Get(key Key) Value {
+    if value, ok := tc.store.Load(key.ID()); ok {
         return value
     }
-    lock, _ := tc.lock.LoadOrStore(key.Id(), &sync.Mutex{})
+    lock, _ := tc.lock.LoadOrStore(key.ID(), &sync.Mutex{})
     mutex := lock.(*sync.Mutex)
     mutex.Lock()
     defer mutex.Unlock()
-    if value, ok := tc.store.Load(key.Id()); ok {
+    if value, ok := tc.store.Load(key.ID()); ok {
         return value
     } else {
         value := tc.fetch(key)
-        tc.store.Store(key.Id(), value)
+        if value == nil {
+            return tc.defVal
+        }
+        tc.store.Store(key.ID(), value)
         return value
     }
 }
 
-func (tc *TxnCache) MultiGet(keys []Key) map[Key]Value {
+func (tc *txnCache) MultiGet(keys []Key) map[Key]Value {
     var data sync.Map
     var wg sync.WaitGroup
     var keyPush sync.WaitGroup
     for _, key := range keys {
-        if value, ok := tc.store.Load(key.Id()); ok {
+        if value, ok := tc.store.Load(key.ID()); ok {
             data.Store(key, value)
             continue
         }
         wg.Add(1)
-        lock, _ := tc.lock.LoadOrStore(key.Id(), internal.NewMutex())
+        lock, _ := tc.lock.LoadOrStore(key.ID(), internal.NewMutex())
         mutex := lock.(*internal.Mutex)
         locked := mutex.TryLock()
         keyPush.Add(1)
@@ -185,8 +210,38 @@ func (tc *TxnCache) MultiGet(keys []Key) map[Key]Value {
     res := make(map[Key]Value)
     wg.Wait()
     data.Range(func(key, value interface{}) bool {
-        res[key.(Key)] = value.(Value)
+        res[key.(Key)] = value
         return true
     })
     return res
 }
+
+func (tc *txnCache) GetAll() map[string]Value {
+    res := make(map[string]Value)
+    tc.store.Range(func(key, value interface{}) bool {
+        res[key.(string)] = value
+        return true
+    })
+    return res
+}
+
+func (tc *txnCache) Preload(preload map[string]Value) {
+    for k, v := range preload {
+        tc.store.Store(k, v)
+    }
+}
+
+func (tc *txnCache) DefaultValue(value Value) {
+    tc.defVal = value
+}
+
+func (tc *txnCache) CloseWithCtx(ctx context.Context) {
+    go func() {
+        <-ctx.Done()
+        tc.closeChan <- true
+    }()
+}
+
+func (tc *txnCache) Close() {
+    tc.closeChan <- true
+}
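
The nil-value branch added to execute() and Get() above is what prevents the deadlock named in the commit message: a waiter whose key is missing from the MultiFetch result now receives the default value instead of blocking on its channel forever. A minimal sketch of that behavior follows; package and imports are as in the earlier README sketch (ZKey included), and the MultiFetch signature func([]Key) map[Key]Value is inferred from its use in setup():

```go
// partialMultiFetch simulates a backend that silently drops unknown ids.
func partialMultiFetch(keys []txncache.Key) map[txncache.Key]txncache.Value {
    res := make(map[txncache.Key]txncache.Value)
    for _, k := range keys {
        if k.ID() != "missing" {
            res[k] = "value-" + k.ID()
        }
    }
    return res
}

func main() {
    cache, _ := txncache.NewCache(nil, partialMultiFetch, 2)
    defer cache.Close()

    // Mandatory here: without a default, the waiter on "missing" would
    // block forever. The default is handed to waiters but never cached.
    cache.DefaultValue("N/A")

    res := cache.MultiGet([]txncache.Key{ZKey("a"), ZKey("missing")})
    for k, v := range res {
        fmt.Printf("%s=%v\n", k.ID(), v) // a=value-a, missing=N/A
    }
}
```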

cache_api.go

Lines changed: 31 additions & 3 deletions
@@ -1,14 +1,42 @@
 package txncache
 
+import "context"
+
+// Key construct of the cache
+// ID() is used to uniquely identify Keys
 type Key interface {
-    Id() string
+    ID() string
 }
 
+// Value construct of the cache
+// Can be of any type
 type Value interface{}
 
 type Cache interface {
-    Get(key Key) Value
-    MultiGet(keys []Key) map[Key]Value
+    // Get returns the value for a given key
+    Get(Key) Value
+
+    // MultiGet returns a key/value map for a given list of keys
+    MultiGet([]Key) map[Key]Value
+
+    // GetAll returns a map of all the key id/value pairs present in the cache
+    GetAll() map[string]Value
+
+    // Preload can be used to load the cache with a map of key/value pairs upfront
+    Preload(map[string]Value)
+
+    // DefaultValue sets the default value for missing values while calling fetch functions.
+    // This is mandatory if the provided MultiFetch function does not guarantee a value for all
+    // the provided keys; the default value is not cached.
+    DefaultValue(Value)
+
+    // CloseWithCtx cleans up the cache once the provided context is canceled or done.
+    // It's a non-blocking function and can be used in place of an explicit Close() call for
+    // guaranteed cleanup as soon as ctx is canceled.
+    CloseWithCtx(context.Context)
+
+    // Close is necessary to call to clean up the cache and make it eligible for GC
+    Close()
 }
 
 type Fetch func(Key) Value
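
CloseWithCtx replaces the context parameter that NewCache previously accepted, so a context-scoped lifetime is now opt-in per cache. A sketch of scoping a cache to a request context; ServeRequest is illustrative, and ZKey/fetchOne are the hypothetical helpers from the first sketch:

```go
// ServeRequest builds a per-transaction cache whose background goroutine
// is torn down automatically once the request context ends.
func ServeRequest(ctx context.Context) {
    cache, err := txncache.NewCache(fetchOne, nil, 0)
    if err != nil {
        return
    }
    // Non-blocking: a goroutine waits on ctx.Done() and then closes the
    // cache, so no explicit Close() is needed on any return path.
    cache.CloseWithCtx(ctx)

    _ = cache.Get(ZKey("user:42"))
    // ... the rest of the transaction reuses the same cache ...
}
```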
