Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions storage/posix/antispam/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ import (
)

const (
DefaultMaxBatchSize = 1500
DefaultPushbackThreshold = 2048
DefaultMaxBatchSize = 20000
DefaultPushbackThreshold = 2048
DefaultCompactionInterval = 15 * time.Minute

// defaultBatchTimeout is the max permitted duration for a single "chunk" of antispam updates.
defaultBatchTimeout = 10 * time.Second
Expand All @@ -56,10 +57,6 @@ type AntispamOpts struct {
//
// Larger batches can enable (up to a point) higher throughput, but care should be taken not to
// overload the Spanner instance.
//
// During testing, we've found that 1500 appears to offer maximum throughput when using Spanner instances
// with 300 or more PU. Smaller deployments (e.g. 100 PU) will likely perform better with smaller batch
// sizes of around 64.
MaxBatchSize uint

// PushbackThreshold allows configuration of when to start responding to Add requests with pushback due to
Expand All @@ -68,6 +65,15 @@ type AntispamOpts struct {
// When the antispam follower is at least this many entries behind the size of the locally integrated tree,
// the antispam decorator will return tessera.ErrPushback for every Add request.
PushbackThreshold uint

// CompactionInterval is the duration between calls to RunValueLogGC() on the underlying Badger DB.
CompactionInterval time.Duration

// BadgerOptions is an optional func which can apply arbitrary options to the config of the underlying Badger DB.
//
// If supplied, this function will be called as the final step in configuring BadgerDB options, potentially allowing
// antispam-configured options to be overridden; use with caution!
BadgerOptions func(opts badger.Options) badger.Options
Comment thread
AlCutter marked this conversation as resolved.
}

// NewAntispam returns an antispam driver which uses Badger to maintain a mapping between
Expand All @@ -84,8 +90,14 @@ func NewAntispam(ctx context.Context, badgerPath string, opts AntispamOpts) (*An
if opts.PushbackThreshold == 0 {
opts.PushbackThreshold = DefaultPushbackThreshold
}
if opts.CompactionInterval == 0 {
opts.CompactionInterval = DefaultCompactionInterval
}

bOpts := badger.DefaultOptions(badgerPath).WithLogger(&slogger{})
if opts.BadgerOptions != nil {
bOpts = opts.BadgerOptions(bOpts)
}
// Open the Badger database located at badgerPath, it will be created if it doesn't exist.
db, err := badger.Open(bOpts)
if err != nil {
Expand All @@ -100,7 +112,7 @@ func NewAntispam(ctx context.Context, badgerPath string, opts AntispamOpts) (*An
go func() {
// runsInTick tracks the number of GC runs per tick.
var runsInTick int64
ticker := time.NewTicker(1 * time.Minute)
ticker := time.NewTicker(opts.CompactionInterval)
defer ticker.Stop()
for {
select {
Expand Down Expand Up @@ -381,7 +393,7 @@ func (f *follower) Follow(ctx context.Context, lr tessera.LogReader) {
return fmt.Errorf("failed to update follower state: %v", err)
}

// Microseconds / 1000.0 and not milliseconds to record sub-millisecond durations.
// Microseconds / 1000.0 and not milliseconds to record sub-millisecond durations.
followTxnDuration.Record(ctx, float64(time.Since(batchStart).Microseconds())/1000.0)
followTxnEntriesCounter.Record(ctx, int64(numAdded))

Expand Down
Loading