Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions extensions/tn_vacuum/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,22 +332,42 @@ func (e *Extension) maybeRun(ctx context.Context, blockHeight int64) {
svc := e.service
metricsRecorder := e.metrics
nowFn := e.now
store := e.stateStore
e.mu.RUnlock()

if !cfg.Enabled || mech == nil || runner == nil {
return
}

if state.LastRunHeight != 0 {
if blockHeight <= state.LastRunHeight {
return
if state.LastRunHeight == 0 {
seeded := runState{LastRunHeight: blockHeight}
if nowFn != nil {
seeded.LastRunAt = nowFn().UTC()
}
if blockHeight-state.LastRunHeight < cfg.BlockInterval {
if metricsRecorder != nil {
metricsRecorder.RecordVacuumSkipped(ctx, "block_interval_not_met")
if store != nil {
if saveErr := store.Save(ctx, seeded); saveErr != nil {
logger.Warn("failed to persist tn_vacuum seed state", "error", saveErr)
}
return
}
e.mu.Lock()
if blockHeight > e.state.LastRunHeight {
e.state = seeded
}
e.mu.Unlock()
if metricsRecorder != nil {
metricsRecorder.RecordVacuumSkipped(ctx, "first_block_seed")
}
return
}

if blockHeight <= state.LastRunHeight {
return
}
if blockHeight-state.LastRunHeight < cfg.BlockInterval {
if metricsRecorder != nil {
metricsRecorder.RecordVacuumSkipped(ctx, "block_interval_not_met")
}
return
}

reason := fmt.Sprintf("block_interval:%d", blockHeight)
Expand Down
112 changes: 92 additions & 20 deletions extensions/tn_vacuum/vacuum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,25 +217,25 @@ func TestEngineReadyPreparesMechanism(t *testing.T) {
}

ext := GetExtension()
ext.setStateStore(&stubStateStore{})
ext.setStateStore(&stubStateStore{loadState: runState{LastRunHeight: 1}, loadOK: true})

app := &common.App{Service: svc}
require.NoError(t, engineReadyHook(ctx, app))
require.Equal(t, 1, stub.preparedCount())

block := &common.BlockContext{Height: 1}
block := &common.BlockContext{Height: 4}
require.NoError(t, endBlockHook(ctx, app, block))
waitForRunCount(t, stub, 1)
firstRun, ok := stub.runAt(0)
require.True(t, ok)
require.Equal(t, "block_interval:1", firstRun.Reason)
require.Equal(t, "block_interval:4", firstRun.Reason)
require.Equal(t, 2, firstRun.PgRepackJobs)

require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 2}))
require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 5}))
time.Sleep(50 * time.Millisecond)
require.Len(t, stub.runsSnapshot(), 1)

require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 4}))
require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 7}))
waitForRunCount(t, stub, 2)
}

Expand Down Expand Up @@ -279,12 +279,12 @@ func TestRunReportEnhancement(t *testing.T) {
}

ext := GetExtension()
ext.setStateStore(&stubStateStore{})
ext.setStateStore(&stubStateStore{loadState: runState{LastRunHeight: 1}, loadOK: true})

app := &common.App{Service: svc}
require.NoError(t, engineReadyHook(ctx, app))

block := &common.BlockContext{Height: 1}
block := &common.BlockContext{Height: 2}
require.NoError(t, endBlockHook(ctx, app, block))

waitForRunCount(t, stub, 1)
Expand Down Expand Up @@ -324,24 +324,24 @@ func TestVacuumSkippedMetrics(t *testing.T) {
}

ext := GetExtension()
ext.setStateStore(&stubStateStore{})
ext.setStateStore(&stubStateStore{loadState: runState{LastRunHeight: 1}, loadOK: true})

app := &common.App{Service: svc}
require.NoError(t, engineReadyHook(ctx, app))

// First run at height 1
block := &common.BlockContext{Height: 1}
// Should run at height 11 (interval met from seeded height 1)
block := &common.BlockContext{Height: 11}
require.NoError(t, endBlockHook(ctx, app, block))
waitForRunCount(t, stub, 1)

// Should be skipped at height 5 (interval not met)
block = &common.BlockContext{Height: 5}
// Should be skipped at height 15 (interval not met from last run at 11)
block = &common.BlockContext{Height: 15}
require.NoError(t, endBlockHook(ctx, app, block))
time.Sleep(50 * time.Millisecond)
require.Len(t, stub.runsSnapshot(), 1, "should not run - interval not met")

