diff --git a/compaction.go b/compaction.go index 5823ef3..fac77b2 100644 --- a/compaction.go +++ b/compaction.go @@ -4,12 +4,13 @@ import ( "sync/atomic" "github.com/akrylysov/pogreb/internal/errors" + "github.com/akrylysov/pogreb/internal/hash" ) // promoteRecord writes the record to the current segment if the index still points to the record. // Otherwise it discards the record. -func (db *DB) promoteRecord(rec record) (bool, error) { - hash := db.hash(rec.key) +func (db *DB[K, V]) promoteRecord(rec record) (bool, error) { + hash := hash.Sum32WithSeed(rec.key, db.hashSeed) it := db.index.newBucketIterator(db.index.bucketIndex(hash)) for { b, err := it.next() @@ -55,7 +56,7 @@ type CompactionResult struct { ReclaimedBytes int } -func (db *DB) compact(sourceSeg *segment) (CompactionResult, error) { +func (db *DB[K, V]) compact(sourceSeg *segment) (CompactionResult, error) { cr := CompactionResult{} db.mu.Lock() @@ -102,7 +103,7 @@ func (db *DB) compact(sourceSeg *segment) (CompactionResult, error) { } // pickForCompaction returns segments eligible for compaction. -func (db *DB) pickForCompaction() []*segment { +func (db *DB[K, V]) pickForCompaction() []*segment { segments := db.datalog.segmentsBySequenceID() var picked []*segment for i := len(segments) - 1; i >= 0; i-- { @@ -131,7 +132,7 @@ func (db *DB) pickForCompaction() []*segment { // Compact compacts the DB. Deleted and overwritten items are discarded. // Returns an error if compaction is already in progress. -func (db *DB) Compact() (CompactionResult, error) { +func (db *DB[K, V]) Compact() (CompactionResult, error) { cr := CompactionResult{} // Run only a single compaction at a time. 
diff --git a/compaction_test.go b/compaction_test.go index 3a91d48..117bd20 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -16,7 +16,7 @@ func fileExists(name string) bool { return !os.IsNotExist(err) } -func countSegments(t *testing.T, db *DB) int { +func countSegments[K, V String](t *testing.T, db *DB[K, V]) int { t.Helper() db.mu.RLock() defer db.mu.RUnlock() @@ -31,7 +31,7 @@ func countSegments(t *testing.T, db *DB) int { } func TestCompaction(t *testing.T) { - run := func(name string, f func(t *testing.T, db *DB)) bool { + run := func(name string, f func(t *testing.T, db *DB[[]byte, []byte])) bool { opts := &Options{ maxSegmentSize: 1024, compactionMinSegmentSize: 520, @@ -45,7 +45,7 @@ func TestCompaction(t *testing.T) { }) } - run("empty", func(t *testing.T, db *DB) { + run("empty", func(t *testing.T, db *DB[[]byte, []byte]) { assert.Equal(t, 1, countSegments(t, db)) cr, err := db.Compact() assert.Nil(t, err) @@ -56,7 +56,7 @@ func TestCompaction(t *testing.T) { // A single segment file can fit 42 items (12 bytes per item, 1 byte key, 1 byte value). const maxItemsPerFile byte = 42 - run("compact only segment", func(t *testing.T, db *DB) { + run("compact only segment", func(t *testing.T, db *DB[[]byte, []byte]) { // Write items and then overwrite them on the second iteration. for j := 0; j < 10; j++ { assert.Nil(t, db.Put([]byte{0}, []byte{0})) @@ -74,7 +74,7 @@ func TestCompaction(t *testing.T) { assert.Equal(t, false, fileExists(filepath.Join(testDBName, segmentMetaName(0, 1)))) }) - run("compact entire segment", func(t *testing.T, db *DB) { + run("compact entire segment", func(t *testing.T, db *DB[[]byte, []byte]) { // Write items and then overwrite them on the second iteration. 
for i := 0; i < 2; i++ { for j := byte(0); j < maxItemsPerFile; j++ { @@ -92,7 +92,7 @@ func TestCompaction(t *testing.T) { assert.Equal(t, &segmentMeta{PutRecords: 42}, db.datalog.segments[1].meta) }) - run("compact part of segment", func(t *testing.T, db *DB) { + run("compact part of segment", func(t *testing.T, db *DB[[]byte, []byte]) { for j := byte(0); j < maxItemsPerFile; j++ { assert.Nil(t, db.Put([]byte{j}, []byte{j})) } @@ -110,7 +110,7 @@ func TestCompaction(t *testing.T) { assert.Equal(t, &segmentMeta{PutRecords: 42}, db.datalog.segments[1].meta) }) - run("compact multiple segments", func(t *testing.T, db *DB) { + run("compact multiple segments", func(t *testing.T, db *DB[[]byte, []byte]) { for i := 0; i < 4; i++ { for j := byte(0); j < maxItemsPerFile; j++ { assert.Nil(t, db.Put([]byte{j}, []byte{j})) @@ -123,7 +123,7 @@ func TestCompaction(t *testing.T) { assert.Equal(t, 1, countSegments(t, db)) }) - run("zero deleted bytes", func(t *testing.T, db *DB) { + run("zero deleted bytes", func(t *testing.T, db *DB[[]byte, []byte]) { for i := byte(0); i < maxItemsPerFile; i++ { assert.Nil(t, db.Put([]byte{i}, []byte{i})) } @@ -136,7 +136,7 @@ func TestCompaction(t *testing.T) { assert.Equal(t, &segmentMeta{PutRecords: 42}, db.datalog.segments[0].meta) }) - run("below threshold", func(t *testing.T, db *DB) { + run("below threshold", func(t *testing.T, db *DB[[]byte, []byte]) { for j := byte(0); j < maxItemsPerFile; j++ { assert.Nil(t, db.Put([]byte{j}, []byte{j})) } @@ -150,7 +150,7 @@ func TestCompaction(t *testing.T) { assert.Equal(t, 2, countSegments(t, db)) }) - run("above threshold", func(t *testing.T, db *DB) { + run("above threshold", func(t *testing.T, db *DB[[]byte, []byte]) { for j := byte(0); j < maxItemsPerFile; j++ { assert.Nil(t, db.Put([]byte{j}, []byte{j})) } @@ -165,7 +165,7 @@ func TestCompaction(t *testing.T) { assert.Equal(t, 1, countSegments(t, db)) }) - run("compact single segment in the middle: puts", func(t *testing.T, db *DB) { + 
run("compact single segment in the middle: puts", func(t *testing.T, db *DB[[]byte, []byte]) { // Write two segments. for j := byte(0); j < maxItemsPerFile*2; j++ { assert.Nil(t, db.Put([]byte{j}, []byte{j})) @@ -181,7 +181,7 @@ func TestCompaction(t *testing.T) { assert.Equal(t, 2, countSegments(t, db)) }) - run("compact single segment in the middle: deletes", func(t *testing.T, db *DB) { + run("compact single segment in the middle: deletes", func(t *testing.T, db *DB[[]byte, []byte]) { for j := byte(0); j < (maxItemsPerFile*2)-1; j++ { assert.Nil(t, db.Put([]byte{j}, []byte{j})) } @@ -200,7 +200,7 @@ func TestCompaction(t *testing.T) { assert.Equal(t, 2, countSegments(t, db)) }) - run("delete and compact all segments", func(t *testing.T, db *DB) { + run("delete and compact all segments", func(t *testing.T, db *DB[[]byte, []byte]) { // Write items. for i := byte(0); i < maxItemsPerFile; i++ { assert.Nil(t, db.Put([]byte{i}, []byte{i})) @@ -220,7 +220,7 @@ func TestCompaction(t *testing.T) { assert.Nil(t, db.datalog.segments[1]) }) - run("busy error", func(t *testing.T, db *DB) { + run("busy error", func(t *testing.T, db *DB[[]byte, []byte]) { wg := sync.WaitGroup{} wg.Add(1) db.mu.Lock() diff --git a/datalog.go b/datalog.go index 0719f07..3ef3f33 100644 --- a/datalog.go +++ b/datalog.go @@ -17,20 +17,20 @@ const ( ) // datalog is a write-ahead log. 
-type datalog struct { +type datalog[K, V String] struct { opts *Options curSeg *segment segments [maxSegments]*segment maxSequenceID uint64 } -func openDatalog(opts *Options) (*datalog, error) { +func openDatalog[K, V String](opts *Options) (*datalog[K, V], error) { files, err := opts.FileSystem.ReadDir(".") if err != nil { return nil, err } - dl := &datalog{ + dl := &datalog[K, V]{ opts: opts, } @@ -78,7 +78,7 @@ func parseSegmentName(name string) (uint16, uint64, error) { return uint16(id), seqID, nil } -func (dl *datalog) openSegment(name string, id uint16, seqID uint64) (*segment, error) { +func (dl *datalog[K, V]) openSegment(name string, id uint16, seqID uint64) (*segment, error) { f, err := openFile(dl.opts.FileSystem, name, false) if err != nil { return nil, err @@ -104,7 +104,7 @@ func (dl *datalog) openSegment(name string, id uint16, seqID uint64) (*segment, return seg, nil } -func (dl *datalog) nextWritableSegmentID() (uint16, uint64, error) { +func (dl *datalog[K, V]) nextWritableSegmentID() (uint16, uint64, error) { for id, seg := range dl.segments { // Pick empty segment. if seg == nil { @@ -115,7 +115,7 @@ func (dl *datalog) nextWritableSegmentID() (uint16, uint64, error) { return 0, 0, fmt.Errorf("number of segments exceeds %d", maxSegments) } -func (dl *datalog) swapSegment() error { +func (dl *datalog[K, V]) swapSegment() error { // Pick unfilled segment. 
for _, seg := range dl.segments { if seg != nil && !seg.meta.Full { @@ -142,7 +142,7 @@ func (dl *datalog) swapSegment() error { return nil } -func (dl *datalog) removeSegment(seg *segment) error { +func (dl *datalog[K, V]) removeSegment(seg *segment) error { dl.segments[seg.id] = nil if err := seg.Close(); err != nil { @@ -163,7 +163,7 @@ func (dl *datalog) removeSegment(seg *segment) error { return nil } -func (dl *datalog) readKeyValue(sl slot) ([]byte, []byte, error) { +func (dl *datalog[K, V]) readKeyValue(sl slot) ([]byte, []byte, error) { off := int64(sl.offset) + 6 // Skip key size and value size. seg := dl.segments[sl.segmentID] keyValue, err := seg.Slice(off, off+int64(sl.kvSize())) @@ -173,20 +173,20 @@ func (dl *datalog) readKeyValue(sl slot) ([]byte, []byte, error) { return keyValue[:sl.keySize], keyValue[sl.keySize:], nil } -func (dl *datalog) readKey(sl slot) ([]byte, error) { +func (dl *datalog[K, V]) readKey(sl slot) ([]byte, error) { off := int64(sl.offset) + 6 seg := dl.segments[sl.segmentID] return seg.Slice(off, off+int64(sl.keySize)) } // trackDel updates segment's metadata for deleted or overwritten items. -func (dl *datalog) trackDel(sl slot) { +func (dl *datalog[K, V]) trackDel(sl slot) { meta := dl.segments[sl.segmentID].meta meta.DeletedKeys++ meta.DeletedBytes += encodedRecordSize(sl.kvSize()) } -func (dl *datalog) del(key []byte) error { +func (dl *datalog[K, V]) del(key K) error { rec := encodeDeleteRecord(key) _, _, err := dl.writeRecord(rec, recordTypeDelete) if err != nil { @@ -197,7 +197,7 @@ func (dl *datalog) del(key []byte) error { return nil } -func (dl *datalog) writeRecord(data []byte, rt recordType) (uint16, uint32, error) { +func (dl *datalog[K, V]) writeRecord(data []byte, rt recordType) (uint16, uint32, error) { if dl.curSeg.meta.Full || dl.curSeg.size+int64(len(data)) > int64(dl.opts.maxSegmentSize) { // Current segment is full, create a new one. 
dl.curSeg.meta.Full = true @@ -218,15 +218,15 @@ func (dl *datalog) writeRecord(data []byte, rt recordType) (uint16, uint32, erro return dl.curSeg.id, uint32(off), nil } -func (dl *datalog) put(key []byte, value []byte) (uint16, uint32, error) { +func (dl *datalog[K, V]) put(key K, value V) (uint16, uint32, error) { return dl.writeRecord(encodePutRecord(key, value), recordTypePut) } -func (dl *datalog) sync() error { +func (dl *datalog[K, V]) sync() error { return dl.curSeg.Sync() } -func (dl *datalog) close() error { +func (dl *datalog[K, V]) close() error { for _, seg := range dl.segments { if seg == nil { continue @@ -243,7 +243,7 @@ func (dl *datalog) close() error { } // segmentsBySequenceID returns segments ordered from oldest to newest. -func (dl *datalog) segmentsBySequenceID() []*segment { +func (dl *datalog[K, V]) segmentsBySequenceID() []*segment { var segments []*segment for _, seg := range dl.segments { diff --git a/datalog_test.go b/datalog_test.go index ec4535d..585aa57 100644 --- a/datalog_test.go +++ b/datalog_test.go @@ -6,7 +6,7 @@ import ( "github.com/akrylysov/pogreb/internal/assert" ) -func (dl *datalog) segmentMetas() []segmentMeta { +func (dl *datalog[K, V]) segmentMetas() []segmentMeta { var metas []segmentMeta for _, seg := range dl.segmentsBySequenceID() { metas = append(metas, *seg.meta) diff --git a/db.go b/db.go index 517baf1..ca31c24 100644 --- a/db.go +++ b/db.go @@ -1,12 +1,12 @@ package pogreb import ( - "bytes" "context" "math" "os" "sync" "time" + "unsafe" "github.com/akrylysov/pogreb/fs" "github.com/akrylysov/pogreb/internal/errors" @@ -27,13 +27,18 @@ const ( dbMetaName = "db" + metaExt ) +// String is a constraint permitting the byte-sequence types usable as DB keys and values: []byte or string. +type String interface { + []byte | string +} + // DB represents the key-value storage. // All DB methods are safe for concurrent use by multiple goroutines. -type DB struct { +type DB[K, V String] struct { mu sync.RWMutex // Allows multiple database readers or a single writer. 
opts *Options index *index - datalog *datalog + datalog *datalog[K, V] lock fs.LockFile // Prevents opening multiple instances of the same database. hashSeed uint32 metrics *Metrics @@ -49,7 +54,7 @@ type dbMeta struct { // Open opens or creates a new DB. // The DB must be closed after use, by calling Close method. -func Open(path string, opts *Options) (*DB, error) { +func Open[K, V String](path string, opts *Options) (*DB[K, V], error) { opts = opts.copyWithDefaults(path) if err := os.MkdirAll(path, 0755); err != nil { @@ -85,12 +90,12 @@ func Open(path string, opts *Options) (*DB, error) { return nil, errors.Wrap(err, "opening index") } - datalog, err := openDatalog(opts) + datalog, err := openDatalog[K, V](opts) if err != nil { return nil, errors.Wrap(err, "opening datalog") } - db := &DB{ + db := &DB[K, V]{ opts: opts, index: index, datalog: datalog, @@ -125,20 +130,20 @@ func Open(path string, opts *Options) (*DB, error) { return db, nil } -func cloneBytes(src []byte) []byte { +func typedCopy[T String](src []byte) T { dst := make([]byte, len(src)) copy(dst, src) - return dst + return *(*T)(unsafe.Pointer(&dst)) } -func (db *DB) writeMeta() error { +func (db *DB[K, V]) writeMeta() error { m := dbMeta{ HashSeed: db.hashSeed, } return writeGobFile(db.opts.FileSystem, dbMetaName, m) } -func (db *DB) readMeta() error { +func (db *DB[K, V]) readMeta() error { m := dbMeta{} if err := readGobFile(db.opts.FileSystem, dbMetaName, &m); err != nil { return err @@ -147,10 +152,6 @@ func (db *DB) readMeta() error { return nil } -func (db *DB) hash(data []byte) uint32 { - return hash.Sum32WithSeed(data, db.hashSeed) -} - // newNullableTicker is a wrapper around time.NewTicker that allows creating a nil ticker. // A nil ticker never ticks. 
func newNullableTicker(d time.Duration) (<-chan time.Time, func()) { @@ -161,7 +162,7 @@ func newNullableTicker(d time.Duration) (<-chan time.Time, func()) { return nil, func() {} } -func (db *DB) startBackgroundWorker() { +func (db *DB[K, V]) startBackgroundWorker() { ctx, cancel := context.WithCancel(context.Background()) db.cancelBgWorker = cancel db.closeWg.Add(1) @@ -195,12 +196,12 @@ func (db *DB) startBackgroundWorker() { } // Get returns the value for the given key stored in the DB or nil if the key doesn't exist. -func (db *DB) Get(key []byte) ([]byte, error) { - h := db.hash(key) +func (db *DB[K, V]) Get(key K) (V, error) { + h := hash.Sum32WithSeed(key, db.hashSeed) db.metrics.Gets.Add(1) db.mu.RLock() defer db.mu.RUnlock() - var retValue []byte + var hit V err := db.index.get(h, func(sl slot) (bool, error) { if uint16(len(key)) != sl.keySize { return false, nil @@ -209,22 +210,23 @@ func (db *DB) Get(key []byte) ([]byte, error) { if err != nil { return true, err } - if bytes.Equal(key, slKey) { - retValue = cloneBytes(value) + if string(key) == string(slKey) { + hit = typedCopy[V](value) return true, nil } db.metrics.HashCollisions.Add(1) return false, nil }) if err != nil { - return nil, err + var zero V + return zero, err } - return retValue, nil + return hit, nil } // Has returns true if the DB contains the given key. 
-func (db *DB) Has(key []byte) (bool, error) { - h := db.hash(key) +func (db *DB[K, V]) Has(key K) (bool, error) { + h := hash.Sum32WithSeed(key, db.hashSeed) db.metrics.Gets.Add(1) found := false db.mu.RLock() @@ -237,7 +239,7 @@ func (db *DB) Has(key []byte) (bool, error) { if err != nil { return true, err } - if bytes.Equal(key, slKey) { + if string(key) == string(slKey) { found = true return true, nil } @@ -249,7 +251,7 @@ func (db *DB) Has(key []byte) (bool, error) { return found, nil } -func (db *DB) put(sl slot, key []byte) error { +func (db *DB[K, V]) put(sl slot, key K) error { return db.index.put(sl, func(cursl slot) (bool, error) { if uint16(len(key)) != cursl.keySize { return false, nil @@ -258,7 +260,7 @@ func (db *DB) put(sl slot, key []byte) error { if err != nil { return true, err } - if bytes.Equal(key, slKey) { + if string(key) == string(slKey) { db.datalog.trackDel(cursl) // Overwriting existing key. return true, nil } @@ -267,14 +269,14 @@ func (db *DB) put(sl slot, key []byte) error { } // Put sets the value for the given key. It updates the value for the existing key. 
-func (db *DB) Put(key []byte, value []byte) error { +func (db *DB[K, V]) Put(key K, value V) error { if len(key) > MaxKeyLength { return errKeyTooLarge } if len(value) > MaxValueLength { return errValueTooLarge } - h := db.hash(key) + h := hash.Sum32WithSeed(key, db.hashSeed) db.metrics.Puts.Add(1) db.mu.Lock() defer db.mu.Unlock() @@ -302,7 +304,7 @@ func (db *DB) Put(key []byte, value []byte) error { return nil } -func (db *DB) del(h uint32, key []byte, writeWAL bool) error { +func (db *DB[K, V]) del(h uint32, key K, writeWAL bool) error { err := db.index.delete(h, func(sl slot) (b bool, e error) { if uint16(len(key)) != sl.keySize { return false, nil @@ -311,7 +313,7 @@ func (db *DB) del(h uint32, key []byte, writeWAL bool) error { if err != nil { return true, err } - if bytes.Equal(key, slKey) { + if string(key) == string(slKey) { db.datalog.trackDel(sl) var err error if writeWAL { @@ -325,8 +327,8 @@ func (db *DB) del(h uint32, key []byte, writeWAL bool) error { } // Delete deletes the given key from the DB. -func (db *DB) Delete(key []byte) error { - h := db.hash(key) +func (db *DB[K, V]) Delete(key K) error { + h := hash.Sum32WithSeed(key, db.hashSeed) db.metrics.Dels.Add(1) db.mu.Lock() defer db.mu.Unlock() @@ -340,7 +342,7 @@ func (db *DB) Delete(key []byte) error { } // Close closes the DB. -func (db *DB) Close() error { +func (db *DB[K, V]) Close() error { if db.cancelBgWorker != nil { db.cancelBgWorker() } @@ -362,36 +364,36 @@ func (db *DB) Close() error { return nil } -func (db *DB) sync() error { +func (db *DB[K, V]) sync() error { return db.datalog.sync() } // Items returns a new ItemIterator. -func (db *DB) Items() *ItemIterator { - return &ItemIterator{db: db} +func (db *DB[K, V]) Items() *ItemIterator[K, V] { + return &ItemIterator[K, V]{db: db} } // Sync commits the contents of the database to the backing FileSystem. 
-func (db *DB) Sync() error { +func (db *DB[K, V]) Sync() error { db.mu.Lock() defer db.mu.Unlock() return db.sync() } // Count returns the number of keys in the DB. -func (db *DB) Count() uint32 { +func (db *DB[K, V]) Count() uint32 { db.mu.RLock() defer db.mu.RUnlock() return db.index.count() } // Metrics returns the DB metrics. -func (db *DB) Metrics() *Metrics { +func (db *DB[K, V]) Metrics() *Metrics { return db.metrics } // FileSize returns the total size of the disk storage used by the DB. -func (db *DB) FileSize() (int64, error) { +func (db *DB[K, V]) FileSize() (int64, error) { var size int64 files, err := db.opts.FileSystem.ReadDir(".") if err != nil { diff --git a/db_test.go b/db_test.go index a0cec32..133a816 100644 --- a/db_test.go +++ b/db_test.go @@ -87,7 +87,7 @@ func TestHeaderSize(t *testing.T) { } } -func createTestDB(opts *Options) (*DB, error) { +func createTestDB(opts *Options) (*DB[[]byte, []byte], error) { if opts == nil { opts = &Options{FileSystem: testFS} } else { @@ -103,7 +103,7 @@ func createTestDB(opts *Options) (*DB, error) { for _, file := range files { _ = testFS.Remove(filepath.Join(path, file.Name())) } - return Open(path, opts) + return Open[[]byte, []byte](path, opts) } func TestEmpty(t *testing.T) { @@ -111,7 +111,7 @@ func TestEmpty(t *testing.T) { db, err := createTestDB(opts) assert.Nil(t, err) assert.Nil(t, db.Close()) - db, err = Open(testDBName, opts) + db, err = Open[[]byte, []byte](testDBName, opts) assert.Nil(t, err) assert.Nil(t, db.Close()) } @@ -178,7 +178,7 @@ func TestFull(t *testing.T) { verifyKeysAndClose(0) // Open and check again - db, err = Open(testDBName, opts) + db, err = Open[[]byte, []byte](testDBName, opts) assert.Nil(t, err) verifyKeysAndClose(0) @@ -188,14 +188,14 @@ func TestFull(t *testing.T) { assert.Nil(t, testFS.Remove(filepath.Join(testDBName, indexMetaName))) // Open and check again - db, err = Open(testDBName, opts) + db, err = Open[[]byte, []byte](testDBName, opts) assert.Nil(t, err) 
verifyKeysAndClose(0) assert.Equal(t, expectedSegMetas, db.datalog.segmentMetas()) // Update all items - db, err = Open(testDBName, opts) + db, err = Open[[]byte, []byte](testDBName, opts) assert.Nil(t, err) for i = 0; i < n; i++ { assert.Nil(t, db.Put([]byte{i}, []byte{i + 6})) @@ -203,7 +203,7 @@ func TestFull(t *testing.T) { verifyKeysAndClose(6) // Delete all items - db, err = Open(testDBName, &Options{BackgroundSyncInterval: time.Millisecond}) + db, err = Open[[]byte, []byte](testDBName, &Options{BackgroundSyncInterval: time.Millisecond}) assert.Nil(t, err) for i = 0; i < n; i++ { assert.Nil(t, db.Delete([]byte{i})) @@ -223,7 +223,7 @@ func TestLock(t *testing.T) { assert.Nil(t, err) // Opening already opened database returns an error. - db2, err2 := Open(testDBName, opts) + db2, err2 := Open[string, string](testDBName, opts) assert.Nil(t, db2) assert.NotNil(t, err2) @@ -304,7 +304,7 @@ func TestCorruptedIndex(t *testing.T) { assert.Nil(t, err) assert.Nil(t, f.Close()) - db, err = Open(testDBName, opts) + db, err = Open[[]byte, []byte](testDBName, opts) assert.Nil(t, db) assert.NotNil(t, err) } diff --git a/example_test.go b/example_test.go index b47656d..aff3ac0 100644 --- a/example_test.go +++ b/example_test.go @@ -7,7 +7,7 @@ import ( ) func Example() { - db, err := pogreb.Open("pogreb.test", nil) + db, err := pogreb.Open[string, string]("pogreb.test", nil) if err != nil { log.Fatal(err) return @@ -15,12 +15,12 @@ func Example() { defer db.Close() // Insert a new key-value pair. - if err := db.Put([]byte("testKey"), []byte("testValue")); err != nil { + if err := db.Put("testKey", "testValue"); err != nil { log.Fatal(err) } // Retrieve the inserted value. 
- val, err := db.Get([]byte("testKey")) + val, err := db.Get("testKey") if err != nil { log.Fatal(err) } diff --git a/go.mod b/go.mod index d2a3a28..0f6574e 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module github.com/akrylysov/pogreb -go 1.12 +go 1.18 diff --git a/internal/hash/murmurhash32.go b/internal/hash/murmurhash32.go index bb8de84..70c0364 100644 --- a/internal/hash/murmurhash32.go +++ b/internal/hash/murmurhash32.go @@ -10,7 +10,7 @@ const ( ) // Sum32WithSeed is a port of MurmurHash3_x86_32 function. -func Sum32WithSeed(data []byte, seed uint32) uint32 { +func Sum32WithSeed[String []byte | string](data String, seed uint32) uint32 { h1 := seed dlen := len(data) diff --git a/iterator.go b/iterator.go index 19f4ccc..47c78a6 100644 --- a/iterator.go +++ b/iterator.go @@ -8,21 +8,21 @@ import ( // ErrIterationDone is returned by ItemIterator.Next calls when there are no more items to return. var ErrIterationDone = errors.New("no more items in iterator") -type item struct { - key []byte - value []byte +type item[K, V String] struct { + key K + value V } // ItemIterator is an iterator over DB key-value pairs. It iterates the items in an unspecified order. -type ItemIterator struct { - db *DB +type ItemIterator[K, V String] struct { + db *DB[K, V] nextBucketIdx uint32 - queue []item + queue []item[K, V] mu sync.Mutex } // fetchItems adds items to the iterator queue from a bucket located at nextBucketIdx. 
-func (it *ItemIterator) fetchItems(nextBucketIdx uint32) error { +func (it *ItemIterator[K, V]) fetchItems(nextBucketIdx uint32) error { bit := it.db.index.newBucketIterator(nextBucketIdx) for { b, err := bit.next() @@ -42,15 +42,16 @@ func (it *ItemIterator) fetchItems(nextBucketIdx uint32) error { if err != nil { return err } - key = cloneBytes(key) - value = cloneBytes(value) - it.queue = append(it.queue, item{key: key, value: value}) + it.queue = append(it.queue, item[K, V]{ + key: typedCopy[K](key), + value: typedCopy[V](value), + }) } } } // Next returns the next key-value pair if available, otherwise it returns ErrIterationDone error. -func (it *ItemIterator) Next() ([]byte, []byte, error) { +func (it *ItemIterator[K, V]) Next() (K, V, error) { it.mu.Lock() defer it.mu.Unlock() @@ -60,7 +61,9 @@ func (it *ItemIterator) Next() ([]byte, []byte, error) { // The iterator queue is empty and we have more buckets to check. for len(it.queue) == 0 && it.nextBucketIdx < it.db.index.numBuckets { if err := it.fetchItems(it.nextBucketIdx); err != nil { - return nil, nil, err + var zeroK K + var zeroV V + return zeroK, zeroV, err } it.nextBucketIdx++ } @@ -71,5 +74,7 @@ func (it *ItemIterator) Next() ([]byte, []byte, error) { return item.key, item.value, nil } - return nil, nil, ErrIterationDone + var zeroK K + var zeroV V + return zeroK, zeroV, ErrIterationDone } diff --git a/recovery.go b/recovery.go index 3381348..d9699e5 100644 --- a/recovery.go +++ b/recovery.go @@ -5,6 +5,7 @@ import ( "path/filepath" "github.com/akrylysov/pogreb/fs" + "github.com/akrylysov/pogreb/internal/hash" ) const ( @@ -108,7 +109,7 @@ func (it *recoveryIterator) next() (record, error) { } } -func (db *DB) recover() error { +func (db *DB[K, V]) recover() error { logger.Println("started recovery") logger.Println("rebuilding index...") @@ -123,7 +124,7 @@ func (db *DB) recover() error { return err } - h := db.hash(rec.key) + h := hash.Sum32WithSeed(rec.key, db.hashSeed) meta := 
db.datalog.segments[rec.segmentID].meta if rec.rtype == recordTypePut { sl := slot{ @@ -133,12 +134,12 @@ func (db *DB) recover() error { valueSize: uint32(len(rec.value)), offset: rec.offset, } - if err := db.put(sl, rec.key); err != nil { + if err := db.put(sl, (K)(rec.key)); err != nil { return err } meta.PutRecords++ } else { - if err := db.del(h, rec.key, false); err != nil { + if err := db.del(h, (K)(rec.key), false); err != nil { return err } meta.DeleteRecords++ diff --git a/recovery_test.go b/recovery_test.go index ae72fc7..52dd74b 100644 --- a/recovery_test.go +++ b/recovery_test.go @@ -84,12 +84,12 @@ func TestRecovery(t *testing.T) { assert.Nil(t, testCase.fn()) - db, err = Open(testDBName, opts) + db, err = Open[[]byte, []byte](testDBName, opts) assert.Nil(t, err) assert.Equal(t, uint32(128), db.Count()) assert.Nil(t, db.Close()) - db, err = Open(testDBName, opts) + db, err = Open[[]byte, []byte](testDBName, opts) assert.Nil(t, err) assert.Equal(t, uint32(128), db.Count()) for i = 0; i < 128; i++ { @@ -115,7 +115,7 @@ func TestRecoveryDelete(t *testing.T) { // Simulate crash. assert.Nil(t, touchFile(testFS, filepath.Join(testDBName, lockName))) - db, err = Open(testDBName, opts) + db, err = Open[[]byte, []byte](testDBName, opts) assert.Nil(t, err) assert.Equal(t, uint32(1), db.Count()) @@ -180,7 +180,7 @@ func TestRecoveryCompaction(t *testing.T) { // Simulate crash. 
assert.Nil(t, touchFile(testFS, filepath.Join(testDBName, lockName))) - db, err = Open(testDBName, opts) + db, err = Open[[]byte, []byte](testDBName, opts) assert.Nil(t, err) assert.Equal(t, uint32(2), db.Count()) diff --git a/segment.go b/segment.go index 34225f2..750e402 100644 --- a/segment.go +++ b/segment.go @@ -61,7 +61,7 @@ func encodedRecordSize(kvSize uint32) uint32 { return 2 + 4 + kvSize + 4 } -func encodeRecord(key []byte, value []byte, rt recordType) []byte { +func encodeRecord[K, V String](key K, value V, rt recordType) []byte { size := encodedRecordSize(uint32(len(key) + len(value))) data := make([]byte, size) binary.LittleEndian.PutUint16(data[:2], uint16(len(key))) @@ -79,12 +79,12 @@ func encodeRecord(key []byte, value []byte, rt recordType) []byte { return data } -func encodePutRecord(key []byte, value []byte) []byte { +func encodePutRecord[K, V String](key K, value V) []byte { return encodeRecord(key, value, recordTypePut) } -func encodeDeleteRecord(key []byte) []byte { - return encodeRecord(key, nil, recordTypeDelete) +func encodeDeleteRecord[K String](key K) []byte { + return encodeRecord(key, []byte(nil), recordTypeDelete) } // segmentIterator iterates over segment records.