diff --git a/extensions/tn_vacuum/extension.go b/extensions/tn_vacuum/extension.go index 7faa4aaf..9671f38d 100644 --- a/extensions/tn_vacuum/extension.go +++ b/extensions/tn_vacuum/extension.go @@ -235,22 +235,19 @@ func (e *Extension) processRun(ctx context.Context, req runRequest) { PgRepackJobs: req.PgRepackJobs, }) - if err != nil { - logger.Warn("vacuum run failed", "error", err, "height", req.height, "reason", req.reason) - e.mu.Lock() - e.runInProgress = false - e.mu.Unlock() - return - } - + // Advance scheduling state regardless of run outcome. A transient mechanism + // error (pg_repack version mismatch, lock-conflict timeout, missing binary, + // etc.) must not keep maybeRun queuing a fresh attempt on every committed + // block — the success/failure distinction lives in the metric, not the + // schedule. newState := runState{LastRunHeight: req.height} if nowFn != nil { newState.LastRunAt = nowFn().UTC() } if store != nil { - if err := store.Save(runCtx, newState); err != nil { - logger.Warn("failed to persist tn_vacuum state", "error", err) + if saveErr := store.Save(runCtx, newState); saveErr != nil { + logger.Warn("failed to persist tn_vacuum state", "error", saveErr) } } if metricsRecorder != nil { @@ -263,6 +260,10 @@ func (e *Extension) processRun(ctx context.Context, req runRequest) { } e.runInProgress = false e.mu.Unlock() + + if err != nil { + logger.Warn("vacuum run failed", "error", err, "height", req.height, "reason", req.reason) + } } // enqueueRun places a run request on the worker queue if no job is already diff --git a/extensions/tn_vacuum/vacuum_test.go b/extensions/tn_vacuum/vacuum_test.go index 3f466ccf..16c71427 100644 --- a/extensions/tn_vacuum/vacuum_test.go +++ b/extensions/tn_vacuum/vacuum_test.go @@ -514,6 +514,64 @@ func TestMaybeRunRecordsErrorOnce(t *testing.T) { require.Equal(t, "error_run", errorSnapshot.lastErrorMechanism) } +// TestFailedRunStillAdvancesState confirms that a mechanism error does not +// pin the schedule to the last successful height. Without this, maybeRun +// would observe LastRunHeight stuck at 0 and enqueue a fresh attempt on every +// committed block — the regression behind INCIDENT_2026-05-13_pg_repack_lockup. +func TestFailedRunStillAdvancesState(t *testing.T) { + ctx := context.Background() + ResetForTest() + + setMechanismFactoryForTest(func() Mechanism { return &errorRunMechanism{} }) + defer resetMechanismFactory() + + store := &stubStateStore{} + + svc := &common.Service{ + Logger: log.New(), + LocalConfig: &config.Config{ + DB: config.DBConfig{DBName: "kwild_test"}, + Extensions: map[string]map[string]string{ + ExtensionName: { + ConfigKeyEnabled: "true", + ConfigKeyBlockInterval: "1", + }, + }, + }, + } + + app := &common.App{Service: svc} + + ext := GetExtension() + ext.setLogger(log.New()) + ext.setStateStore(store) + + now := time.Unix(200, 0) + ext.setNowFunc(func() time.Time { return now }) + + require.NoError(t, engineReadyHook(ctx, app)) + + metricsStub := &stubMetricsRecorder{} + ext.mu.Lock() + ext.metrics = metricsStub + ext.mu.Unlock() + + require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 5})) + + waitForCondition(t, time.Second, func() bool { return metricsStub.snapshot().errorCount == 1 }) + waitForCondition(t, time.Second, func() bool { return store.saveCountValue() == 1 }) + + savedState := store.lastSavedState() + require.Equal(t, int64(5), savedState.LastRunHeight) + require.Equal(t, now.UTC(), savedState.LastRunAt) + + ext.mu.RLock() + require.Equal(t, int64(5), ext.state.LastRunHeight) + ext.mu.RUnlock() + + require.Equal(t, int64(5), metricsStub.snapshot().lastHeight) +} + func TestEnqueueRunBusy(t *testing.T) { ctx := context.Background() ResetForTest()