Skip to content
This repository was archived by the owner on May 19, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions container/cmd/ghfe/staging_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func stagingPayload() []byte {
"installation": map[string]any{"id": float64(1)},
"repository": map[string]any{
"id": float64(2),
"full_name": "riseproject-dev/riscv-runner-sample",
"full_name": "riseproject-dev/riscv-runner-sample-staging",
"owner": map[string]any{"id": float64(internal.RiseprojectDevOrgID), "type": "Organization", "login": "riseproject-dev"},
},
"workflow_job": map[string]any{
Expand Down Expand Up @@ -172,9 +172,9 @@ func TestShouldProxyToStaging_Negatives(t *testing.T) {
ent internal.Entity
repo string
}{
{"staging instance", internal.Config{Prod: false, StagingURL: "https://s"}, riseEntity, "riseproject-dev/riscv-runner-sample"},
{"no staging url", internal.Config{Prod: true}, riseEntity, "riseproject-dev/riscv-runner-sample"},
{"entity not in config", prod, unknownEntity, "riseproject-dev/riscv-runner-sample"},
{"staging instance", internal.Config{Prod: false, StagingURL: "https://s"}, riseEntity, "riseproject-dev/riscv-runner-sample-staging"},
{"no staging url", internal.Config{Prod: true}, riseEntity, "riseproject-dev/riscv-runner-sample-staging"},
{"entity not in config", prod, unknownEntity, "riseproject-dev/riscv-runner-sample-staging"},
{"repo not in entity staging list", prod, riseEntity, "riseproject-dev/something-else"},
}
for _, tc := range cases {
Expand All @@ -183,7 +183,7 @@ func TestShouldProxyToStaging_Negatives(t *testing.T) {
}
}

if !shouldProxyToStaging(prod, riseEntity, "riseproject-dev/riscv-runner-sample") {
if !shouldProxyToStaging(prod, riseEntity, "riseproject-dev/riscv-runner-sample-staging") {
t.Error("positive case: prod + sample repo should proxy")
}
}
2 changes: 1 addition & 1 deletion container/cmd/scheduler/demand_match.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (a *App) tryProvision(ctx context.Context, j internal.Job) bool {
if err := a.provisionRunner(ctx, j, runnerName, labels); err != nil {
slog.Error("Failed to provision runner",
"entity", e, "runner_name", runnerName, "k8s_pool", j.K8sPool, "err", err)
info := internal.FailureInfo{Version: 2, Reason: internal.ReasonPodAllocationFailure}
info := internal.FailureInfoV2{Reason: internal.ReasonPodAllocationFailure}
_ = a.DB.MarkWorkerFailed(ctx, runnerName, "", info, nil)
// Row was created, slot is consumed.
return true
Expand Down
4 changes: 2 additions & 2 deletions container/cmd/scheduler/demand_match_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func TestProvisionRunner_FailureMarksWorker(t *testing.T) {
if len(db.MarkFailed) != 1 {
t.Fatalf("expected 1 mark_worker_failed call, got %v", db.MarkFailed)
}
if db.MarkFailed[0].Info.Reason != internal.ReasonPodAllocationFailure {
t.Errorf("failure_info.reason=%q want pod_allocation_failure", db.MarkFailed[0].Info.Reason)
if db.MarkFailed[0].Info.(internal.FailureInfoV2).Reason != internal.ReasonPodAllocationFailure {
t.Errorf("failure_info.reason=%q want pod_allocation_failure", db.MarkFailed[0].Info.(internal.FailureInfoV2).Reason)
}
}

Expand Down
48 changes: 40 additions & 8 deletions container/cmd/scheduler/sync_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"time"

"github.com/riseproject-dev/riscv-runner-app/container/internal"
)
Expand Down Expand Up @@ -42,10 +43,9 @@ func (a *App) syncOneJob(ctx context.Context, j internal.Job) error {
if errors.As(err, &apiErr) && apiErr.StatusCode == 404 {
slog.Warn("Installation not found, marking job failed",
"entity", e, "installation_id", j.InstallationID, "job_id", j.JobID)
_, _ = a.DB.MarkJobFailed(ctx, j.JobID, internal.FailureInfo{
Version: 1,
Reason: internal.FailureReason(fmt.Sprintf("installation not found for installation_id=%d entity_type=%s",
j.InstallationID, e.Type)),
_, _ = a.DB.MarkJobFailed(ctx, j.JobID, internal.FailureInfoV1{
Message: fmt.Sprintf("installation not found for installation_id=%d entity_type=%s",
j.InstallationID, e.Type),
})
return nil
}
Expand All @@ -59,10 +59,9 @@ func (a *App) syncOneJob(ctx context.Context, j internal.Job) error {
var apiErr *internal.GitHubAPIError
if errors.As(err, &apiErr) && apiErr.StatusCode == 404 {
slog.Warn("Job not found, marking as failed", "entity", e, "job_id", j.JobID)
_, _ = a.DB.MarkJobFailed(ctx, j.JobID, internal.FailureInfo{
Version: 1,
Reason: internal.FailureReason(fmt.Sprintf("job not found for job_id=%d entity=%s entity_id=%d entity_type=%s",
j.JobID, e.Name, e.ID, e.Type)),
_, _ = a.DB.MarkJobFailed(ctx, j.JobID, internal.FailureInfoV1{
Message: fmt.Sprintf("job not found for job_id=%d entity=%s entity_id=%d entity_type=%s",
j.JobID, e.Name, e.ID, e.Type),
})
return nil
}
Expand All @@ -86,6 +85,39 @@ func (a *App) syncOneJob(ctx context.Context, j internal.Job) error {
"entity", e, "job_id", j.JobID)
_, _ = a.DB.MarkJobRunning(ctx, j.JobID, ghJob.RunnerName)
}
case "queued":
a.reconcileStuckQueued(ctx, j, ghJob, token)
}
return nil
}

// reconcileStuckQueued detects jobs that GitHub still reports as queued
// even though their parent workflow run has already terminated. This
// happens when a run is cancelled (or a sibling fails fast-fail-style)
// before scheduling reaches the job; the job then sits queued forever
// and the scheduler would otherwise keep trying to provision a runner
// for a job that will never start. We mark the row failed with the
// run's conclusion so the worker slot frees up.
func (a *App) reconcileStuckQueued(ctx context.Context, j internal.Job, ghJob internal.GHJob, token string) {
if ghJob.RunID == 0 || time.Since(j.CreatedAt) < internal.JobStuckQueuedMinAge {
return
}
run, err := a.GH.GetRunInfo(ctx, token, j.RepoFullName, ghJob.RunID)
if err != nil {
slog.Debug("GetRunInfo failed", "entity", j.Entity(), "job_id", j.JobID, "run_id", ghJob.RunID, "err", err)
return
}
if run.Status != "completed" {
return
}
conclusion := "unknown"
if run.Conclusion != nil && *run.Conclusion != "" {
conclusion = *run.Conclusion
}
slog.Warn("GH reconcile: job stuck queued while run is completed; marking failed",
"entity", j.Entity(), "job_id", j.JobID, "run_id", ghJob.RunID, "run_conclusion", conclusion)
_, _ = a.DB.MarkJobFailed(ctx, j.JobID, internal.FailureInfoV1{
Message: fmt.Sprintf("workflow run %d completed (conclusion=%s) while job %d stayed queued",
ghJob.RunID, conclusion, j.JobID),
})
}
97 changes: 97 additions & 0 deletions container/cmd/scheduler/sync_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package main

import (
"context"
"strings"
"testing"
"time"

"github.com/riseproject-dev/riscv-runner-app/container/internal"
)
Expand Down Expand Up @@ -164,6 +166,101 @@ func TestSyncOneJob_InProgressFromPendingPromotesToRunning(t *testing.T) {
}
}

// TestSyncOneJob_StuckQueuedMarksFailedWhenRunCompleted covers the case
// where GitHub reports a job still queued even though the parent workflow
// run has terminated. The scheduler should detect this and mark the job
// failed so the slot frees up.
func TestSyncOneJob_StuckQueuedMarksFailedWhenRunCompleted(t *testing.T) {
app, db, gh, _ := schedTestApp()
gh.OnAuthenticateApp = func(int64, int64) (string, error) { return "tok", nil }
gh.OnGetJobInfo = func(string, string, int64) (internal.GHJob, error) {
return internal.GHJob{Status: "queued", RunID: 4242}, nil
}
conc := "failure"
gh.OnGetRunInfo = func(_, _ string, runID int64) (internal.GHRun, error) {
if runID != 4242 {
t.Errorf("GetRunInfo called with runID=%d, want 4242", runID)
}
return internal.GHRun{Status: "completed", Conclusion: &conc}, nil
}
var markedMessage string
db.OnMarkJobFailed = func(_ int64, info internal.FailureInfo) (string, error) {
v1, ok := info.(internal.FailureInfoV1)
if !ok {
t.Fatalf("expected FailureInfoV1, got %T", info)
}
markedMessage = v1.Message
return "pending", nil
}
db.Jobs = []internal.Job{{
JobID: 7, Status: "pending", RepoFullName: "a/r", EntityName: "a",
EntityType: "Organization", InstallationID: 9,
CreatedAt: time.Now().Add(-2 * internal.JobStuckQueuedMinAge),
}}
if err := app.syncJobsState(context.Background()); err != nil {
t.Fatal(err)
}
if markedMessage == "" {
t.Fatal("expected MarkJobFailed to be called")
}
if !strings.Contains(markedMessage, "stayed queued") || !strings.Contains(markedMessage, "failure") {
t.Errorf("message missing expected fragments: %q", markedMessage)
}
}

// TestSyncOneJob_StuckQueuedSkippedWhenJobYoung verifies we do not burn
// GitHub API quota probing the run on freshly-queued jobs.
func TestSyncOneJob_StuckQueuedSkippedWhenJobYoung(t *testing.T) {
app, db, gh, _ := schedTestApp()
gh.OnAuthenticateApp = func(int64, int64) (string, error) { return "tok", nil }
gh.OnGetJobInfo = func(string, string, int64) (internal.GHJob, error) {
return internal.GHJob{Status: "queued", RunID: 1}, nil
}
runProbed := false
gh.OnGetRunInfo = func(string, string, int64) (internal.GHRun, error) {
runProbed = true
return internal.GHRun{}, nil
}
db.Jobs = []internal.Job{{
JobID: 1, Status: "pending", RepoFullName: "a/r", EntityName: "a",
EntityType: "Organization", InstallationID: 9,
CreatedAt: time.Now(), // brand new
}}
if err := app.syncJobsState(context.Background()); err != nil {
t.Fatal(err)
}
if runProbed {
t.Errorf("GetRunInfo should not be called for jobs younger than JobStuckQueuedMinAge")
}
if len(db.MarkFailed) != 0 {
t.Errorf("young queued job must not be marked failed")
}
}

// TestSyncOneJob_StuckQueuedRunStillRunningIsNoop covers the case where
// the run is still in flight — we probe but leave the job alone.
func TestSyncOneJob_StuckQueuedRunStillRunningIsNoop(t *testing.T) {
app, db, gh, _ := schedTestApp()
gh.OnAuthenticateApp = func(int64, int64) (string, error) { return "tok", nil }
gh.OnGetJobInfo = func(string, string, int64) (internal.GHJob, error) {
return internal.GHJob{Status: "queued", RunID: 1}, nil
}
gh.OnGetRunInfo = func(string, string, int64) (internal.GHRun, error) {
return internal.GHRun{Status: "in_progress"}, nil
}
db.Jobs = []internal.Job{{
JobID: 1, Status: "pending", RepoFullName: "a/r", EntityName: "a",
EntityType: "Organization", InstallationID: 9,
CreatedAt: time.Now().Add(-2 * internal.JobStuckQueuedMinAge),
}}
if err := app.syncJobsState(context.Background()); err != nil {
t.Fatal(err)
}
if len(db.MarkFailed) != 0 {
t.Errorf("queued job with still-running parent run must not be marked failed")
}
}

// TestSyncOneJob_InProgressFromRunningIsNoop covers the no-op branch when DB
// already has running.
func TestSyncOneJob_InProgressFromRunningIsNoop(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions container/cmd/scheduler/sync_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func TestPhase3_OfflineRunnerPastTimeoutFails(t *testing.T) {
if len(db.MarkFailed) != 1 {
t.Fatalf("expected MarkWorkerFailed, got %v", db.MarkFailed)
}
if db.MarkFailed[0].Info.Reason != internal.ReasonRunnerNeverRegistered {
t.Errorf("reason=%q want runner_never_registered", db.MarkFailed[0].Info.Reason)
if db.MarkFailed[0].Info.(internal.FailureInfoV2).Reason != internal.ReasonRunnerNeverRegistered {
t.Errorf("reason=%q want runner_never_registered", db.MarkFailed[0].Info.(internal.FailureInfoV2).Reason)
}
if len(kube.KillCalls) != 1 || kube.KillCalls[0] != w.PodName {
t.Errorf("expected KillPod for %s, got %v", w.PodName, kube.KillCalls)
Expand All @@ -75,7 +75,7 @@ func TestPhase3_OnlineIdleRunnerPastTimeoutFails(t *testing.T) {
if err := app.syncWorkersState(context.Background()); err != nil {
t.Fatalf("syncWorkersState: %v", err)
}
if len(db.MarkFailed) != 1 || db.MarkFailed[0].Info.Reason != internal.ReasonRunnerIdle {
if len(db.MarkFailed) != 1 || db.MarkFailed[0].Info.(internal.FailureInfoV2).Reason != internal.ReasonRunnerIdle {
t.Fatalf("expected RunnerIdle failure, got %v", db.MarkFailed)
}
}
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestPhase2_FailedPodMarksWorkerFailed(t *testing.T) {
if err := app.syncWorkersState(context.Background()); err != nil {
t.Fatal(err)
}
if len(db.MarkFailed) != 1 || db.MarkFailed[0].Info.Reason != internal.ReasonPodFailed {
if len(db.MarkFailed) != 1 || db.MarkFailed[0].Info.(internal.FailureInfoV2).Reason != internal.ReasonPodFailed {
t.Errorf("expected MarkWorkerFailed(pod_failed), got %v", db.MarkFailed)
}
}
Expand All @@ -152,7 +152,7 @@ func TestPhase3_PendingPastTimeoutFailsWithStuckPending(t *testing.T) {
if err := app.syncWorkersState(context.Background()); err != nil {
t.Fatal(err)
}
if len(db.MarkFailed) != 1 || db.MarkFailed[0].Info.Reason != internal.ReasonPodStuckPending {
if len(db.MarkFailed) != 1 || db.MarkFailed[0].Info.(internal.FailureInfoV2).Reason != internal.ReasonPodStuckPending {
t.Errorf("got %v", db.MarkFailed)
}
}
Expand Down
1 change: 1 addition & 0 deletions container/internal/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
RunnerPendingTimeout = 600 * time.Second // runner registered with GH but never picks up a job
PodPendingTimeout = 600 * time.Second // pod stuck Pending (no capacity, image pull, etc.)
PodDeleteGrace = 6 * time.Hour // keep terminal pods around so operators can still kubectl logs them
JobStuckQueuedMinAge = 10 * time.Minute // minimum job age before sync_jobs probes the parent run for a stuck-queued condition

PollInterval = 15 * time.Second

Expand Down
81 changes: 71 additions & 10 deletions container/internal/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,65 @@ type InstallationEvent struct {
Payload json.RawMessage `db:"payload" json:"-"`
}

// FailureInfo is the v2 shape written into workers.failure_info / jobs.failure_info.
type FailureInfo struct {
Version int `json:"version"`
Reason FailureReason `json:"reason"`
PodReason string `json:"pod_reason,omitempty"`
PodMessage string `json:"pod_message,omitempty"`
Containers map[string]ContainerInfo `json:"containers,omitempty"`
Events []EventInfo `json:"events,omitempty"`
CollectError string `json:"collect_error,omitempty"`
// FailureInfo is the sealed interface implemented by FailureInfoV1 and
// FailureInfoV2. Two on-disk schemas coexist for workers.failure_info /
// jobs.failure_info, and which one is correct depends on the failure
// mode:
//
// - FailureInfoV1 (non-pod failures): {"version":1,"message":"..."} --
// a free-form human message with no structured fields. Used by the
// scheduler when the failure isn't a k8s pod outcome (installation
// 404, job missing on GitHub, run completed with a queued job, ...).
// - FailureInfoV2 (pod failures): {"version":2, "reason":<enum>,
// "pod_reason":..., "pod_message":..., "containers":..., "events":...}
// populated from CollectPodFailureInfo. Reason is a typed enum value
// (ReasonPodFailed, ReasonPodStuckPending, ...).
//
// DB.Mark{Job,Worker}Failed accept the interface so a caller can pass
// either variant; the on-disk shape is determined by the concrete type.
// Renderers must look at "version" in the parsed JSON first and pick
// the right branch.
type FailureInfo interface {
isFailureInfo()
}

// FailureInfoV1 is the non-pod failure shape: a free-form message and
// nothing else. Marshals as {"version":1, "message":"..."}.
type FailureInfoV1 struct {
Message string
}

// FailureInfoV2 is the structured pod-failure shape produced by
// CollectPodFailureInfo. Marshals as {"version":2, "reason":..., ...}.
type FailureInfoV2 struct {
Reason FailureReason
PodReason string
PodMessage string
Containers map[string]ContainerInfo
Events []EventInfo
CollectError string
}

func (FailureInfoV1) isFailureInfo() {}
func (FailureInfoV2) isFailureInfo() {}

func (f FailureInfoV1) MarshalJSON() ([]byte, error) {
return json.Marshal(struct {
Version int `json:"version"`
Message string `json:"message,omitempty"`
}{1, f.Message})
}

func (f FailureInfoV2) MarshalJSON() ([]byte, error) {
return json.Marshal(struct {
Version int `json:"version"`
Reason FailureReason `json:"reason"`
PodReason string `json:"pod_reason,omitempty"`
PodMessage string `json:"pod_message,omitempty"`
Containers map[string]ContainerInfo `json:"containers,omitempty"`
Events []EventInfo `json:"events,omitempty"`
CollectError string `json:"collect_error,omitempty"`
}{2, f.Reason, f.PodReason, f.PodMessage, f.Containers, f.Events, f.CollectError})
}

// ContainerInfo holds termination details + optional logs for one container.
Expand Down Expand Up @@ -255,6 +305,16 @@ type GHJob struct {
Status string `json:"status"` // queued, in_progress, completed
Conclusion *string `json:"conclusion"` // null, success, failure, cancelled, ...
RunnerName string `json:"runner_name"`
RunID int64 `json:"run_id"`
}

// GHRun is the subset of the GitHub workflow-run response we use.
// A run can finish (status=completed, conclusion!=null) while one of
// its jobs stays stuck in status=queued forever — that's the case the
// scheduler reconciles against.
type GHRun struct {
Status string `json:"status"`
Conclusion *string `json:"conclusion"`
}

// Installation is the parsed shape of GET /app/installations/{id}.
Expand Down Expand Up @@ -337,6 +397,7 @@ type GitHubClient interface {
DeleteRunnerRepo(ctx context.Context, token, repoFullName string, runnerID int64) error

GetJobInfo(ctx context.Context, token, repoFullName string, jobID int64) (GHJob, error)
GetRunInfo(ctx context.Context, token, repoFullName string, runID int64) (GHRun, error)
}

// GitHubAPIError carries the HTTP status code so callers can distinguish 404.
Expand Down Expand Up @@ -364,7 +425,7 @@ type KubeClient interface {
DeletePod(ctx context.Context, podName string) error
KillPod(ctx context.Context, podName string) error
AvailableSlots(ctx context.Context, pool string) (Capacity, error)
CollectPodFailureInfo(ctx context.Context, pod Pod, reason FailureReason) FailureInfo
CollectPodFailureInfo(ctx context.Context, pod Pod, reason FailureReason) FailureInfoV2
}

// --- helpers shared between cmd/* ---
Expand Down
Loading
Loading