-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathko.go
More file actions
287 lines (238 loc) · 6.31 KB
/
ko.go
File metadata and controls
287 lines (238 loc) · 6.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
package ko
import (
"errors"
"fmt"
"os"
"sort"
"strings"
"sync"
"github.com/benitogf/ooo/key"
"github.com/benitogf/ooo/meta"
"github.com/benitogf/ooo/storage"
"github.com/syndtr/goleveldb/leveldb"
errorsLeveldb "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
// ErrPathNotAccessible is the sentinel wrapped (via %w) by checkPathAccessible
// when the database path exists but is not a directory, exists but is not
// writable, cannot be inspected, or cannot be created. Match it with errors.Is.
var ErrPathNotAccessible = errors.New("ko: database path is not accessible")
// EmbeddedStorage implements storage.EmbeddedLayer using LevelDB.
// This is a pure persistence layer without in-memory caching.
type EmbeddedStorage struct {
// Path is the LevelDB directory; Start replaces an empty Path with "data/db".
Path string
// client is the LevelDB handle; it is assigned by Start (or recover).
client *leveldb.DB
// mutex is taken by Start, Close and Active; the data methods
// (Get, GetList, Set, Del, Keys, Clear, Load) do not take it.
mutex sync.RWMutex
// active is set to true by a successful Start and to false by Close.
active bool
}
// NewEmbeddedStorage returns an EmbeddedStorage whose Path is set to path.
// Nothing is opened here; the LevelDB handle is created later by Start.
func NewEmbeddedStorage(path string) *EmbeddedStorage {
	s := new(EmbeddedStorage)
	s.Path = path
	return s
}
// Active reports whether the storage is currently started. The flag is read
// while holding the read lock.
func (e *EmbeddedStorage) Active() bool {
	e.mutex.RLock()
	started := e.active
	e.mutex.RUnlock()
	return started
}
// recover calls leveldb.RecoverFile on e.Path, stores whatever handle it
// returns in e.client and reports RecoverFile's error.
func (e *EmbeddedStorage) recover() error {
	db, err := leveldb.RecoverFile(e.Path, &opt.Options{})
	e.client = db
	return err
}
// Start opens the LevelDB database at e.Path and marks the storage active.
//
// An empty Path is replaced with "data/db". The path is validated (and created
// if missing) by checkPathAccessible before opening. If the open fails with a
// corruption error, a recovery is attempted through e.recover. layerOpt is
// accepted but not used by this implementation.
//
// It returns the checkPathAccessible error unchanged, or a wrapped error when
// opening or recovering the database fails.
func (e *EmbeddedStorage) Start(layerOpt storage.LayerOptions) error {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	if e.Path == "" {
		e.Path = "data/db"
	}

	// Fail early with a descriptive error before handing the path to LevelDB.
	if accessErr := checkPathAccessible(e.Path); accessErr != nil {
		return accessErr
	}

	db, openErr := leveldb.OpenFile(e.Path, &opt.Options{})
	e.client = db

	switch {
	case errorsLeveldb.IsCorrupted(openErr):
		// A corrupted database gets one recovery attempt; success falls
		// through to the active state below.
		if recoverErr := e.recover(); recoverErr != nil {
			return fmt.Errorf("ko: failed to recover corrupted database at %q: %w", e.Path, recoverErr)
		}
	case openErr != nil:
		return fmt.Errorf("ko: failed to open database at %q: %w", e.Path, openErr)
	}

	e.active = true
	return nil
}
// checkPathAccessible verifies that path can serve as a database directory.
//
// If path exists it must be a directory, and writability is probed by creating
// and then removing a ".ko_write_test" file inside it. If path does not exist
// it is created with os.MkdirAll (mode 0755). Every failure is reported as an
// error wrapping ErrPathNotAccessible.
func checkPathAccessible(path string) error {
	info, statErr := os.Stat(path)

	switch {
	case statErr == nil:
		// LevelDB stores its files inside a directory, so a regular file
		// at this location is unusable.
		if !info.IsDir() {
			return fmt.Errorf("%w: %q exists but is not a directory", ErrPathNotAccessible, path)
		}

		// Probe write permission by creating a throwaway file.
		probe := path + "/.ko_write_test"
		f, createErr := os.Create(probe)
		if createErr != nil {
			return fmt.Errorf("%w: %q exists but is not writable: %v", ErrPathNotAccessible, path, createErr)
		}
		f.Close()
		os.Remove(probe)
		return nil

	case os.IsNotExist(statErr):
		// Nothing there yet: try to create the directory tree.
		if mkdirErr := os.MkdirAll(path, 0755); mkdirErr != nil {
			return fmt.Errorf("%w: cannot create directory %q: %v", ErrPathNotAccessible, path, mkdirErr)
		}
		return nil

	default:
		// Stat failed for another reason (permission denied, etc.).
		return fmt.Errorf("%w: cannot access %q: %v", ErrPathNotAccessible, path, statErr)
	}
}
// Close marks the storage inactive and, if a LevelDB handle was ever assigned,
// closes it. The error returned by the handle's Close is discarded.
func (e *EmbeddedStorage) Close() {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	e.active = false
	if e.client == nil {
		return
	}
	e.client.Close()
}
// Get retrieves a single value by exact key.
//
// It returns storage.ErrNotFound when LevelDB reports the key as missing. Any
// other read error, and any error from meta.Decode, is returned unchanged.
// On success the decoded object's Path is set to k.
func (e *EmbeddedStorage) Get(k string) (meta.Object, error) {
	data, err := e.client.Get([]byte(k), nil)
	if err != nil {
		// Match the sentinel instead of the message text so the check keeps
		// working if the error is wrapped or its wording changes.
		if errors.Is(err, leveldb.ErrNotFound) {
			return meta.Object{}, storage.ErrNotFound
		}
		return meta.Object{}, err
	}

	obj, err := meta.Decode(data)
	if err != nil {
		return meta.Object{}, err
	}
	obj.Path = k
	return obj, nil
}
// GetList returns every stored object whose key matches the glob pattern path.
//
// It returns storage.ErrInvalidPattern when path is not a glob. The scan is
// limited to keys sharing the literal text that precedes the first "*"; an
// empty prefix scans the whole database. Entries that fail meta.Decode are
// skipped. The result is a non-nil (possibly empty) slice, and each object's
// Path is set to its key.
func (e *EmbeddedStorage) GetList(path string) ([]meta.Object, error) {
	if !key.HasGlob(path) {
		return nil, storage.ErrInvalidPattern
	}

	// Literal text before the first wildcard (the whole path if there is none).
	prefix := path
	if star := strings.Index(path, "*"); star >= 0 {
		prefix = path[:star]
	}

	// A nil range makes the iterator cover every key.
	var scope *util.Range
	if prefix != "" {
		scope = util.BytesPrefix([]byte(prefix))
	}

	iter := e.client.NewIterator(scope, &opt.ReadOptions{DontFillCache: true})
	defer iter.Release()

	matches := []meta.Object{}
	for iter.Next() {
		name := string(iter.Key())
		if key.Match(path, name) {
			if obj, decodeErr := meta.Decode(iter.Value()); decodeErr == nil {
				obj.Path = name
				matches = append(matches, obj)
			}
		}
	}

	if scanErr := iter.Error(); scanErr != nil {
		return nil, scanErr
	}
	return matches, nil
}
// Set passes obj through meta.New and writes the result to LevelDB under key
// k, returning the error from the write.
func (e *EmbeddedStorage) Set(k string, obj *meta.Object) error {
	return e.client.Put([]byte(k), meta.New(obj), nil)
}
// Del deletes the entry stored under k, or every entry matching k when k is a
// glob pattern.
//
// For an exact key it returns storage.ErrNotFound when the key is absent, or
// the underlying LevelDB error when the existence check or delete fails.
// For a glob it deletes each matching key, stopping at the first delete error;
// otherwise it returns the iterator's error. Matching nothing is not an error.
func (e *EmbeddedStorage) Del(k string) error {
	if !key.HasGlob(k) {
		rawKey := []byte(k)
		// Has answers the existence question directly: no value is read and
		// no comparison against the "not found" error text is needed.
		found, err := e.client.Has(rawKey, nil)
		if err != nil {
			return err
		}
		if !found {
			return storage.ErrNotFound
		}
		return e.client.Delete(rawKey, nil)
	}

	// Glob delete: restrict the scan to keys sharing the literal text before
	// the first "*". A nil range scans the whole database.
	prefix := strings.Split(k, "*")[0]
	var scope *util.Range
	if prefix != "" {
		scope = util.BytesPrefix([]byte(prefix))
	}

	iter := e.client.NewIterator(scope, nil)
	defer iter.Release()

	for iter.Next() {
		if !key.Match(k, string(iter.Key())) {
			continue
		}
		if err := e.client.Delete(iter.Key(), nil); err != nil {
			return err
		}
	}
	return iter.Error()
}
// Keys returns every key in the database, ordered by comparing the
// lower-cased form of each key. The result is a non-nil (possibly empty)
// slice; an iterator error yields (nil, err).
func (e *EmbeddedStorage) Keys() ([]string, error) {
	iter := e.client.NewIterator(nil, &opt.ReadOptions{DontFillCache: true})
	defer iter.Release()

	names := []string{}
	for iter.Next() {
		names = append(names, string(iter.Key()))
	}
	if scanErr := iter.Error(); scanErr != nil {
		return nil, scanErr
	}

	// Order by lower-cased key text.
	byLowerCase := func(a, b int) bool {
		return strings.ToLower(names[a]) < strings.ToLower(names[b])
	}
	sort.Slice(names, byLowerCase)
	return names, nil
}
// Clear iterates over every key and deletes it. The method has no error
// return: delete errors are deliberately ignored and the iterator's error is
// not inspected.
func (e *EmbeddedStorage) Clear() {
	iter := e.client.NewIterator(nil, nil)
	defer iter.Release()

	for more := iter.Next(); more; more = iter.Next() {
		_ = e.client.Delete(iter.Key(), nil)
	}
}
// Load scans the whole database and returns a map from key to decoded object,
// with each object's Path set to its key. Entries that fail meta.Decode are
// skipped; an iterator error yields (nil, err).
func (e *EmbeddedStorage) Load() (map[string]*meta.Object, error) {
	readOpts := &opt.ReadOptions{DontFillCache: true}
	iter := e.client.NewIterator(nil, readOpts)
	defer iter.Release()

	loaded := map[string]*meta.Object{}
	for iter.Next() {
		decoded, decodeErr := meta.Decode(iter.Value())
		if decodeErr != nil {
			continue
		}
		name := string(iter.Key())
		decoded.Path = name
		loaded[name] = &decoded
	}

	if scanErr := iter.Error(); scanErr != nil {
		return nil, scanErr
	}
	return loaded, nil
}
// Compile-time assertion that *EmbeddedStorage satisfies storage.EmbeddedLayer.
var _ storage.EmbeddedLayer = (*EmbeddedStorage)(nil)