diff --git a/extensions/tn_vacuum/extension.go b/extensions/tn_vacuum/extension.go index 9671f38d..1dca0b9c 100644 --- a/extensions/tn_vacuum/extension.go +++ b/extensions/tn_vacuum/extension.go @@ -332,22 +332,42 @@ func (e *Extension) maybeRun(ctx context.Context, blockHeight int64) { svc := e.service metricsRecorder := e.metrics nowFn := e.now + store := e.stateStore e.mu.RUnlock() if !cfg.Enabled || mech == nil || runner == nil { return } - if state.LastRunHeight != 0 { - if blockHeight <= state.LastRunHeight { - return + if state.LastRunHeight == 0 { + seeded := runState{LastRunHeight: blockHeight} + if nowFn != nil { + seeded.LastRunAt = nowFn().UTC() } - if blockHeight-state.LastRunHeight < cfg.BlockInterval { - if metricsRecorder != nil { - metricsRecorder.RecordVacuumSkipped(ctx, "block_interval_not_met") + if store != nil { + if saveErr := store.Save(ctx, seeded); saveErr != nil { + logger.Warn("failed to persist tn_vacuum seed state", "error", saveErr) } - return } + e.mu.Lock() + if blockHeight > e.state.LastRunHeight { + e.state = seeded + } + e.mu.Unlock() + if metricsRecorder != nil { + metricsRecorder.RecordVacuumSkipped(ctx, "first_block_seed") + } + return + } + + if blockHeight <= state.LastRunHeight { + return + } + if blockHeight-state.LastRunHeight < cfg.BlockInterval { + if metricsRecorder != nil { + metricsRecorder.RecordVacuumSkipped(ctx, "block_interval_not_met") + } + return } reason := fmt.Sprintf("block_interval:%d", blockHeight) diff --git a/extensions/tn_vacuum/vacuum_test.go b/extensions/tn_vacuum/vacuum_test.go index 16c71427..173c9a25 100644 --- a/extensions/tn_vacuum/vacuum_test.go +++ b/extensions/tn_vacuum/vacuum_test.go @@ -217,25 +217,25 @@ func TestEngineReadyPreparesMechanism(t *testing.T) { } ext := GetExtension() - ext.setStateStore(&stubStateStore{}) + ext.setStateStore(&stubStateStore{loadState: runState{LastRunHeight: 1}, loadOK: true}) app := &common.App{Service: svc} require.NoError(t, engineReadyHook(ctx, app)) require.Equal(t, 1, stub.preparedCount()) - block := &common.BlockContext{Height: 1} + block := &common.BlockContext{Height: 4} require.NoError(t, endBlockHook(ctx, app, block)) waitForRunCount(t, stub, 1) firstRun, ok := stub.runAt(0) require.True(t, ok) - require.Equal(t, "block_interval:1", firstRun.Reason) + require.Equal(t, "block_interval:4", firstRun.Reason) require.Equal(t, 2, firstRun.PgRepackJobs) - require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 2})) + require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 5})) time.Sleep(50 * time.Millisecond) require.Len(t, stub.runsSnapshot(), 1) - require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 4})) + require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 7})) waitForRunCount(t, stub, 2) } @@ -279,12 +279,12 @@ func TestRunReportEnhancement(t *testing.T) { } ext := GetExtension() - ext.setStateStore(&stubStateStore{}) + ext.setStateStore(&stubStateStore{loadState: runState{LastRunHeight: 1}, loadOK: true}) app := &common.App{Service: svc} require.NoError(t, engineReadyHook(ctx, app)) - block := &common.BlockContext{Height: 1} + block := &common.BlockContext{Height: 2} require.NoError(t, endBlockHook(ctx, app, block)) waitForRunCount(t, stub, 1) @@ -324,24 +324,24 @@ func TestVacuumSkippedMetrics(t *testing.T) { } ext := GetExtension() - ext.setStateStore(&stubStateStore{}) + ext.setStateStore(&stubStateStore{loadState: runState{LastRunHeight: 1}, loadOK: true}) app := &common.App{Service: svc} require.NoError(t, engineReadyHook(ctx, app)) - // First run at height 1 - block := &common.BlockContext{Height: 1} + // Should run at height 11 (interval met from seeded height 1) + block := &common.BlockContext{Height: 11} require.NoError(t, endBlockHook(ctx, app, block)) waitForRunCount(t, stub, 1) - // Should be skipped at height 5 (interval not met) - block = &common.BlockContext{Height: 5} + // Should be skipped at height 15 (interval not met from last run at 11) + block = &common.BlockContext{Height: 15} require.NoError(t, endBlockHook(ctx, app, block)) time.Sleep(50 * time.Millisecond) require.Len(t, stub.runsSnapshot(), 1, "should not run - interval not met") - // Should run at height 11 (interval met) - block = &common.BlockContext{Height: 11} + // Should run at height 21 (interval met) + block = &common.BlockContext{Height: 21} require.NoError(t, endBlockHook(ctx, app, block)) waitForRunCount(t, stub, 2) } @@ -409,7 +409,7 @@ func TestSuccessfulRunPersistsState(t *testing.T) { setMechanismFactoryForTest(func() Mechanism { return stub }) defer resetMechanismFactory() - store := &stubStateStore{} + store := &stubStateStore{loadState: runState{LastRunHeight: 1}, loadOK: true} svc := &common.Service{ Logger: log.New(), @@ -497,7 +497,7 @@ func TestMaybeRunRecordsErrorOnce(t *testing.T) { } ext := GetExtension() - ext.setStateStore(&stubStateStore{}) + ext.setStateStore(&stubStateStore{loadState: runState{LastRunHeight: 1}, loadOK: true}) app := &common.App{Service: svc} require.NoError(t, engineReadyHook(ctx, app)) @@ -507,7 +507,7 @@ func TestMaybeRunRecordsErrorOnce(t *testing.T) { ext.metrics = metricsStub ext.mu.Unlock() - require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 1})) + require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 2})) waitForCondition(t, time.Second, func() bool { return metricsStub.snapshot().errorCount == 1 }) errorSnapshot := metricsStub.snapshot() require.Equal(t, 1, errorSnapshot.startCount) @@ -525,7 +525,7 @@ func TestFailedRunStillAdvancesState(t *testing.T) { setMechanismFactoryForTest(func() Mechanism { return &errorRunMechanism{} }) defer resetMechanismFactory() - store := &stubStateStore{} + store := &stubStateStore{loadState: runState{LastRunHeight: 1}, loadOK: true} svc := &common.Service{ Logger: log.New(), @@ -572,6 +572,74 @@ func TestFailedRunStillAdvancesState(t *testing.T) { require.Equal(t, int64(5), metricsStub.snapshot().lastHeight) } +func TestFirstBlockSeedsSchedule(t *testing.T) { + ctx := context.Background() + ResetForTest() + + stub := &stubMechanism{} + setMechanismFactoryForTest(func() Mechanism { return stub }) + defer resetMechanismFactory() + + store := &stubStateStore{} + + svc := &common.Service{ + Logger: log.New(), + LocalConfig: &config.Config{ + DB: config.DBConfig{DBName: "kwild_test"}, + Extensions: map[string]map[string]string{ + ExtensionName: { + ConfigKeyEnabled: "true", + ConfigKeyBlockInterval: "10", + }, + }, + }, + } + + app := &common.App{Service: svc} + + ext := GetExtension() + ext.setLogger(log.New()) + ext.setStateStore(store) + + now := time.Unix(500, 0) + ext.setNowFunc(func() time.Time { return now }) + + require.NoError(t, engineReadyHook(ctx, app)) + + metricsStub := &stubMetricsRecorder{} + ext.mu.Lock() + ext.metrics = metricsStub + ext.mu.Unlock() + + // First end-block on a fresh node seeds the schedule, does not run. + require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 100})) + waitForCondition(t, time.Second, func() bool { + return metricsStub.snapshot().lastSkipReason == "first_block_seed" + }) + require.Equal(t, 0, stub.runCount(), "first block must not run pg_repack") + require.Equal(t, 1, store.saveCountValue(), "seed must persist") + require.Equal(t, int64(100), store.lastSavedState().LastRunHeight) + require.Equal(t, now.UTC(), store.lastSavedState().LastRunAt) + + ext.mu.RLock() + require.Equal(t, int64(100), ext.state.LastRunHeight) + ext.mu.RUnlock() + + // Subsequent block within block_interval is skipped via the normal gate. + require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 105})) + waitForCondition(t, time.Second, func() bool { + return metricsStub.snapshot().lastSkipReason == "block_interval_not_met" + }) + require.Equal(t, 0, stub.runCount()) + + // Block at seed + block_interval fires the first real run. + require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 110})) + waitForRunCount(t, stub, 1) + firstRun, ok := stub.runAt(0) + require.True(t, ok) + require.Equal(t, "block_interval:110", firstRun.Reason) +} + func TestEnqueueRunBusy(t *testing.T) { ctx := context.Background() ResetForTest() @@ -738,7 +806,8 @@ func TestVacuumSkipsDuringCatchup(t *testing.T) { ext.setStateStore(&stubStateStore{loadOK: false}) // Manually set up the extension without calling configure - // to avoid pg_repack dependency + // to avoid pg_repack dependency. Pre-seed state so the first non-sync + // block triggers a run instead of just anchoring the schedule. ext.mu.Lock() ext.config = Config{ Enabled: true, @@ -746,6 +815,7 @@ func TestVacuumSkipsDuringCatchup(t *testing.T) { } ext.mechanism = stub ext.runner = &Runner{logger: svc.Logger} + ext.state = runState{LastRunHeight: 1} ext.mu.Unlock() metricsStub := &stubMetricsRecorder{} @@ -831,7 +901,8 @@ func TestVacuumResumesAfterCatchup(t *testing.T) { ext.setStateStore(&stubStateStore{loadOK: false}) // Manually set up the extension without calling configure - // to avoid pg_repack dependency + // to avoid pg_repack dependency. Pre-seed state so the first post-sync + // block triggers a run instead of just anchoring the schedule. ext.mu.Lock() ext.config = Config{ Enabled: true, @@ -839,6 +910,7 @@ func TestVacuumResumesAfterCatchup(t *testing.T) { } ext.mechanism = stub ext.runner = &Runner{logger: svc.Logger} + ext.state = runState{LastRunHeight: 1} ext.mu.Unlock() app := &common.App{