diff --git a/README.md b/README.md index 1bc62d7..9cce183 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,7 @@ GitHub (workflow_job webhook) v ghfe (container/cmd/ghfe) | - Verifies webhook signature + | - Proxies workflow_job webhooks for EntityConfig.Staging repos to STAGING_URL (prod only) | - Validates labels, determines entity type (org or personal) | - Resolves (entity_id, job_labels) -> (k8s_pool, k8s_image) | - Writes job to PostgreSQL diff --git a/container/cmd/ghfe/main.go b/container/cmd/ghfe/main.go index c543289..6d384b6 100644 --- a/container/cmd/ghfe/main.go +++ b/container/cmd/ghfe/main.go @@ -39,7 +39,7 @@ func main() { os.Exit(2) } - app := &App{Config: cfg, DB: db, GH: gh} + app := &App{Config: cfg, DB: db, GH: gh, StagingProxy: &http.Client{Timeout: 10 * time.Second}} srv := &http.Server{ Addr: fmt.Sprintf("0.0.0.0:%d", internal.HTTPPort), Handler: app.Routes(), @@ -64,9 +64,10 @@ func main() { // App holds the ghfe runtime dependencies handed to each request handler. type App struct { - Config internal.Config - DB internal.DB - GH internal.GitHubClient + Config internal.Config + DB internal.DB + GH internal.GitHubClient + StagingProxy *http.Client // used by proxyToStaging; tests inject a stub } func (a *App) Routes() *http.ServeMux { diff --git a/container/cmd/ghfe/staging_proxy.go b/container/cmd/ghfe/staging_proxy.go new file mode 100644 index 0000000..9bc3e07 --- /dev/null +++ b/container/cmd/ghfe/staging_proxy.go @@ -0,0 +1,70 @@ +package main + +import ( + "bytes" + "io" + "log/slog" + "net/http" + "strings" + + "github.com/riseproject-dev/riscv-runner-app/container/internal" +) + +// shouldProxyToStaging is true when this is the prod instance and the +// (entity, repo) pair is listed as a staging-test repo in EntityConfigs. +// Prod forwards those workflow_job webhooks to the staging ghfe so the +// staging environment can exercise a real repo end-to-end. +func shouldProxyToStaging(cfg internal.Config, entity internal.Entity, repoFullName string) bool { + if !cfg.Prod || cfg.StagingURL == "" { + return false + } + ec, ok := internal.EntityConfigs[entity.ID] + if !ok || len(ec.Staging) == 0 { + return false + } + repoName := repoFullName + if i := strings.IndexByte(repoFullName, '/'); i >= 0 { + repoName = repoFullName[i+1:] + } + for _, r := range ec.Staging { + if r == repoName { + return true + } + } + return false +} + +// proxyToStaging forwards the unmodified webhook body to the staging +// ghfe and relays its response back to GitHub. Caller MUST have already +// verified the HMAC signature so we don't amplify untrusted traffic. +// +// Errors return 502 so GitHub redelivers. +func (a *App) proxyToStaging(w http.ResponseWriter, r *http.Request, body []byte) error { + req, err := http.NewRequestWithContext(r.Context(), http.MethodPost, a.Config.StagingURL+"/", bytes.NewReader(body)) + if err != nil { + return err + } + // Forward every request header verbatim. Staging needs the GitHub + // signature/event/delivery headers to verify and trace; carrying the + // rest is harmless and keeps the proxy behaviorally invisible. + req.Header = r.Header.Clone() + + resp, err := a.StagingProxy.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + // Relay staging's response headers (mainly Content-Type and any + // X-GitHub-* it sets) back to GitHub. + for k, vs := range resp.Header { + for _, v := range vs { + w.Header().Add(k, v) + } + } + w.WriteHeader(resp.StatusCode) + if _, err := io.Copy(w, resp.Body); err != nil { + slog.Warn("Failed to relay staging proxy response", "err", err) + } + return nil +} diff --git a/container/cmd/ghfe/staging_proxy_test.go b/container/cmd/ghfe/staging_proxy_test.go new file mode 100644 index 0000000..04d0203 --- /dev/null +++ b/container/cmd/ghfe/staging_proxy_test.go @@ -0,0 +1,189 @@ +package main + +import ( + "bytes" + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "errors" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/riseproject-dev/riscv-runner-app/container/internal" + "github.com/riseproject-dev/riscv-runner-app/container/internal/testutil" +) + +// stubRT lets a test capture the request the staging client sent and +// return a canned response. The 502 path uses err != nil; the happy +// path returns resp with status copied from the test. +type stubRT struct { + gotReq *http.Request + gotBody []byte + resp *http.Response + err error +} + +func (s *stubRT) RoundTrip(req *http.Request) (*http.Response, error) { + s.gotReq = req + if req.Body != nil { + b, _ := io.ReadAll(req.Body) + s.gotBody = b + } + return s.resp, s.err +} + +func prodAppWithProxy(rt *stubRT) (*App, *testutil.FakeDB) { + db := testutil.NewFakeDB() + cfg := internal.Config{ + Prod: true, + WebhookSecret: webhookSecret, + StagingURL: "https://staging.example/ghfe", + ImageUbuntu24: "img24", + } + return &App{Config: cfg, DB: db, GH: &testutil.FakeGH{}, StagingProxy: &http.Client{Transport: rt}}, db +} + +func stagingPayload() []byte { + return mustJSON(map[string]any{ + "action": "queued", + "installation": map[string]any{"id": float64(1)}, + "repository": map[string]any{ + "id": float64(2), + "full_name": "riseproject-dev/riscv-runner-sample", + "owner": map[string]any{"id": float64(internal.RiseprojectDevOrgID), "type": "Organization", "login": "riseproject-dev"}, + }, + "workflow_job": map[string]any{ + "id": float64(7), + "labels": []any{"ubuntu-24.04-riscv"}, + "html_url": "https://gh/run/7", + }, + }) +} + +// TestStagingProxy_HappyPath: prod ghfe receives a workflow_job for a repo +// listed under EntityConfigs.Staging, forwards body + all request headers +// to StagingURL, records exactly one proxied_to_staging row, never touches +// the jobs table, and relays the staging response (status + body + headers) +// back to GitHub verbatim. +func TestStagingProxy_HappyPath(t *testing.T) { + rt := &stubRT{resp: &http.Response{ + StatusCode: 202, + Header: http.Header{"Content-Type": []string{"text/plain"}, "X-Staging-Echo": []string{"ok"}}, + Body: io.NopCloser(strings.NewReader("staging-ok")), + }} + app, db := prodAppWithProxy(rt) + + body := stagingPayload() + mac := hmac.New(sha256.New, []byte(webhookSecret)) + mac.Write(body) + sig := "sha256=" + hex.EncodeToString(mac.Sum(nil)) + + r := httptest.NewRequest("POST", "/", bytes.NewReader(body)) + r.Header.Set(internal.HookSignatureHeader, sig) + r.Header.Set(internal.HookEventHeader, "workflow_job") + r.Header.Set(internal.HookAppIDHeader, "2167633") + r.Header.Set("X-GitHub-Delivery", "abc-123") + r.Header.Set("User-Agent", "GitHub-Hookshot/abc") + r.Header.Set("Content-Type", "application/json") + + w := httptest.NewRecorder() + app.handleWebhook(w, r) + + if w.Code != 202 { + t.Fatalf("status=%d want 202 (relayed)", w.Code) + } + if got := w.Body.String(); got != "staging-ok" { + t.Errorf("body=%q want %q", got, "staging-ok") + } + if w.Header().Get("X-Staging-Echo") != "ok" { + t.Errorf("missing relayed response header X-Staging-Echo, got=%v", w.Header()) + } + if w.Header().Get("Content-Type") != "text/plain" { + t.Errorf("response Content-Type=%q want text/plain", w.Header().Get("Content-Type")) + } + + if rt.gotReq == nil { + t.Fatal("staging client never called") + } + if rt.gotReq.URL.String() != "https://staging.example/ghfe/" { + t.Errorf("forwarded URL=%q", rt.gotReq.URL.String()) + } + if rt.gotReq.Method != "POST" { + t.Errorf("method=%q", rt.gotReq.Method) + } + if !bytes.Equal(rt.gotBody, body) { + t.Errorf("forwarded body differs from original") + } + for _, h := range []string{internal.HookSignatureHeader, internal.HookEventHeader, internal.HookAppIDHeader, "X-Github-Delivery", "User-Agent", "Content-Type"} { + if rt.gotReq.Header.Get(h) == "" { + t.Errorf("forwarded request missing header %s", h) + } + } + if rt.gotReq.Header.Get(internal.HookSignatureHeader) != sig { + t.Errorf("forwarded signature mutated") + } + + if len(db.Events) != 1 || db.Events[0].Row.Outcome != string(internal.OutcomeProxiedToStaging) { + t.Fatalf("expected one proxied_to_staging row, got %+v", db.Events) + } + if len(db.Jobs) != 0 { + t.Fatalf("prod must not store proxied jobs locally, got %d", len(db.Jobs)) + } +} + +// TestStagingProxy_UpstreamFailure: a transport error from staging surfaces +// as 502 to GitHub (so it redelivers) and the proxied_to_staging row is +// still written -- the proxy attempt itself is the audit-worthy event. +func TestStagingProxy_UpstreamFailure(t *testing.T) { + rt := &stubRT{err: errors.New("connection refused")} + app, db := prodAppWithProxy(rt) + + body := stagingPayload() + mac := hmac.New(sha256.New, []byte(webhookSecret)) + mac.Write(body) + r := httptest.NewRequest("POST", "/", bytes.NewReader(body)) + r.Header.Set(internal.HookSignatureHeader, "sha256="+hex.EncodeToString(mac.Sum(nil))) + r.Header.Set(internal.HookEventHeader, "workflow_job") + r.Header.Set(internal.HookAppIDHeader, "2167633") + + w := httptest.NewRecorder() + app.handleWebhook(w, r) + + if w.Code != 502 { + t.Fatalf("status=%d want 502", w.Code) + } + if len(db.Events) != 1 || db.Events[0].Row.Outcome != string(internal.OutcomeProxiedToStaging) { + t.Fatalf("expected proxied_to_staging audit row even on failure, got %+v", db.Events) + } +} + +// TestShouldProxyToStaging_Negatives pins the four ways the proxy does NOT fire. +func TestShouldProxyToStaging_Negatives(t *testing.T) { + prod := internal.Config{Prod: true, StagingURL: "https://s"} + riseEntity := internal.Entity{ID: internal.RiseprojectDevOrgID} + unknownEntity := internal.Entity{ID: 999999} + + cases := []struct { + name string + cfg internal.Config + ent internal.Entity + repo string + }{ + {"staging instance", internal.Config{Prod: false, StagingURL: "https://s"}, riseEntity, "riseproject-dev/riscv-runner-sample"}, + {"no staging url", internal.Config{Prod: true}, riseEntity, "riseproject-dev/riscv-runner-sample"}, + {"entity not in config", prod, unknownEntity, "riseproject-dev/riscv-runner-sample"}, + {"repo not in entity staging list", prod, riseEntity, "riseproject-dev/something-else"}, + } + for _, tc := range cases { + if shouldProxyToStaging(tc.cfg, tc.ent, tc.repo) { + t.Errorf("%s: expected no proxy", tc.name) + } + } + + if !shouldProxyToStaging(prod, riseEntity, "riseproject-dev/riscv-runner-sample") { + t.Error("positive case: prod + sample repo should proxy") + } +} diff --git a/container/cmd/ghfe/webhook.go b/container/cmd/ghfe/webhook.go index af63036..1f5b29b 100644 --- a/container/cmd/ghfe/webhook.go +++ b/container/cmd/ghfe/webhook.go @@ -62,7 +62,7 @@ func (a *App) handleWebhook(w http.ResponseWriter, r *http.Request) { case "installation_target": a.handleInstallationTargetEvent(w, r, event, payload, appID) case "workflow_job": - a.handleWorkflowJobEvent(w, r, payload, appID) + a.handleWorkflowJobEvent(w, r, body, payload, appID) default: a.recordEvent(r, eventRecord{Event: event, Outcome: internal.OutcomeIgnoredEvent, Payload: payload, AppID: &appID}) _, _ = w.Write([]byte("Ignoring " + event + " event")) @@ -155,7 +155,7 @@ func (a *App) handleInstallationTargetEvent(w http.ResponseWriter, r *http.Reque _, _ = w.Write([]byte(event + "." + action + " logged")) } -func (a *App) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Request, payload map[string]any, appID int64) { +func (a *App) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Request, body []byte, payload map[string]any, appID int64) { action, _ := payload["action"].(string) repo, _ := payload["repository"].(map[string]any) install, _ := payload["installation"].(map[string]any) @@ -173,15 +173,41 @@ func (a *App) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Request, pay ownerType, _ := owner["type"].(string) ownerLogin, _ := owner["login"].(string) installID := asInt64(install["id"]) + repoFullName, _ := repo["full_name"].(string) + if ownerID == 0 { + httpError(w, 400, "Owner ID is missing in payload") + return + } + et, err := internal.ParseEntityType(ownerType) + if err != nil { + httpError(w, 400, err.Error()) + return + } + entity := internal.Entity{Type: et, Name: ownerLogin, ID: ownerID} trimmed := trimWorkflowJobPayload(payload) base := eventRecord{ Payload: trimmed, AppID: &appID, InstallationID: &installID, - EntityType: &ownerType, - EntityID: &ownerID, - EntityName: &ownerLogin, + EntityType: (*string)(&entity.Type), + EntityID: &entity.ID, + EntityName: &entity.Name, + } + + // Staging proxy: a real repo (e.g. riscv-runner-sample) is wired into + // the prod app but its webhooks should drive the staging environment. + // Forward the unmodified body to staging ghfe and short-circuit; the + // prod instance neither stores nor reconciles the job locally. + if shouldProxyToStaging(a.Config, entity, repoFullName) { + base.Event = "workflow_job." + action + base.Outcome = internal.OutcomeProxiedToStaging + a.recordEvent(r, base) + if err := a.proxyToStaging(w, r, body); err != nil { + slog.Error("Staging proxy failed", "entity", entity, "repo", repoFullName, "err", err) + httpError(w, 502, "staging proxy failed") + } + return } if action != "queued" && action != "in_progress" && action != "completed" { @@ -193,23 +219,12 @@ func (a *App) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Request, pay return } - if ownerID == 0 { - httpError(w, 400, "Owner ID is missing in payload") - return - } - et, err := internal.ParseEntityType(ownerType) - if err != nil { - httpError(w, 400, err.Error()) - return - } - jobID := asInt64(job["id"]) if jobID == 0 { httpError(w, 400, "Job ID is missing in payload") return } labels := jsonStrings(job["labels"]) - repoFullName, _ := repo["full_name"].(string) if repoFullName == "" { httpError(w, 400, "Repository full name is missing in payload") return @@ -219,11 +234,9 @@ func (a *App) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Request, pay return } - // entity_id is the GitHub owner id for both orgs and users. - entityID := ownerID base.Event = "workflow_job." + action - pool, image, matched := matchLabelsToK8s(a.Config, entityID, repoFullName, labels) + pool, image, matched := matchLabelsToK8s(a.Config, entity.ID, repoFullName, labels) if !matched { // ignored_no_label is the highest-volume row; trim aggressively. htmlURL, _ := job["html_url"].(string) @@ -241,7 +254,6 @@ func (a *App) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Request, pay return } - entity := internal.Entity{Type: et, Name: ownerLogin, ID: entityID} jobName, _ := job["name"].(string) slog.Info("Received workflow_job", "entity", entity, diff --git a/container/internal/contract.go b/container/internal/contract.go index f728e9c..b94b535 100644 --- a/container/internal/contract.go +++ b/container/internal/contract.go @@ -59,6 +59,7 @@ const ( OutcomeIgnoredAction WebhookOutcome = "ignored_action" OutcomeIgnoredNoLabel WebhookOutcome = "ignored_no_label" OutcomeIgnoredEvent WebhookOutcome = "ignored_event" + OutcomeProxiedToStaging WebhookOutcome = "proxied_to_staging" OutcomeAuth404 WebhookOutcome = "auth_404" OutcomeAuthOtherError WebhookOutcome = "auth_other_error" ) diff --git a/container/serverless.yml b/container/serverless.yml index 6c31a1b..ed7db47 100644 --- a/container/serverless.yml +++ b/container/serverless.yml @@ -33,8 +33,11 @@ custom: ghfe: registryImage: ${env:REGISTRY}/${env:IMAGE}:ghfe-${self:custom.${self:provider.stage}.container-tag} port: 8080 - cpuLimit: 500 - memoryLimit: 512 + cpuLimit: 250 + memoryLimit: 256 + scalingOption: + type: cpu_usage + value: 70 secret: GHAPP_WEBHOOK_SECRET: ${env:GHAPP_WEBHOOK_SECRET} GHAPP_ORG_PRIVATE_KEY: ${env:GHAPP_ORG_PRIVATE_KEY} @@ -58,10 +61,13 @@ custom: scheduler: registryImage: ${env:REGISTRY}/${env:IMAGE}:scheduler-${self:custom.${self:provider.stage}.container-tag} port: 8080 - cpuLimit: 500 - memoryLimit: 512 + cpuLimit: 250 + memoryLimit: 256 minScale: 1 maxScale: 1 + scalingOption: + type: cpu_usage + value: 70 secret: GHAPP_WEBHOOK_SECRET: ${env:GHAPP_WEBHOOK_SECRET} GHAPP_ORG_PRIVATE_KEY: ${env:GHAPP_ORG_PRIVATE_KEY}