Skip to content
This repository was archived by the owner on May 19, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ GitHub (workflow_job webhook)
v
ghfe (container/cmd/ghfe)
| - Verifies webhook signature
| - Proxies workflow_job webhooks for EntityConfig.Staging repos to STAGING_URL (prod only)
| - Validates labels, determines entity type (org or personal)
| - Resolves (entity_id, job_labels) -> (k8s_pool, k8s_image)
| - Writes job to PostgreSQL
Expand Down
9 changes: 5 additions & 4 deletions container/cmd/ghfe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
os.Exit(2)
}

app := &App{Config: cfg, DB: db, GH: gh}
app := &App{Config: cfg, DB: db, GH: gh, StagingProxy: &http.Client{Timeout: 10 * time.Second}}
srv := &http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", internal.HTTPPort),
Handler: app.Routes(),
Expand All @@ -64,9 +64,10 @@ func main() {

// App holds the ghfe runtime dependencies handed to each request handler.
type App struct {
Config internal.Config
DB internal.DB
GH internal.GitHubClient
Config internal.Config
DB internal.DB
GH internal.GitHubClient
StagingProxy *http.Client // used by proxyToStaging; tests inject a stub
}

func (a *App) Routes() *http.ServeMux {
Expand Down
70 changes: 70 additions & 0 deletions container/cmd/ghfe/staging_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package main

import (
"bytes"
"io"
"log/slog"
"net/http"
"strings"

"github.com/riseproject-dev/riscv-runner-app/container/internal"
)

// shouldProxyToStaging is true when this is the prod instance and the
// (entity, repo) pair is listed as a staging-test repo in EntityConfigs.
// Prod forwards those workflow_job webhooks to the staging ghfe so the
// staging environment can exercise a real repo end-to-end.
func shouldProxyToStaging(cfg internal.Config, entity internal.Entity, repoFullName string) bool {
if !cfg.Prod || cfg.StagingURL == "" {
return false
}
ec, ok := internal.EntityConfigs[entity.ID]
if !ok || len(ec.Staging) == 0 {
return false
}
repoName := repoFullName
if i := strings.IndexByte(repoFullName, '/'); i >= 0 {
repoName = repoFullName[i+1:]
}
for _, r := range ec.Staging {
if r == repoName {
return true
}
}
return false
}

// proxyToStaging forwards the unmodified webhook body to the staging
// ghfe and relays its response back to GitHub. Caller MUST have already
// verified the HMAC signature so we don't amplify untrusted traffic.
//
// Errors return 502 so GitHub redelivers.
func (a *App) proxyToStaging(w http.ResponseWriter, r *http.Request, body []byte) error {
req, err := http.NewRequestWithContext(r.Context(), http.MethodPost, a.Config.StagingURL+"/", bytes.NewReader(body))
if err != nil {
return err
}
// Forward every request header verbatim. Staging needs the GitHub
// signature/event/delivery headers to verify and trace; carrying the
// rest is harmless and keeps the proxy behaviorally invisible.
req.Header = r.Header.Clone()

resp, err := a.StagingProxy.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

// Relay staging's response headers (mainly Content-Type and any
// X-GitHub-* it sets) back to GitHub.
for k, vs := range resp.Header {
for _, v := range vs {
w.Header().Add(k, v)
}
}
w.WriteHeader(resp.StatusCode)
if _, err := io.Copy(w, resp.Body); err != nil {
slog.Warn("Failed to relay staging proxy response", "err", err)
}
return nil
}
189 changes: 189 additions & 0 deletions container/cmd/ghfe/staging_proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package main

import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"errors"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/riseproject-dev/riscv-runner-app/container/internal"
"github.com/riseproject-dev/riscv-runner-app/container/internal/testutil"
)

// stubRT lets a test capture the request the staging client sent and
// return a canned response. The 502 path uses err != nil; the happy
// path returns resp with status copied from the test.
type stubRT struct {
gotReq *http.Request
gotBody []byte
resp *http.Response
err error
}

func (s *stubRT) RoundTrip(req *http.Request) (*http.Response, error) {
s.gotReq = req
if req.Body != nil {
b, _ := io.ReadAll(req.Body)
s.gotBody = b
}
return s.resp, s.err
}

func prodAppWithProxy(rt *stubRT) (*App, *testutil.FakeDB) {
db := testutil.NewFakeDB()
cfg := internal.Config{
Prod: true,
WebhookSecret: webhookSecret,
StagingURL: "https://staging.example/ghfe",
ImageUbuntu24: "img24",
}
return &App{Config: cfg, DB: db, GH: &testutil.FakeGH{}, StagingProxy: &http.Client{Transport: rt}}, db
}

func stagingPayload() []byte {
return mustJSON(map[string]any{
"action": "queued",
"installation": map[string]any{"id": float64(1)},
"repository": map[string]any{
"id": float64(2),
"full_name": "riseproject-dev/riscv-runner-sample",
"owner": map[string]any{"id": float64(internal.RiseprojectDevOrgID), "type": "Organization", "login": "riseproject-dev"},
},
"workflow_job": map[string]any{
"id": float64(7),
"labels": []any{"ubuntu-24.04-riscv"},
"html_url": "https://gh/run/7",
},
})
}

// TestStagingProxy_HappyPath: prod ghfe receives a workflow_job for a repo
// listed under EntityConfigs.Staging, forwards body + all request headers
// to StagingURL, records exactly one proxied_to_staging row, never touches
// the jobs table, and relays the staging response (status + body + headers)
// back to GitHub verbatim.
func TestStagingProxy_HappyPath(t *testing.T) {
rt := &stubRT{resp: &http.Response{
StatusCode: 202,
Header: http.Header{"Content-Type": []string{"text/plain"}, "X-Staging-Echo": []string{"ok"}},
Body: io.NopCloser(strings.NewReader("staging-ok")),
}}
app, db := prodAppWithProxy(rt)

body := stagingPayload()
mac := hmac.New(sha256.New, []byte(webhookSecret))
mac.Write(body)
sig := "sha256=" + hex.EncodeToString(mac.Sum(nil))

r := httptest.NewRequest("POST", "/", bytes.NewReader(body))
r.Header.Set(internal.HookSignatureHeader, sig)
r.Header.Set(internal.HookEventHeader, "workflow_job")
r.Header.Set(internal.HookAppIDHeader, "2167633")
r.Header.Set("X-GitHub-Delivery", "abc-123")
r.Header.Set("User-Agent", "GitHub-Hookshot/abc")
r.Header.Set("Content-Type", "application/json")

w := httptest.NewRecorder()
app.handleWebhook(w, r)

if w.Code != 202 {
t.Fatalf("status=%d want 202 (relayed)", w.Code)
}
if got := w.Body.String(); got != "staging-ok" {
t.Errorf("body=%q want %q", got, "staging-ok")
}
if w.Header().Get("X-Staging-Echo") != "ok" {
t.Errorf("missing relayed response header X-Staging-Echo, got=%v", w.Header())
}
if w.Header().Get("Content-Type") != "text/plain" {
t.Errorf("response Content-Type=%q want text/plain", w.Header().Get("Content-Type"))
}

if rt.gotReq == nil {
t.Fatal("staging client never called")
}
if rt.gotReq.URL.String() != "https://staging.example/ghfe/" {
t.Errorf("forwarded URL=%q", rt.gotReq.URL.String())
}
if rt.gotReq.Method != "POST" {
t.Errorf("method=%q", rt.gotReq.Method)
}
if !bytes.Equal(rt.gotBody, body) {
t.Errorf("forwarded body differs from original")
}
for _, h := range []string{internal.HookSignatureHeader, internal.HookEventHeader, internal.HookAppIDHeader, "X-Github-Delivery", "User-Agent", "Content-Type"} {
if rt.gotReq.Header.Get(h) == "" {
t.Errorf("forwarded request missing header %s", h)
}
}
if rt.gotReq.Header.Get(internal.HookSignatureHeader) != sig {
t.Errorf("forwarded signature mutated")
}

if len(db.Events) != 1 || db.Events[0].Row.Outcome != string(internal.OutcomeProxiedToStaging) {
t.Fatalf("expected one proxied_to_staging row, got %+v", db.Events)
}
if len(db.Jobs) != 0 {
t.Fatalf("prod must not store proxied jobs locally, got %d", len(db.Jobs))
}
}

// TestStagingProxy_UpstreamFailure: a transport error from staging surfaces
// as 502 to GitHub (so it redelivers) and the proxied_to_staging row is
// still written -- the proxy attempt itself is the audit-worthy event.
func TestStagingProxy_UpstreamFailure(t *testing.T) {
rt := &stubRT{err: errors.New("connection refused")}
app, db := prodAppWithProxy(rt)

body := stagingPayload()
mac := hmac.New(sha256.New, []byte(webhookSecret))
mac.Write(body)
r := httptest.NewRequest("POST", "/", bytes.NewReader(body))
r.Header.Set(internal.HookSignatureHeader, "sha256="+hex.EncodeToString(mac.Sum(nil)))
r.Header.Set(internal.HookEventHeader, "workflow_job")
r.Header.Set(internal.HookAppIDHeader, "2167633")

w := httptest.NewRecorder()
app.handleWebhook(w, r)

if w.Code != 502 {
t.Fatalf("status=%d want 502", w.Code)
}
if len(db.Events) != 1 || db.Events[0].Row.Outcome != string(internal.OutcomeProxiedToStaging) {
t.Fatalf("expected proxied_to_staging audit row even on failure, got %+v", db.Events)
}
}

// TestShouldProxyToStaging_Negatives pins the four ways the proxy does NOT fire.
func TestShouldProxyToStaging_Negatives(t *testing.T) {
prod := internal.Config{Prod: true, StagingURL: "https://s"}
riseEntity := internal.Entity{ID: internal.RiseprojectDevOrgID}
unknownEntity := internal.Entity{ID: 999999}

cases := []struct {
name string
cfg internal.Config
ent internal.Entity
repo string
}{
{"staging instance", internal.Config{Prod: false, StagingURL: "https://s"}, riseEntity, "riseproject-dev/riscv-runner-sample"},
{"no staging url", internal.Config{Prod: true}, riseEntity, "riseproject-dev/riscv-runner-sample"},
{"entity not in config", prod, unknownEntity, "riseproject-dev/riscv-runner-sample"},
{"repo not in entity staging list", prod, riseEntity, "riseproject-dev/something-else"},
}
for _, tc := range cases {
if shouldProxyToStaging(tc.cfg, tc.ent, tc.repo) {
t.Errorf("%s: expected no proxy", tc.name)
}
}

if !shouldProxyToStaging(prod, riseEntity, "riseproject-dev/riscv-runner-sample") {
t.Error("positive case: prod + sample repo should proxy")
}
}
52 changes: 32 additions & 20 deletions container/cmd/ghfe/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (a *App) handleWebhook(w http.ResponseWriter, r *http.Request) {
case "installation_target":
a.handleInstallationTargetEvent(w, r, event, payload, appID)
case "workflow_job":
a.handleWorkflowJobEvent(w, r, payload, appID)
a.handleWorkflowJobEvent(w, r, body, payload, appID)
default:
a.recordEvent(r, eventRecord{Event: event, Outcome: internal.OutcomeIgnoredEvent, Payload: payload, AppID: &appID})
_, _ = w.Write([]byte("Ignoring " + event + " event"))
Expand Down Expand Up @@ -155,7 +155,7 @@ func (a *App) handleInstallationTargetEvent(w http.ResponseWriter, r *http.Reque
_, _ = w.Write([]byte(event + "." + action + " logged"))
}

func (a *App) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Request, payload map[string]any, appID int64) {
func (a *App) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Request, body []byte, payload map[string]any, appID int64) {
action, _ := payload["action"].(string)
repo, _ := payload["repository"].(map[string]any)
install, _ := payload["installation"].(map[string]any)
Expand All @@ -173,15 +173,41 @@ func (a *App) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Request, pay
ownerType, _ := owner["type"].(string)
ownerLogin, _ := owner["login"].(string)
installID := asInt64(install["id"])
repoFullName, _ := repo["full_name"].(string)
if ownerID == 0 {
httpError(w, 400, "Owner ID is missing in payload")
return
}
et, err := internal.ParseEntityType(ownerType)
if err != nil {
httpError(w, 400, err.Error())
return
}
entity := internal.Entity{Type: et, Name: ownerLogin, ID: ownerID}

trimmed := trimWorkflowJobPayload(payload)
base := eventRecord{
Payload: trimmed,
AppID: &appID,
InstallationID: &installID,
EntityType: &ownerType,
EntityID: &ownerID,
EntityName: &ownerLogin,
EntityType: (*string)(&entity.Type),
EntityID: &entity.ID,
EntityName: &entity.Name,
}

// Staging proxy: a real repo (e.g. riscv-runner-sample) is wired into
// the prod app but its webhooks should drive the staging environment.
// Forward the unmodified body to staging ghfe and short-circuit; the
// prod instance neither stores nor reconciles the job locally.
if shouldProxyToStaging(a.Config, entity, repoFullName) {
base.Event = "workflow_job." + action
base.Outcome = internal.OutcomeProxiedToStaging
a.recordEvent(r, base)
if err := a.proxyToStaging(w, r, body); err != nil {
slog.Error("Staging proxy failed", "entity", entity, "repo", repoFullName, "err", err)
httpError(w, 502, "staging proxy failed")
}
return
}

if action != "queued" && action != "in_progress" && action != "completed" {
Expand All @@ -193,23 +219,12 @@ func (a *App) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Request, pay
return
}

if ownerID == 0 {
httpError(w, 400, "Owner ID is missing in payload")
return
}
et, err := internal.ParseEntityType(ownerType)
if err != nil {
httpError(w, 400, err.Error())
return
}

jobID := asInt64(job["id"])
if jobID == 0 {
httpError(w, 400, "Job ID is missing in payload")
return
}
labels := jsonStrings(job["labels"])
repoFullName, _ := repo["full_name"].(string)
if repoFullName == "" {
httpError(w, 400, "Repository full name is missing in payload")
return
Expand All @@ -219,11 +234,9 @@ func (a *App) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Request, pay
return
}

// entity_id is the GitHub owner id for both orgs and users.
entityID := ownerID
base.Event = "workflow_job." + action

pool, image, matched := matchLabelsToK8s(a.Config, entityID, repoFullName, labels)
pool, image, matched := matchLabelsToK8s(a.Config, entity.ID, repoFullName, labels)
if !matched {
// ignored_no_label is the highest-volume row; trim aggressively.
htmlURL, _ := job["html_url"].(string)
Expand All @@ -241,7 +254,6 @@ func (a *App) handleWorkflowJobEvent(w http.ResponseWriter, r *http.Request, pay
return
}

entity := internal.Entity{Type: et, Name: ownerLogin, ID: entityID}
jobName, _ := job["name"].(string)
slog.Info("Received workflow_job",
"entity", entity,
Expand Down
Loading
Loading