Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"sync/atomic"

"github.com/akrylysov/pogreb/internal/errors"
"github.com/akrylysov/pogreb/internal/hash"
)

// promoteRecord writes the record to the current segment if the index still points to the record.
// Otherwise it discards the record.
func (db *DB) promoteRecord(rec record) (bool, error) {
hash := db.hash(rec.key)
func (db *DB[K, V]) promoteRecord(rec record) (bool, error) {
hash := hash.Sum32WithSeed(rec.key, db.hashSeed)
it := db.index.newBucketIterator(db.index.bucketIndex(hash))
for {
b, err := it.next()
Expand Down Expand Up @@ -55,7 +56,7 @@ type CompactionResult struct {
ReclaimedBytes int
}

func (db *DB) compact(sourceSeg *segment) (CompactionResult, error) {
func (db *DB[K, V]) compact(sourceSeg *segment) (CompactionResult, error) {
cr := CompactionResult{}

db.mu.Lock()
Expand Down Expand Up @@ -102,7 +103,7 @@ func (db *DB) compact(sourceSeg *segment) (CompactionResult, error) {
}

// pickForCompaction returns segments eligible for compaction.
func (db *DB) pickForCompaction() []*segment {
func (db *DB[K, V]) pickForCompaction() []*segment {
segments := db.datalog.segmentsBySequenceID()
var picked []*segment
for i := len(segments) - 1; i >= 0; i-- {
Expand Down Expand Up @@ -131,7 +132,7 @@ func (db *DB) pickForCompaction() []*segment {

// Compact compacts the DB. Deleted and overwritten items are discarded.
// Returns an error if compaction is already in progress.
func (db *DB) Compact() (CompactionResult, error) {
func (db *DB[K, V]) Compact() (CompactionResult, error) {
cr := CompactionResult{}

// Run only a single compaction at a time.
Expand Down
28 changes: 14 additions & 14 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func fileExists(name string) bool {
return !os.IsNotExist(err)
}

func countSegments(t *testing.T, db *DB) int {
func countSegments[K, V String](t *testing.T, db *DB[K, V]) int {
t.Helper()
db.mu.RLock()
defer db.mu.RUnlock()
Expand All @@ -31,7 +31,7 @@ func countSegments(t *testing.T, db *DB) int {
}

func TestCompaction(t *testing.T) {
run := func(name string, f func(t *testing.T, db *DB)) bool {
run := func(name string, f func(t *testing.T, db *DB[[]byte, []byte])) bool {
opts := &Options{
maxSegmentSize: 1024,
compactionMinSegmentSize: 520,
Expand All @@ -45,7 +45,7 @@ func TestCompaction(t *testing.T) {
})
}

run("empty", func(t *testing.T, db *DB) {
run("empty", func(t *testing.T, db *DB[[]byte, []byte]) {
assert.Equal(t, 1, countSegments(t, db))
cr, err := db.Compact()
assert.Nil(t, err)
Expand All @@ -56,7 +56,7 @@ func TestCompaction(t *testing.T) {
// A single segment file can fit 42 items (12 bytes per item, 1 byte key, 1 byte value).
const maxItemsPerFile byte = 42

run("compact only segment", func(t *testing.T, db *DB) {
run("compact only segment", func(t *testing.T, db *DB[[]byte, []byte]) {
// Write items and then overwrite them on the second iteration.
for j := 0; j < 10; j++ {
assert.Nil(t, db.Put([]byte{0}, []byte{0}))
Expand All @@ -74,7 +74,7 @@ func TestCompaction(t *testing.T) {
assert.Equal(t, false, fileExists(filepath.Join(testDBName, segmentMetaName(0, 1))))
})

run("compact entire segment", func(t *testing.T, db *DB) {
run("compact entire segment", func(t *testing.T, db *DB[[]byte, []byte]) {
// Write items and then overwrite them on the second iteration.
for i := 0; i < 2; i++ {
for j := byte(0); j < maxItemsPerFile; j++ {
Expand All @@ -92,7 +92,7 @@ func TestCompaction(t *testing.T) {
assert.Equal(t, &segmentMeta{PutRecords: 42}, db.datalog.segments[1].meta)
})

run("compact part of segment", func(t *testing.T, db *DB) {
run("compact part of segment", func(t *testing.T, db *DB[[]byte, []byte]) {
for j := byte(0); j < maxItemsPerFile; j++ {
assert.Nil(t, db.Put([]byte{j}, []byte{j}))
}
Expand All @@ -110,7 +110,7 @@ func TestCompaction(t *testing.T) {
assert.Equal(t, &segmentMeta{PutRecords: 42}, db.datalog.segments[1].meta)
})

run("compact multiple segments", func(t *testing.T, db *DB) {
run("compact multiple segments", func(t *testing.T, db *DB[[]byte, []byte]) {
for i := 0; i < 4; i++ {
for j := byte(0); j < maxItemsPerFile; j++ {
assert.Nil(t, db.Put([]byte{j}, []byte{j}))
Expand All @@ -123,7 +123,7 @@ func TestCompaction(t *testing.T) {
assert.Equal(t, 1, countSegments(t, db))
})

run("zero deleted bytes", func(t *testing.T, db *DB) {
run("zero deleted bytes", func(t *testing.T, db *DB[[]byte, []byte]) {
for i := byte(0); i < maxItemsPerFile; i++ {
assert.Nil(t, db.Put([]byte{i}, []byte{i}))
}
Expand All @@ -136,7 +136,7 @@ func TestCompaction(t *testing.T) {
assert.Equal(t, &segmentMeta{PutRecords: 42}, db.datalog.segments[0].meta)
})

run("below threshold", func(t *testing.T, db *DB) {
run("below threshold", func(t *testing.T, db *DB[[]byte, []byte]) {
for j := byte(0); j < maxItemsPerFile; j++ {
assert.Nil(t, db.Put([]byte{j}, []byte{j}))
}
Expand All @@ -150,7 +150,7 @@ func TestCompaction(t *testing.T) {
assert.Equal(t, 2, countSegments(t, db))
})

run("above threshold", func(t *testing.T, db *DB) {
run("above threshold", func(t *testing.T, db *DB[[]byte, []byte]) {
for j := byte(0); j < maxItemsPerFile; j++ {
assert.Nil(t, db.Put([]byte{j}, []byte{j}))
}
Expand All @@ -165,7 +165,7 @@ func TestCompaction(t *testing.T) {
assert.Equal(t, 1, countSegments(t, db))
})

run("compact single segment in the middle: puts", func(t *testing.T, db *DB) {
run("compact single segment in the middle: puts", func(t *testing.T, db *DB[[]byte, []byte]) {
// Write two segments.
for j := byte(0); j < maxItemsPerFile*2; j++ {
assert.Nil(t, db.Put([]byte{j}, []byte{j}))
Expand All @@ -181,7 +181,7 @@ func TestCompaction(t *testing.T) {
assert.Equal(t, 2, countSegments(t, db))
})

run("compact single segment in the middle: deletes", func(t *testing.T, db *DB) {
run("compact single segment in the middle: deletes", func(t *testing.T, db *DB[[]byte, []byte]) {
for j := byte(0); j < (maxItemsPerFile*2)-1; j++ {
assert.Nil(t, db.Put([]byte{j}, []byte{j}))
}
Expand All @@ -200,7 +200,7 @@ func TestCompaction(t *testing.T) {
assert.Equal(t, 2, countSegments(t, db))
})

run("delete and compact all segments", func(t *testing.T, db *DB) {
run("delete and compact all segments", func(t *testing.T, db *DB[[]byte, []byte]) {
// Write items.
for i := byte(0); i < maxItemsPerFile; i++ {
assert.Nil(t, db.Put([]byte{i}, []byte{i}))
Expand All @@ -220,7 +220,7 @@ func TestCompaction(t *testing.T) {
assert.Nil(t, db.datalog.segments[1])
})

run("busy error", func(t *testing.T, db *DB) {
run("busy error", func(t *testing.T, db *DB[[]byte, []byte]) {
wg := sync.WaitGroup{}
wg.Add(1)
db.mu.Lock()
Expand Down
32 changes: 16 additions & 16 deletions datalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ const (
)

// datalog is a write-ahead log.
type datalog struct {
type datalog[K, V String] struct {
opts *Options
curSeg *segment
segments [maxSegments]*segment
maxSequenceID uint64
}

func openDatalog(opts *Options) (*datalog, error) {
func openDatalog[K, V String](opts *Options) (*datalog[K, V], error) {
files, err := opts.FileSystem.ReadDir(".")
if err != nil {
return nil, err
}

dl := &datalog{
dl := &datalog[K, V]{
opts: opts,
}

Expand Down Expand Up @@ -78,7 +78,7 @@ func parseSegmentName(name string) (uint16, uint64, error) {
return uint16(id), seqID, nil
}

func (dl *datalog) openSegment(name string, id uint16, seqID uint64) (*segment, error) {
func (dl *datalog[K, V]) openSegment(name string, id uint16, seqID uint64) (*segment, error) {
f, err := openFile(dl.opts.FileSystem, name, false)
if err != nil {
return nil, err
Expand All @@ -104,7 +104,7 @@ func (dl *datalog) openSegment(name string, id uint16, seqID uint64) (*segment,
return seg, nil
}

func (dl *datalog) nextWritableSegmentID() (uint16, uint64, error) {
func (dl *datalog[K, V]) nextWritableSegmentID() (uint16, uint64, error) {
for id, seg := range dl.segments {
// Pick empty segment.
if seg == nil {
Expand All @@ -115,7 +115,7 @@ func (dl *datalog) nextWritableSegmentID() (uint16, uint64, error) {
return 0, 0, fmt.Errorf("number of segments exceeds %d", maxSegments)
}

func (dl *datalog) swapSegment() error {
func (dl *datalog[K, V]) swapSegment() error {
// Pick unfilled segment.
for _, seg := range dl.segments {
if seg != nil && !seg.meta.Full {
Expand All @@ -142,7 +142,7 @@ func (dl *datalog) swapSegment() error {
return nil
}

func (dl *datalog) removeSegment(seg *segment) error {
func (dl *datalog[K, V]) removeSegment(seg *segment) error {
dl.segments[seg.id] = nil

if err := seg.Close(); err != nil {
Expand All @@ -163,7 +163,7 @@ func (dl *datalog) removeSegment(seg *segment) error {
return nil
}

func (dl *datalog) readKeyValue(sl slot) ([]byte, []byte, error) {
func (dl *datalog[K, V]) readKeyValue(sl slot) ([]byte, []byte, error) {
off := int64(sl.offset) + 6 // Skip key size and value size.
seg := dl.segments[sl.segmentID]
keyValue, err := seg.Slice(off, off+int64(sl.kvSize()))
Expand All @@ -173,20 +173,20 @@ func (dl *datalog) readKeyValue(sl slot) ([]byte, []byte, error) {
return keyValue[:sl.keySize], keyValue[sl.keySize:], nil
}

func (dl *datalog) readKey(sl slot) ([]byte, error) {
func (dl *datalog[K, V]) readKey(sl slot) ([]byte, error) {
off := int64(sl.offset) + 6
seg := dl.segments[sl.segmentID]
return seg.Slice(off, off+int64(sl.keySize))
}

// trackDel updates segment's metadata for deleted or overwritten items.
func (dl *datalog) trackDel(sl slot) {
func (dl *datalog[K, V]) trackDel(sl slot) {
meta := dl.segments[sl.segmentID].meta
meta.DeletedKeys++
meta.DeletedBytes += encodedRecordSize(sl.kvSize())
}

func (dl *datalog) del(key []byte) error {
func (dl *datalog[K, V]) del(key K) error {
rec := encodeDeleteRecord(key)
_, _, err := dl.writeRecord(rec, recordTypeDelete)
if err != nil {
Expand All @@ -197,7 +197,7 @@ func (dl *datalog) del(key []byte) error {
return nil
}

func (dl *datalog) writeRecord(data []byte, rt recordType) (uint16, uint32, error) {
func (dl *datalog[K, V]) writeRecord(data []byte, rt recordType) (uint16, uint32, error) {
if dl.curSeg.meta.Full || dl.curSeg.size+int64(len(data)) > int64(dl.opts.maxSegmentSize) {
// Current segment is full, create a new one.
dl.curSeg.meta.Full = true
Expand All @@ -218,15 +218,15 @@ func (dl *datalog) writeRecord(data []byte, rt recordType) (uint16, uint32, erro
return dl.curSeg.id, uint32(off), nil
}

func (dl *datalog) put(key []byte, value []byte) (uint16, uint32, error) {
func (dl *datalog[K, V]) put(key K, value V) (uint16, uint32, error) {
return dl.writeRecord(encodePutRecord(key, value), recordTypePut)
}

func (dl *datalog) sync() error {
func (dl *datalog[K, V]) sync() error {
return dl.curSeg.Sync()
}

func (dl *datalog) close() error {
func (dl *datalog[K, V]) close() error {
for _, seg := range dl.segments {
if seg == nil {
continue
Expand All @@ -243,7 +243,7 @@ func (dl *datalog) close() error {
}

// segmentsBySequenceID returns segments ordered from oldest to newest.
func (dl *datalog) segmentsBySequenceID() []*segment {
func (dl *datalog[K, V]) segmentsBySequenceID() []*segment {
var segments []*segment

for _, seg := range dl.segments {
Expand Down
2 changes: 1 addition & 1 deletion datalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/akrylysov/pogreb/internal/assert"
)

func (dl *datalog) segmentMetas() []segmentMeta {
func (dl *datalog[K, V]) segmentMetas() []segmentMeta {
var metas []segmentMeta
for _, seg := range dl.segmentsBySequenceID() {
metas = append(metas, *seg.meta)
Expand Down
Loading