Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions extensions/tn_vacuum/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,22 +235,19 @@ func (e *Extension) processRun(ctx context.Context, req runRequest) {
PgRepackJobs: req.PgRepackJobs,
})

if err != nil {
logger.Warn("vacuum run failed", "error", err, "height", req.height, "reason", req.reason)
e.mu.Lock()
e.runInProgress = false
e.mu.Unlock()
return
}

// Advance scheduling state regardless of run outcome. A transient mechanism
// error (pg_repack version mismatch, lock-conflict timeout, missing binary,
// etc.) must not keep maybeRun queuing a fresh attempt on every committed
// block — the success/failure distinction lives in the metric, not the
// schedule.
newState := runState{LastRunHeight: req.height}
if nowFn != nil {
newState.LastRunAt = nowFn().UTC()
}

if store != nil {
if err := store.Save(runCtx, newState); err != nil {
logger.Warn("failed to persist tn_vacuum state", "error", err)
if saveErr := store.Save(runCtx, newState); saveErr != nil {
logger.Warn("failed to persist tn_vacuum state", "error", saveErr)
}
}
if metricsRecorder != nil {
Expand All @@ -263,6 +260,10 @@ func (e *Extension) processRun(ctx context.Context, req runRequest) {
}
e.runInProgress = false
e.mu.Unlock()

if err != nil {
logger.Warn("vacuum run failed", "error", err, "height", req.height, "reason", req.reason)
}
}

// enqueueRun places a run request on the worker queue if no job is already
Expand Down
58 changes: 58 additions & 0 deletions extensions/tn_vacuum/vacuum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,64 @@ func TestMaybeRunRecordsErrorOnce(t *testing.T) {
require.Equal(t, "error_run", errorSnapshot.lastErrorMechanism)
}

// TestFailedRunStillAdvancesState confirms that a mechanism error does not
// pin the schedule to the last successful height. Without this, maybeRun
// would observe LastRunHeight stuck at 0 and enqueue a fresh attempt on every
// committed block — the regression behind INCIDENT_2026-05-13_pg_repack_lockup.
func TestFailedRunStillAdvancesState(t *testing.T) {
ctx := context.Background()
ResetForTest()

setMechanismFactoryForTest(func() Mechanism { return &errorRunMechanism{} })
defer resetMechanismFactory()

store := &stubStateStore{}

svc := &common.Service{
Logger: log.New(),
LocalConfig: &config.Config{
DB: config.DBConfig{DBName: "kwild_test"},
Extensions: map[string]map[string]string{
ExtensionName: {
ConfigKeyEnabled: "true",
ConfigKeyBlockInterval: "1",
},
},
},
}

app := &common.App{Service: svc}

ext := GetExtension()
ext.setLogger(log.New())
ext.setStateStore(store)

now := time.Unix(200, 0)
ext.setNowFunc(func() time.Time { return now })

require.NoError(t, engineReadyHook(ctx, app))

metricsStub := &stubMetricsRecorder{}
ext.mu.Lock()
ext.metrics = metricsStub
ext.mu.Unlock()

require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 5}))

waitForCondition(t, time.Second, func() bool { return metricsStub.snapshot().errorCount == 1 })
waitForCondition(t, time.Second, func() bool { return store.saveCountValue() == 1 })

savedState := store.lastSavedState()
require.Equal(t, int64(5), savedState.LastRunHeight)
require.Equal(t, now.UTC(), savedState.LastRunAt)

ext.mu.RLock()
require.Equal(t, int64(5), ext.state.LastRunHeight)
ext.mu.RUnlock()

require.Equal(t, int64(5), metricsStub.snapshot().lastHeight)
}

func TestEnqueueRunBusy(t *testing.T) {
ctx := context.Background()
ResetForTest()
Expand Down
Loading