// Should run at height 11 (interval met)
block = &common.BlockContext{Height: 11}
// Should run at height 21 (interval met)
block = &common.BlockContext{Height: 21}
require.NoError(t, endBlockHook(ctx, app, block))
waitForRunCount(t, stub, 2)
}
Expand Down Expand Up @@ -409,7 +409,7 @@ func TestSuccessfulRunPersistsState(t *testing.T) {
setMechanismFactoryForTest(func() Mechanism { return stub })
defer resetMechanismFactory()

store := &stubStateStore{}
store := &stubStateStore{loadState: runState{LastRunHeight: 1}, loadOK: true}

svc := &common.Service{
Logger: log.New(),
Expand Down Expand Up @@ -497,7 +497,7 @@ func TestMaybeRunRecordsErrorOnce(t *testing.T) {
}

ext := GetExtension()
ext.setStateStore(&stubStateStore{})
ext.setStateStore(&stubStateStore{loadState: runState{LastRunHeight: 1}, loadOK: true})

app := &common.App{Service: svc}
require.NoError(t, engineReadyHook(ctx, app))
Expand All @@ -507,7 +507,7 @@ func TestMaybeRunRecordsErrorOnce(t *testing.T) {
ext.metrics = metricsStub
ext.mu.Unlock()

require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 1}))
require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 2}))
waitForCondition(t, time.Second, func() bool { return metricsStub.snapshot().errorCount == 1 })
errorSnapshot := metricsStub.snapshot()
require.Equal(t, 1, errorSnapshot.startCount)
Expand All @@ -525,7 +525,7 @@ func TestFailedRunStillAdvancesState(t *testing.T) {
setMechanismFactoryForTest(func() Mechanism { return &errorRunMechanism{} })
defer resetMechanismFactory()

store := &stubStateStore{}
store := &stubStateStore{loadState: runState{LastRunHeight: 1}, loadOK: true}

svc := &common.Service{
Logger: log.New(),
Expand Down Expand Up @@ -572,6 +572,74 @@ func TestFailedRunStillAdvancesState(t *testing.T) {
require.Equal(t, int64(5), metricsStub.snapshot().lastHeight)
}

func TestFirstBlockSeedsSchedule(t *testing.T) {
ctx := context.Background()
ResetForTest()

stub := &stubMechanism{}
setMechanismFactoryForTest(func() Mechanism { return stub })
defer resetMechanismFactory()

store := &stubStateStore{}

svc := &common.Service{
Logger: log.New(),
LocalConfig: &config.Config{
DB: config.DBConfig{DBName: "kwild_test"},
Extensions: map[string]map[string]string{
ExtensionName: {
ConfigKeyEnabled: "true",
ConfigKeyBlockInterval: "10",
},
},
},
}

app := &common.App{Service: svc}

ext := GetExtension()
ext.setLogger(log.New())
ext.setStateStore(store)

now := time.Unix(500, 0)
ext.setNowFunc(func() time.Time { return now })

require.NoError(t, engineReadyHook(ctx, app))

metricsStub := &stubMetricsRecorder{}
ext.mu.Lock()
ext.metrics = metricsStub
ext.mu.Unlock()

// First end-block on a fresh node seeds the schedule, does not run.
require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 100}))
waitForCondition(t, time.Second, func() bool {
return metricsStub.snapshot().lastSkipReason == "first_block_seed"
})
require.Equal(t, 0, stub.runCount(), "first block must not run pg_repack")
require.Equal(t, 1, store.saveCountValue(), "seed must persist")
require.Equal(t, int64(100), store.lastSavedState().LastRunHeight)
require.Equal(t, now.UTC(), store.lastSavedState().LastRunAt)

ext.mu.RLock()
require.Equal(t, int64(100), ext.state.LastRunHeight)
ext.mu.RUnlock()

// Subsequent block within block_interval is skipped via the normal gate.
require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 105}))
waitForCondition(t, time.Second, func() bool {
return metricsStub.snapshot().lastSkipReason == "block_interval_not_met"
})
require.Equal(t, 0, stub.runCount())

// Block at seed + block_interval fires the first real run.
require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 110}))
waitForRunCount(t, stub, 1)
firstRun, ok := stub.runAt(0)
require.True(t, ok)
require.Equal(t, "block_interval:110", firstRun.Reason)
}

func TestEnqueueRunBusy(t *testing.T) {
ctx := context.Background()
ResetForTest()
Expand Down Expand Up @@ -738,14 +806,16 @@ func TestVacuumSkipsDuringCatchup(t *testing.T) {
ext.setStateStore(&stubStateStore{loadOK: false})

// Manually set up the extension without calling configure
// to avoid pg_repack dependency
// to avoid pg_repack dependency. Pre-seed state so the first non-sync
// block triggers a run instead of just anchoring the schedule.
ext.mu.Lock()
ext.config = Config{
Enabled: true,
BlockInterval: 100,
}
ext.mechanism = stub
ext.runner = &Runner{logger: svc.Logger}
ext.state = runState{LastRunHeight: 1}
ext.mu.Unlock()

metricsStub := &stubMetricsRecorder{}
Expand Down Expand Up @@ -831,14 +901,16 @@ func TestVacuumResumesAfterCatchup(t *testing.T) {
ext.setStateStore(&stubStateStore{loadOK: false})

// Manually set up the extension without calling configure
// to avoid pg_repack dependency
// to avoid pg_repack dependency. Pre-seed state so the first post-sync
// block triggers a run instead of just anchoring the schedule.
ext.mu.Lock()
ext.config = Config{
Enabled: true,
BlockInterval: 50,
}
ext.mechanism = stub
ext.runner = &Runner{logger: svc.Logger}
ext.state = runState{LastRunHeight: 1}
ext.mu.Unlock()

app := &common.App{
Expand Down
Loading