diff --git a/container/cmd/ghfe/staging_proxy_test.go b/container/cmd/ghfe/staging_proxy_test.go index 04d0203..aebf106 100644 --- a/container/cmd/ghfe/staging_proxy_test.go +++ b/container/cmd/ghfe/staging_proxy_test.go @@ -52,7 +52,7 @@ func stagingPayload() []byte { "installation": map[string]any{"id": float64(1)}, "repository": map[string]any{ "id": float64(2), - "full_name": "riseproject-dev/riscv-runner-sample", + "full_name": "riseproject-dev/riscv-runner-sample-staging", "owner": map[string]any{"id": float64(internal.RiseprojectDevOrgID), "type": "Organization", "login": "riseproject-dev"}, }, "workflow_job": map[string]any{ @@ -172,9 +172,9 @@ func TestShouldProxyToStaging_Negatives(t *testing.T) { ent internal.Entity repo string }{ - {"staging instance", internal.Config{Prod: false, StagingURL: "https://s"}, riseEntity, "riseproject-dev/riscv-runner-sample"}, - {"no staging url", internal.Config{Prod: true}, riseEntity, "riseproject-dev/riscv-runner-sample"}, - {"entity not in config", prod, unknownEntity, "riseproject-dev/riscv-runner-sample"}, + {"staging instance", internal.Config{Prod: false, StagingURL: "https://s"}, riseEntity, "riseproject-dev/riscv-runner-sample-staging"}, + {"no staging url", internal.Config{Prod: true}, riseEntity, "riseproject-dev/riscv-runner-sample-staging"}, + {"entity not in config", prod, unknownEntity, "riseproject-dev/riscv-runner-sample-staging"}, {"repo not in entity staging list", prod, riseEntity, "riseproject-dev/something-else"}, } for _, tc := range cases { @@ -183,7 +183,7 @@ func TestShouldProxyToStaging_Negatives(t *testing.T) { } } - if !shouldProxyToStaging(prod, riseEntity, "riseproject-dev/riscv-runner-sample") { + if !shouldProxyToStaging(prod, riseEntity, "riseproject-dev/riscv-runner-sample-staging") { t.Error("positive case: prod + sample repo should proxy") } } diff --git a/container/cmd/scheduler/demand_match.go b/container/cmd/scheduler/demand_match.go index 93e8e16..b04fd52 100644 --- a/container/cmd/scheduler/demand_match.go +++ b/container/cmd/scheduler/demand_match.go @@ -113,7 +113,7 @@ func (a *App) tryProvision(ctx context.Context, j internal.Job) bool { if err := a.provisionRunner(ctx, j, runnerName, labels); err != nil { slog.Error("Failed to provision runner", "entity", e, "runner_name", runnerName, "k8s_pool", j.K8sPool, "err", err) - info := internal.FailureInfo{Version: 2, Reason: internal.ReasonPodAllocationFailure} + info := internal.FailureInfoV2{Reason: internal.ReasonPodAllocationFailure} _ = a.DB.MarkWorkerFailed(ctx, runnerName, "", info, nil) // Row was created, slot is consumed. return true diff --git a/container/cmd/scheduler/demand_match_test.go b/container/cmd/scheduler/demand_match_test.go index 0b37223..2eca921 100644 --- a/container/cmd/scheduler/demand_match_test.go +++ b/container/cmd/scheduler/demand_match_test.go @@ -94,8 +94,8 @@ func TestProvisionRunner_FailureMarksWorker(t *testing.T) { if len(db.MarkFailed) != 1 { t.Fatalf("expected 1 mark_worker_failed call, got %v", db.MarkFailed) } - if db.MarkFailed[0].Info.Reason != internal.ReasonPodAllocationFailure { - t.Errorf("failure_info.reason=%q want pod_allocation_failure", db.MarkFailed[0].Info.Reason) + if db.MarkFailed[0].Info.(internal.FailureInfoV2).Reason != internal.ReasonPodAllocationFailure { + t.Errorf("failure_info.reason=%q want pod_allocation_failure", db.MarkFailed[0].Info.(internal.FailureInfoV2).Reason) } } diff --git a/container/cmd/scheduler/sync_jobs.go b/container/cmd/scheduler/sync_jobs.go index 16e36d7..d7b404f 100644 --- a/container/cmd/scheduler/sync_jobs.go +++ b/container/cmd/scheduler/sync_jobs.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "time" "github.com/riseproject-dev/riscv-runner-app/container/internal" ) @@ -42,10 +43,9 @@ func (a *App) syncOneJob(ctx context.Context, j internal.Job) error { if errors.As(err, &apiErr) && apiErr.StatusCode == 404 { slog.Warn("Installation not found, marking job failed", "entity", e, "installation_id", j.InstallationID, "job_id", j.JobID) - _, _ = a.DB.MarkJobFailed(ctx, j.JobID, internal.FailureInfo{ - Version: 1, - Reason: internal.FailureReason(fmt.Sprintf("installation not found for installation_id=%d entity_type=%s", - j.InstallationID, e.Type)), + _, _ = a.DB.MarkJobFailed(ctx, j.JobID, internal.FailureInfoV1{ + Message: fmt.Sprintf("installation not found for installation_id=%d entity_type=%s", + j.InstallationID, e.Type), }) return nil } @@ -59,10 +59,9 @@ func (a *App) syncOneJob(ctx context.Context, j internal.Job) error { var apiErr *internal.GitHubAPIError if errors.As(err, &apiErr) && apiErr.StatusCode == 404 { slog.Warn("Job not found, marking as failed", "entity", e, "job_id", j.JobID) - _, _ = a.DB.MarkJobFailed(ctx, j.JobID, internal.FailureInfo{ - Version: 1, - Reason: internal.FailureReason(fmt.Sprintf("job not found for job_id=%d entity=%s entity_id=%d entity_type=%s", - j.JobID, e.Name, e.ID, e.Type)), + _, _ = a.DB.MarkJobFailed(ctx, j.JobID, internal.FailureInfoV1{ + Message: fmt.Sprintf("job not found for job_id=%d entity=%s entity_id=%d entity_type=%s", + j.JobID, e.Name, e.ID, e.Type), }) return nil } @@ -86,6 +85,39 @@ func (a *App) syncOneJob(ctx context.Context, j internal.Job) error { "entity", e, "job_id", j.JobID) _, _ = a.DB.MarkJobRunning(ctx, j.JobID, ghJob.RunnerName) } + case "queued": + a.reconcileStuckQueued(ctx, j, ghJob, token) } return nil } + +// reconcileStuckQueued detects jobs that GitHub still reports as queued +// even though their parent workflow run has already terminated. This +// happens when a run is cancelled (or a sibling fails fast-fail-style) +// before scheduling reaches the job; the job then sits queued forever +// and the scheduler would otherwise keep trying to provision a runner +// for a job that will never start. We mark the row failed with the +// run's conclusion so the worker slot frees up. +func (a *App) reconcileStuckQueued(ctx context.Context, j internal.Job, ghJob internal.GHJob, token string) { + if ghJob.RunID == 0 || time.Since(j.CreatedAt) < internal.JobStuckQueuedMinAge { + return + } + run, err := a.GH.GetRunInfo(ctx, token, j.RepoFullName, ghJob.RunID) + if err != nil { + slog.Debug("GetRunInfo failed", "entity", j.Entity(), "job_id", j.JobID, "run_id", ghJob.RunID, "err", err) + return + } + if run.Status != "completed" { + return + } + conclusion := "unknown" + if run.Conclusion != nil && *run.Conclusion != "" { + conclusion = *run.Conclusion + } + slog.Warn("GH reconcile: job stuck queued while run is completed; marking failed", + "entity", j.Entity(), "job_id", j.JobID, "run_id", ghJob.RunID, "run_conclusion", conclusion) + _, _ = a.DB.MarkJobFailed(ctx, j.JobID, internal.FailureInfoV1{ + Message: fmt.Sprintf("workflow run %d completed (conclusion=%s) while job %d stayed queued", + ghJob.RunID, conclusion, j.JobID), + }) +} diff --git a/container/cmd/scheduler/sync_jobs_test.go b/container/cmd/scheduler/sync_jobs_test.go index c17efe1..087314d 100644 --- a/container/cmd/scheduler/sync_jobs_test.go +++ b/container/cmd/scheduler/sync_jobs_test.go @@ -2,7 +2,9 @@ package main import ( "context" + "strings" "testing" + "time" "github.com/riseproject-dev/riscv-runner-app/container/internal" ) @@ -164,6 +166,101 @@ func TestSyncOneJob_InProgressFromPendingPromotesToRunning(t *testing.T) { } } +// TestSyncOneJob_StuckQueuedMarksFailedWhenRunCompleted covers the case +// where GitHub reports a job still queued even though the parent workflow +// run has terminated. The scheduler should detect this and mark the job +// failed so the slot frees up. +func TestSyncOneJob_StuckQueuedMarksFailedWhenRunCompleted(t *testing.T) { + app, db, gh, _ := schedTestApp() + gh.OnAuthenticateApp = func(int64, int64) (string, error) { return "tok", nil } + gh.OnGetJobInfo = func(string, string, int64) (internal.GHJob, error) { + return internal.GHJob{Status: "queued", RunID: 4242}, nil + } + conc := "failure" + gh.OnGetRunInfo = func(_, _ string, runID int64) (internal.GHRun, error) { + if runID != 4242 { + t.Errorf("GetRunInfo called with runID=%d, want 4242", runID) + } + return internal.GHRun{Status: "completed", Conclusion: &conc}, nil + } + var markedMessage string + db.OnMarkJobFailed = func(_ int64, info internal.FailureInfo) (string, error) { + v1, ok := info.(internal.FailureInfoV1) + if !ok { + t.Fatalf("expected FailureInfoV1, got %T", info) + } + markedMessage = v1.Message + return "pending", nil + } + db.Jobs = []internal.Job{{ + JobID: 7, Status: "pending", RepoFullName: "a/r", EntityName: "a", + EntityType: "Organization", InstallationID: 9, + CreatedAt: time.Now().Add(-2 * internal.JobStuckQueuedMinAge), + }} + if err := app.syncJobsState(context.Background()); err != nil { + t.Fatal(err) + } + if markedMessage == "" { + t.Fatal("expected MarkJobFailed to be called") + } + if !strings.Contains(markedMessage, "stayed queued") || !strings.Contains(markedMessage, "failure") { + t.Errorf("message missing expected fragments: %q", markedMessage) + } +} + +// TestSyncOneJob_StuckQueuedSkippedWhenJobYoung verifies we do not burn +// GitHub API quota probing the run on freshly-queued jobs. +func TestSyncOneJob_StuckQueuedSkippedWhenJobYoung(t *testing.T) { + app, db, gh, _ := schedTestApp() + gh.OnAuthenticateApp = func(int64, int64) (string, error) { return "tok", nil } + gh.OnGetJobInfo = func(string, string, int64) (internal.GHJob, error) { + return internal.GHJob{Status: "queued", RunID: 1}, nil + } + runProbed := false + gh.OnGetRunInfo = func(string, string, int64) (internal.GHRun, error) { + runProbed = true + return internal.GHRun{}, nil + } + db.Jobs = []internal.Job{{ + JobID: 1, Status: "pending", RepoFullName: "a/r", EntityName: "a", + EntityType: "Organization", InstallationID: 9, + CreatedAt: time.Now(), // brand new + }} + if err := app.syncJobsState(context.Background()); err != nil { + t.Fatal(err) + } + if runProbed { + t.Errorf("GetRunInfo should not be called for jobs younger than JobStuckQueuedMinAge") + } + if len(db.MarkFailed) != 0 { + t.Errorf("young queued job must not be marked failed") + } +} + +// TestSyncOneJob_StuckQueuedRunStillRunningIsNoop covers the case where +// the run is still in flight — we probe but leave the job alone. +func TestSyncOneJob_StuckQueuedRunStillRunningIsNoop(t *testing.T) { + app, db, gh, _ := schedTestApp() + gh.OnAuthenticateApp = func(int64, int64) (string, error) { return "tok", nil } + gh.OnGetJobInfo = func(string, string, int64) (internal.GHJob, error) { + return internal.GHJob{Status: "queued", RunID: 1}, nil + } + gh.OnGetRunInfo = func(string, string, int64) (internal.GHRun, error) { + return internal.GHRun{Status: "in_progress"}, nil + } + db.Jobs = []internal.Job{{ + JobID: 1, Status: "pending", RepoFullName: "a/r", EntityName: "a", + EntityType: "Organization", InstallationID: 9, + CreatedAt: time.Now().Add(-2 * internal.JobStuckQueuedMinAge), + }} + if err := app.syncJobsState(context.Background()); err != nil { + t.Fatal(err) + } + if len(db.MarkFailed) != 0 { + t.Errorf("queued job with still-running parent run must not be marked failed") + } +} + // TestSyncOneJob_InProgressFromRunningIsNoop covers the no-op branch when DB // already has running. func TestSyncOneJob_InProgressFromRunningIsNoop(t *testing.T) { diff --git a/container/cmd/scheduler/sync_workers_test.go b/container/cmd/scheduler/sync_workers_test.go index 1d625f2..46586e5 100644 --- a/container/cmd/scheduler/sync_workers_test.go +++ b/container/cmd/scheduler/sync_workers_test.go @@ -48,8 +48,8 @@ func TestPhase3_OfflineRunnerPastTimeoutFails(t *testing.T) { if len(db.MarkFailed) != 1 { t.Fatalf("expected MarkWorkerFailed, got %v", db.MarkFailed) } - if db.MarkFailed[0].Info.Reason != internal.ReasonRunnerNeverRegistered { - t.Errorf("reason=%q want runner_never_registered", db.MarkFailed[0].Info.Reason) + if db.MarkFailed[0].Info.(internal.FailureInfoV2).Reason != internal.ReasonRunnerNeverRegistered { + t.Errorf("reason=%q want runner_never_registered", db.MarkFailed[0].Info.(internal.FailureInfoV2).Reason) } if len(kube.KillCalls) != 1 || kube.KillCalls[0] != w.PodName { t.Errorf("expected KillPod for %s, got %v", w.PodName, kube.KillCalls) @@ -75,7 +75,7 @@ func TestPhase3_OnlineIdleRunnerPastTimeoutFails(t *testing.T) { if err := app.syncWorkersState(context.Background()); err != nil { t.Fatalf("syncWorkersState: %v", err) } - if len(db.MarkFailed) != 1 || db.MarkFailed[0].Info.Reason != internal.ReasonRunnerIdle { + if len(db.MarkFailed) != 1 || db.MarkFailed[0].Info.(internal.FailureInfoV2).Reason != internal.ReasonRunnerIdle { t.Fatalf("expected RunnerIdle failure, got %v", db.MarkFailed) } } @@ -132,7 +132,7 @@ func TestPhase2_FailedPodMarksWorkerFailed(t *testing.T) { if err := app.syncWorkersState(context.Background()); err != nil { t.Fatal(err) } - if len(db.MarkFailed) != 1 || db.MarkFailed[0].Info.Reason != internal.ReasonPodFailed { + if len(db.MarkFailed) != 1 || db.MarkFailed[0].Info.(internal.FailureInfoV2).Reason != internal.ReasonPodFailed { t.Errorf("expected MarkWorkerFailed(pod_failed), got %v", db.MarkFailed) } } @@ -152,7 +152,7 @@ func TestPhase3_PendingPastTimeoutFailsWithStuckPending(t *testing.T) { if err := app.syncWorkersState(context.Background()); err != nil { t.Fatal(err) } - if len(db.MarkFailed) != 1 || db.MarkFailed[0].Info.Reason != internal.ReasonPodStuckPending { + if len(db.MarkFailed) != 1 || db.MarkFailed[0].Info.(internal.FailureInfoV2).Reason != internal.ReasonPodStuckPending { t.Errorf("got %v", db.MarkFailed) } } diff --git a/container/internal/constants.go b/container/internal/constants.go index 377907f..e9aca65 100644 --- a/container/internal/constants.go +++ b/container/internal/constants.go @@ -31,6 +31,7 @@ const ( RunnerPendingTimeout = 600 * time.Second // runner registered with GH but never picks up a job PodPendingTimeout = 600 * time.Second // pod stuck Pending (no capacity, image pull, etc.) PodDeleteGrace = 6 * time.Hour // keep terminal pods around so operators can still kubectl logs them + JobStuckQueuedMinAge = 10 * time.Minute // minimum job age before sync_jobs probes the parent run for a stuck-queued condition PollInterval = 15 * time.Second diff --git a/container/internal/contract.go b/container/internal/contract.go index b94b535..0dd2942 100644 --- a/container/internal/contract.go +++ b/container/internal/contract.go @@ -143,15 +143,65 @@ type InstallationEvent struct { Payload json.RawMessage `db:"payload" json:"-"` } -// FailureInfo is the v2 shape written into workers.failure_info / jobs.failure_info. -type FailureInfo struct { - Version int `json:"version"` - Reason FailureReason `json:"reason"` - PodReason string `json:"pod_reason,omitempty"` - PodMessage string `json:"pod_message,omitempty"` - Containers map[string]ContainerInfo `json:"containers,omitempty"` - Events []EventInfo `json:"events,omitempty"` - CollectError string `json:"collect_error,omitempty"` +// FailureInfo is the sealed interface implemented by FailureInfoV1 and +// FailureInfoV2. Two on-disk schemas coexist for workers.failure_info / +// jobs.failure_info, and which one is correct depends on the failure +// mode: +// +// - FailureInfoV1 (non-pod failures): {"version":1,"message":"..."} -- +// a free-form human message with no structured fields. Used by the +// scheduler when the failure isn't a k8s pod outcome (installation +// 404, job missing on GitHub, run completed with a queued job, ...). +// - FailureInfoV2 (pod failures): {"version":2, "reason":, +// "pod_reason":..., "pod_message":..., "containers":..., "events":...} +// populated from CollectPodFailureInfo. Reason is a typed enum value +// (ReasonPodFailed, ReasonPodStuckPending, ...). +// +// DB.Mark{Job,Worker}Failed accept the interface so a caller can pass +// either variant; the on-disk shape is determined by the concrete type. +// Renderers must look at "version" in the parsed JSON first and pick +// the right branch. +type FailureInfo interface { + isFailureInfo() +} + +// FailureInfoV1 is the non-pod failure shape: a free-form message and +// nothing else. Marshals as {"version":1, "message":"..."}. +type FailureInfoV1 struct { + Message string +} + +// FailureInfoV2 is the structured pod-failure shape produced by +// CollectPodFailureInfo. Marshals as {"version":2, "reason":..., ...}. +type FailureInfoV2 struct { + Reason FailureReason + PodReason string + PodMessage string + Containers map[string]ContainerInfo + Events []EventInfo + CollectError string +} + +func (FailureInfoV1) isFailureInfo() {} +func (FailureInfoV2) isFailureInfo() {} + +func (f FailureInfoV1) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Version int `json:"version"` + Message string `json:"message,omitempty"` + }{1, f.Message}) +} + +func (f FailureInfoV2) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Version int `json:"version"` + Reason FailureReason `json:"reason"` + PodReason string `json:"pod_reason,omitempty"` + PodMessage string `json:"pod_message,omitempty"` + Containers map[string]ContainerInfo `json:"containers,omitempty"` + Events []EventInfo `json:"events,omitempty"` + CollectError string `json:"collect_error,omitempty"` + }{2, f.Reason, f.PodReason, f.PodMessage, f.Containers, f.Events, f.CollectError}) } // ContainerInfo holds termination details + optional logs for one container. @@ -255,6 +305,16 @@ type GHJob struct { Status string `json:"status"` // queued, in_progress, completed Conclusion *string `json:"conclusion"` // null, success, failure, cancelled, ... RunnerName string `json:"runner_name"` + RunID int64 `json:"run_id"` +} + +// GHRun is the subset of the GitHub workflow-run response we use. +// A run can finish (status=completed, conclusion!=null) while one of +// its jobs stays stuck in status=queued forever — that's the case the +// scheduler reconciles against. +type GHRun struct { + Status string `json:"status"` + Conclusion *string `json:"conclusion"` } // Installation is the parsed shape of GET /app/installations/{id}. @@ -337,6 +397,7 @@ type GitHubClient interface { DeleteRunnerRepo(ctx context.Context, token, repoFullName string, runnerID int64) error GetJobInfo(ctx context.Context, token, repoFullName string, jobID int64) (GHJob, error) + GetRunInfo(ctx context.Context, token, repoFullName string, runID int64) (GHRun, error) } // GitHubAPIError carries the HTTP status code so callers can distinguish 404. @@ -364,7 +425,7 @@ type KubeClient interface { DeletePod(ctx context.Context, podName string) error KillPod(ctx context.Context, podName string) error AvailableSlots(ctx context.Context, pool string) (Capacity, error) - CollectPodFailureInfo(ctx context.Context, pod Pod, reason FailureReason) FailureInfo + CollectPodFailureInfo(ctx context.Context, pod Pod, reason FailureReason) FailureInfoV2 } // --- helpers shared between cmd/* --- diff --git a/container/internal/db.go b/container/internal/db.go index c762472..c1ec386 100644 --- a/container/internal/db.go +++ b/container/internal/db.go @@ -218,8 +218,8 @@ func (d *pgDB) MarkJobCompleted(ctx context.Context, jobID int64, runnerName str } func (d *pgDB) MarkJobFailed(ctx context.Context, jobID int64, info FailureInfo) (string, error) { - if info.Version == 0 { - return "", errors.New("failure_info.version must be set") + if info == nil { + return "", errors.New("failure_info must not be nil") } infoJSON, err := json.Marshal(info) if err != nil { @@ -408,8 +408,8 @@ func (d *pgDB) MarkWorkerCompleted(ctx context.Context, podName, node string, co } func (d *pgDB) MarkWorkerFailed(ctx context.Context, podName, node string, info FailureInfo, completedAt *time.Time) error { - if info.Version == 0 { - return errors.New("failure_info.version must be set") + if info == nil { + return errors.New("failure_info must not be nil") } infoJSON, err := json.Marshal(info) if err != nil { diff --git a/container/internal/db_test.go b/container/internal/db_test.go index a17d46b..2708c5a 100644 --- a/container/internal/db_test.go +++ b/container/internal/db_test.go @@ -156,11 +156,11 @@ func TestMarkJobCompleted_AcceptsRunningOrPending(t *testing.T) { } } -func TestMarkJobFailed_RequiresVersion(t *testing.T) { +func TestMarkJobFailed_RequiresNonNil(t *testing.T) { db, _ := newMockDB(t) - _, err := db.MarkJobFailed(context.Background(), 1, FailureInfo{}) + _, err := db.MarkJobFailed(context.Background(), 1, nil) if err == nil { - t.Fatal("expected error on zero version") + t.Fatal("expected error on nil FailureInfo") } } @@ -170,7 +170,7 @@ func TestMarkJobFailed_Success(t *testing.T) { mock.ExpectQuery(`UPDATE jobs SET status = 'failed'`). WithArgs(int64(1), pgxmock.AnyArg()). WillReturnRows(pgxmock.NewRows([]string{"prev_status"}).AddRow(&prev)) - prev, err := db.MarkJobFailed(context.Background(), 1, FailureInfo{Version: 2, Reason: ReasonPodFailed}) + prev, err := db.MarkJobFailed(context.Background(), 1, FailureInfoV2{Reason: ReasonPodFailed}) if err != nil || prev != "running" { t.Fatalf("prev=%q err=%v", prev, err) } @@ -186,7 +186,7 @@ func TestMarkJobFailed_NoMatchFallbackReadsCurrent(t *testing.T) { mock.ExpectQuery(`SELECT status::text FROM jobs WHERE job_id`). WithArgs(int64(1)). WillReturnRows(pgxmock.NewRows([]string{"status"}).AddRow(&cur)) - prev, err := db.MarkJobFailed(context.Background(), 1, FailureInfo{Version: 2, Reason: ReasonPodFailed}) + prev, err := db.MarkJobFailed(context.Background(), 1, FailureInfoV2{Reason: ReasonPodFailed}) if err != nil || prev != "completed" { t.Fatalf("prev=%q err=%v", prev, err) } @@ -200,7 +200,7 @@ func TestMarkJobFailed_NotFoundReturnsEmpty(t *testing.T) { mock.ExpectQuery(`SELECT status::text FROM jobs WHERE job_id`). WithArgs(int64(99)). WillReturnError(pgx.ErrNoRows) - prev, err := db.MarkJobFailed(context.Background(), 99, FailureInfo{Version: 2, Reason: ReasonPodFailed}) + prev, err := db.MarkJobFailed(context.Background(), 99, FailureInfoV2{Reason: ReasonPodFailed}) if err != nil || prev != "" { t.Fatalf("prev=%q err=%v", prev, err) } @@ -211,7 +211,7 @@ func TestMarkJobFailed_UpdateErrorPropagates(t *testing.T) { mock.ExpectQuery(`UPDATE jobs SET status = 'failed'`). WithArgs(int64(1), pgxmock.AnyArg()). WillReturnError(errors.New("boom")) - _, err := db.MarkJobFailed(context.Background(), 1, FailureInfo{Version: 2, Reason: ReasonPodFailed}) + _, err := db.MarkJobFailed(context.Background(), 1, FailureInfoV2{Reason: ReasonPodFailed}) if err == nil { t.Fatal("expected error") } @@ -344,10 +344,10 @@ func TestMarkWorkerCompleted(t *testing.T) { } } -func TestMarkWorkerFailed_RequiresVersion(t *testing.T) { +func TestMarkWorkerFailed_RequiresNonNil(t *testing.T) { db, _ := newMockDB(t) - if err := db.MarkWorkerFailed(context.Background(), "p", "n", FailureInfo{}, nil); err == nil { - t.Fatal("expected error on zero version") + if err := db.MarkWorkerFailed(context.Background(), "p", "n", nil, nil); err == nil { + t.Fatal("expected error on nil FailureInfo") } } @@ -356,7 +356,7 @@ func TestMarkWorkerFailed_Success(t *testing.T) { mock.ExpectExec(`UPDATE workers.*status = 'failed'`).WithArgs(anyN(4)...). WillReturnResult(pgxmock.NewResult("UPDATE", 1)) if err := db.MarkWorkerFailed(context.Background(), "p", "n", - FailureInfo{Version: 2, Reason: ReasonPodFailed}, nil); err != nil { + FailureInfoV2{Reason: ReasonPodFailed}, nil); err != nil { t.Fatalf("MarkWorkerFailed: %v", err) } } diff --git a/container/internal/github.go b/container/internal/github.go index 39f64a0..1ad3365 100644 --- a/container/internal/github.go +++ b/container/internal/github.go @@ -335,6 +335,21 @@ func (c *GHClient) GetJobInfo(ctx context.Context, token, repoFullName string, j return j, nil } +func (c *GHClient) GetRunInfo(ctx context.Context, token, repoFullName string, runID int64) (GHRun, error) { + body, status, err := c.doJSON(ctx, "GET", "/repos/"+repoFullName+"/actions/runs/"+i64(runID), nil, token) + if err != nil { + return GHRun{}, err + } + if status != 200 { + return GHRun{}, &GitHubAPIError{StatusCode: status, Message: fmt.Sprintf("get run %d: %s", runID, body)} + } + var r GHRun + if err := json.Unmarshal(body, &r); err != nil { + return GHRun{}, err + } + return r, nil +} + // doJSON sends a JSON-encoded request and returns body, status, err. // `auth` is the value put after "Bearer " in the Authorization header. func (c *GHClient) doJSON(ctx context.Context, method, path string, body any, auth string) ([]byte, int, error) { diff --git a/container/internal/k8s.go b/container/internal/k8s.go index 0a12893..ef26e0d 100644 --- a/container/internal/k8s.go +++ b/container/internal/k8s.go @@ -280,11 +280,10 @@ func (k *K8sClient) AvailableSlots(ctx context.Context, pool string) (Capacity, return Capacity{Total: total, Active: active, Available: total - active}, nil } -// CollectPodFailureInfo builds the v2 failure_info shape: container exit +// CollectPodFailureInfo builds the FailureInfoV2 shape: container exit // codes/messages/logs plus pod events. Best-effort — never returns an error. -func (k *K8sClient) CollectPodFailureInfo(ctx context.Context, p Pod, reason FailureReason) FailureInfo { - info := FailureInfo{ - Version: 2, // bump when the structure changes — older rows render via the v1 fallback +func (k *K8sClient) CollectPodFailureInfo(ctx context.Context, p Pod, reason FailureReason) FailureInfoV2 { + info := FailureInfoV2{ Reason: reason, Containers: map[string]ContainerInfo{}, Events: nil, diff --git a/container/internal/k8s_test.go b/container/internal/k8s_test.go index d39e1c4..f363e8a 100644 --- a/container/internal/k8s_test.go +++ b/container/internal/k8s_test.go @@ -502,8 +502,8 @@ func TestCollectPodFailureInfo_BuildsV2Shape(t *testing.T) { ) k := NewK8sClientFromInterface(cs) info := k.CollectPodFailureInfo(context.Background(), pod, ReasonPodFailed) - if info.Version != 2 || info.Reason != ReasonPodFailed { - t.Errorf("header fields wrong: %+v", info) + if info.Reason != ReasonPodFailed { + t.Errorf("reason wrong: %+v", info) } if info.PodMessage != "msg" || info.PodReason != "BadStuff" { t.Errorf("pod fields lost: %+v", info) diff --git a/container/internal/testutil/fakes.go b/container/internal/testutil/fakes.go index 0542ec1..dde9a60 100644 --- a/container/internal/testutil/fakes.go +++ b/container/internal/testutil/fakes.go @@ -308,6 +308,7 @@ type FakeGH struct { OnDeleteRunnerOrg func(string, string, int64) error OnDeleteRunnerRepo func(string, string, int64) error OnGetJobInfo func(string, string, int64) (internal.GHJob, error) + OnGetRunInfo func(string, string, int64) (internal.GHRun, error) } func (g *FakeGH) AuthenticateApp(ctx context.Context, instID, appID int64) (string, error) { @@ -380,6 +381,13 @@ func (g *FakeGH) GetJobInfo(ctx context.Context, token, repo string, jobID int64 return internal.GHJob{}, nil } +func (g *FakeGH) GetRunInfo(ctx context.Context, token, repo string, runID int64) (internal.GHRun, error) { + if g.OnGetRunInfo != nil { + return g.OnGetRunInfo(token, repo, runID) + } + return internal.GHRun{}, nil +} + // --- FakeKube --- // FakeKube satisfies internal.KubeClient with in-memory state. Pods are @@ -465,6 +473,6 @@ func (f *FakeKube) AvailableSlots(ctx context.Context, pool string) (internal.Ca return internal.Capacity{Available: f.SlotsByPool[pool]}, nil } -func (f *FakeKube) CollectPodFailureInfo(ctx context.Context, p internal.Pod, reason internal.FailureReason) internal.FailureInfo { - return internal.FailureInfo{Version: 2, Reason: reason} +func (f *FakeKube) CollectPodFailureInfo(ctx context.Context, p internal.Pod, reason internal.FailureReason) internal.FailureInfoV2 { + return internal.FailureInfoV2{Reason: reason} }