From f6bf59caf1e9423745ac39c9899151b1ede4c146 Mon Sep 17 00:00:00 2001 From: scttbnsn <80784472+scttbnsn@users.noreply.github.com> Date: Tue, 16 Jun 2026 15:12:51 -0400 Subject: [PATCH 1/2] =?UTF-8?q?=F0=9F=90=9B=20fix(edge):=20order=20exec=20?= =?UTF-8?q?input=20and=20add=20outbound=20backpressure=20to=20the=20tunnel?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 🐛 fix(edge): register exec sessions synchronously and buffer early input in arrival order via a single per-session writer goroutine, so keystrokes typed before the Docker exec is live are replayed in order instead of dropped - 🐛 fix(edge): front all outbound writes with a single sendPump goroutine over a bounded queue with a per-frame write deadline; evict (and reconnect) a controller that can't keep up instead of head-of-line-blocking every session, stalling the read pump, or hanging the agent indefinitely - 🔄 refactor(edge): add a consumer-side dockerAPI seam so exec and request fan-out are testable against a scripted fake daemon with no live socket - 🧪 test(edge): add a dedicated tunnel test harness (in-memory WS pair, scripted fakeConn/fakeDocker) covering hello signing, request dispatch/backpressure, exec-session lifecycle, ordered input replay, and send-path eviction --- internal/edge/backpressure_test.go | 143 +++++++++++ internal/edge/client.go | 129 +++++++++- internal/edge/dispatch_test.go | 123 ++++++++++ internal/edge/exec_docker_test.go | 158 ++++++++++++ internal/edge/harness_test.go | 379 +++++++++++++++++++++++++++++ internal/edge/hello_test.go | 113 +++++++++ internal/edge/ordered_io_test.go | 193 +++++++++++++++ internal/edge/request_test.go | 126 ++++++++++ internal/edge/tunnel.go | 177 +++++++++++--- internal/edge/tunnel_test.go | 236 ++++++++++++++++++ 10 files changed, 1731 insertions(+), 46 deletions(-) create mode 100644 internal/edge/backpressure_test.go create mode 100644 internal/edge/dispatch_test.go create mode 100644 
internal/edge/exec_docker_test.go create mode 100644 internal/edge/harness_test.go create mode 100644 internal/edge/hello_test.go create mode 100644 internal/edge/ordered_io_test.go create mode 100644 internal/edge/request_test.go create mode 100644 internal/edge/tunnel_test.go diff --git a/internal/edge/backpressure_test.go b/internal/edge/backpressure_test.go new file mode 100644 index 0000000..9c9d201 --- /dev/null +++ b/internal/edge/backpressure_test.go @@ -0,0 +1,143 @@ +package edge + +import ( + "context" + "testing" + "time" + + "github.com/gorilla/websocket" + + "github.com/codeswhat/portwing/internal/protocol" +) + +// runSendPump creates the per-connection send queue and starts the sendPump +// against the test client, returning the channel so a test can observe/fill it. +// The pump is torn down via context cancellation registered as a test cleanup. +func runSendPump(t *testing.T, c *Client) chan protocol.Envelope { + t.Helper() + ch := make(chan protocol.Envelope, sendQueueSize) + c.connMu.Lock() + c.sendCh = ch + conn := c.conn + c.connMu.Unlock() + + ctx, cancel := context.WithCancel(context.Background()) + go c.sendPump(ctx, conn, ch) + t.Cleanup(cancel) + return ch +} + +// TestSendPumpDeliversQueuedFrame proves that the queued send path (sendCh set) +// delivers a frame end-to-end: sendTypedMessage enqueues the envelope, the +// sendPump dequeues and writes it over the WebSocket, and the controller reads +// back the exact content. 
+func TestSendPumpDeliversQueuedFrame(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + runSendPump(t, c) + + const ts = int64(42) + if err := c.sendTypedMessage(protocol.TypePong, protocol.PongMessage{Timestamp: ts}); err != nil { + t.Fatalf("sendTypedMessage: %v", err) + } + + var pong protocol.PongMessage + decodeData(t, expectType(t, ctrl, protocol.TypePong), &pong) + if pong.Timestamp != ts { + t.Errorf("pong.Timestamp = %d, want %d", pong.Timestamp, ts) + } +} + +// TestSendMessageEvictsConnectionWhenQueueFull pins the core backpressure +// invariant: when sendCh is full and no pump is draining it, the next +// sendMessage call takes the default branch and calls failConn, which closes +// the agent-side WebSocket. The controller observes the close as a read error, +// proving eviction rather than silent frame drop or deadlock. +func TestSendMessageEvictsConnectionWhenQueueFull(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + + // Install a capacity-1 queue (no pump running โ€” nobody drains it). + c.connMu.Lock() + c.sendCh = make(chan protocol.Envelope, 1) + c.connMu.Unlock() + + // Fill the queue to capacity so the next send hits the default branch. + c.sendCh <- protocol.Envelope{Type: protocol.TypePing} + + // This send must not block; it must call failConn and close the agent conn. + c.sendMessage(protocol.Envelope{Type: protocol.TypePing}) + + // The controller should see the connection torn down. Give it up to + // readTimeout to propagate. + if err := ctrl.SetReadDeadline(time.Now().Add(readTimeout)); err != nil { + t.Fatalf("set ctrl read deadline: %v", err) + } + _, _, err := ctrl.ReadMessage() + if err == nil { + t.Fatal("expected read error after eviction, got nil") + } +} + +// TestSendPumpEvictsOnWriteFailure verifies that a write error inside the +// sendPump causes failConn to be called and the agent-side connection to be +// closed. 
We induce the write failure by closing the controller end first; the +// sendPump's WriteJSON then fails, which must trigger failConn. The test +// confirms eviction by waiting until the agent conn itself becomes unusable. +func TestSendPumpEvictsOnWriteFailure(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + runSendPump(t, c) + + // Capture the agent conn before eviction so we can probe it afterwards. + c.connMu.Lock() + agentConn := c.conn + c.connMu.Unlock() + + // Closing the controller makes every subsequent WriteJSON from the agent + // fail because the peer has gone away. + if err := ctrl.Close(); err != nil { + t.Fatalf("close ctrl: %v", err) + } + + // Enqueue a frame; the sendPump will try to write it and fail. + if err := c.sendTypedMessage(protocol.TypePong, protocol.PongMessage{Timestamp: 1}); err != nil { + t.Fatalf("sendTypedMessage: %v", err) + } + + // Wait until failConn has propagated and the agent connection is unusable. + waitFor(t, "agent conn evicted", func() bool { + err := agentConn.WriteControl( + websocket.PingMessage, + nil, + time.Now().Add(10*time.Millisecond), + ) + return err != nil + }) +} + +// TestSendMessageDirectWriteWhenNoQueue documents that the handshake (nil +// sendCh) code path remains intact: with sendCh left nil (as newTestClient +// always leaves it), sendTypedMessage writes directly to the WebSocket and the +// controller receives the frame. Every existing dispatch test relies on this +// behaviour implicitly; this test makes it an explicit contract. +func TestSendMessageDirectWriteWhenNoQueue(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + // sendCh is nil โ€” newTestClient does not set it. 
+ + const ts = int64(7) + if err := c.sendTypedMessage(protocol.TypePong, protocol.PongMessage{Timestamp: ts}); err != nil { + t.Fatalf("sendTypedMessage: %v", err) + } + + var pong protocol.PongMessage + decodeData(t, expectType(t, ctrl, protocol.TypePong), &pong) + if pong.Timestamp != ts { + t.Errorf("pong.Timestamp = %d, want %d", pong.Timestamp, ts) + } +} diff --git a/internal/edge/client.go b/internal/edge/client.go index 46c217c..4e31bba 100644 --- a/internal/edge/client.go +++ b/internal/edge/client.go @@ -13,6 +13,7 @@ import ( "io" "log/slog" "math/big" + "net" "net/http" "os" "strings" @@ -35,8 +36,30 @@ const ( maxResponseBody = 100 * 1024 * 1024 // 100 MB โ€” proxied response body cap maxExecSessions = 100 // concurrent exec sessions maxStreams = 100 // concurrent in-flight tunneled requests + + // sendQueueSize bounds outbound frames buffered for the sendPump. A full + // queue means the controller can't keep up, so the connection is evicted + // (slow-consumer backpressure) rather than letting the backlog grow. + sendQueueSize = 256 + // writeWait bounds a single WebSocket write. A controller that can't accept + // a frame within this window is treated as wedged and the connection is + // evicted, instead of blocking the writer forever. + writeWait = 10 * time.Second ) +// dockerAPI is the subset of *docker.Client the edge Client depends on. It is +// defined on the consumer side so the tunnel's exec sessions and the request +// fan-out can be exercised against a fake Docker daemon without a live socket. +// *docker.Client satisfies it structurally. 
+type dockerAPI interface { + GetVersion(ctx context.Context) (string, error) + Do(ctx context.Context, method, path string, body io.Reader) (*http.Response, error) + DoStream(ctx context.Context, method, path string, body io.Reader) (*http.Response, error) + CreateExec(ctx context.Context, containerID string, cmd []string, user string, tty bool) (string, error) + StartExec(ctx context.Context, execID string, tty bool) (net.Conn, error) + ResizeExec(ctx context.Context, execID string, cols, rows int) error +} + // edgeMessageSender wraps the edge Client to implement adapter.MessageSender. type edgeMessageSender struct { client *Client @@ -50,7 +73,7 @@ func (s *edgeMessageSender) SendTypedMessage(msgType string, data interface{}) e // and tunnels Docker API requests over the WebSocket. type Client struct { cfg *config.Config - dockerClient *docker.Client + dockerClient dockerAPI adapter adapter.EdgeAdapter compose *docker.ComposeManager collector *metrics.Collector @@ -59,6 +82,13 @@ type Client struct { conn *websocket.Conn connMu sync.Mutex + // sendCh fronts all post-handshake writes with a single sendPump goroutine, + // so a slow controller backs up here instead of head-of-line-blocking every + // sender or stalling the read pump. It is nil outside an active connection; + // the hello/welcome handshake writes the conn directly (no concurrent + // writer exists yet). Guarded by connMu alongside conn. + sendCh chan protocol.Envelope + execSessions sync.Map // streamSem bounds concurrent in-flight request handlers (maxStreams). @@ -230,6 +260,25 @@ func (c *Client) connect(ctx context.Context) (bool, error) { slog.Info("connected to controller") + pumpCtx, pumpCancel := context.WithCancel(ctx) + defer pumpCancel() + + var wg sync.WaitGroup + + // Bring the outbound send path up before any post-handshake send, so the + // adapter sync, metrics, and every pump funnel through the single sendPump + // (the only writer) instead of writing the conn concurrently. 
+ sendCh := make(chan protocol.Envelope, sendQueueSize) + c.connMu.Lock() + c.sendCh = sendCh + c.connMu.Unlock() + + wg.Add(1) + go func() { + defer wg.Done() + c.sendPump(pumpCtx, conn, sendCh) + }() + // Let adapter handle initial sync (container sync, component sync, etc.). sender := &edgeMessageSender{client: c} if err := c.adapter.OnConnect(ctx, sender); err != nil { @@ -240,10 +289,6 @@ func (c *Client) connect(ctx context.Context) (bool, error) { c.sendMetrics() // Run pumps. - pumpCtx, pumpCancel := context.WithCancel(ctx) - defer pumpCancel() - - var wg sync.WaitGroup wg.Add(2) var readErr error @@ -266,6 +311,7 @@ func (c *Client) connect(ctx context.Context) (bool, error) { closeWebSocket(c.conn, "connection loop end") c.conn = nil } + c.sendCh = nil c.connMu.Unlock() // Reaching here means the welcome handshake succeeded, so the connection @@ -421,7 +467,11 @@ func (c *Client) readPump(ctx context.Context) error { continue } c.auditor.ExecStart(c.cfg.DrydockURL, msg.ContainerID, msg.ExecID) - go c.StartExec(ctx, msg) + // Synchronous: StartExec only registers the session and spawns the + // Docker bring-up, so it returns immediately. Registering before the + // next message is dispatched is what keeps a following exec_input + // from racing the bring-up and being dropped (ordered exec I/O). + c.StartExec(ctx, msg) case protocol.TypeExecInput: var msg protocol.ExecInputMessage @@ -621,17 +671,74 @@ func (c *Client) sendTypedMessage(msgType string, data interface{}) error { return nil } -// sendMessage performs a thread-safe WebSocket write. +// sendMessage hands an envelope to the sendPump. The enqueue is non-blocking: +// sendMessage runs on the read-pump goroutine for pongs and rejections, so it +// must never block. A full queue means the controller can't keep up โ€” the +// connection is evicted (and Run reconnects) rather than dropping frames, which +// would hang a request or corrupt a stream. 
+// +// Before the send path is up (the hello/welcome handshake), sendCh is nil and +// the frame is written directly โ€” no concurrent writer exists yet. func (c *Client) sendMessage(env protocol.Envelope) { c.connMu.Lock() - defer c.connMu.Unlock() + ch := c.sendCh + conn := c.conn + c.connMu.Unlock() - if c.conn == nil { + if ch == nil { + // Handshake phase: synchronous direct write, provably single-writer. + if conn == nil { + return + } + _ = conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := conn.WriteJSON(env); err != nil { + slog.Warn("websocket write failed", "type", env.Type, "error", err) + } return } - if err := c.conn.WriteJSON(env); err != nil { - slog.Warn("websocket write failed", "type", env.Type, "error", err) + select { + case ch <- env: + default: + c.failConn("send queue full") + } +} + +// sendPump is the sole writer to the WebSocket once a connection is up. +// Fronting every send with one goroutine and a bounded queue is the tunnel's +// outbound backpressure: a slow controller backs up sendCh instead of +// head-of-line-blocking every sender or stalling the read pump, and a write +// that can't complete within writeWait evicts the connection rather than +// blocking forever. +func (c *Client) sendPump(ctx context.Context, conn *websocket.Conn, sendCh chan protocol.Envelope) { + for { + select { + case <-ctx.Done(): + return + case env := <-sendCh: + if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { + c.failConn("set write deadline failed") + return + } + if err := conn.WriteJSON(env); err != nil { + slog.Warn("websocket write failed", "type", env.Type, "error", err) + c.failConn("write failed") + return + } + } + } +} + +// failConn evicts the active WebSocket. Closing it unblocks the read pump with +// an error, which tears the pumps down and lets Run reconnect with backoff. +// Safe to call from any goroutine and more than once. 
+func (c *Client) failConn(reason string) { + c.connMu.Lock() + conn := c.conn + c.connMu.Unlock() + if conn != nil { + slog.Warn("evicting controller connection", "reason", reason) + closeWebSocket(conn, reason) } } diff --git a/internal/edge/dispatch_test.go b/internal/edge/dispatch_test.go new file mode 100644 index 0000000..466d153 --- /dev/null +++ b/internal/edge/dispatch_test.go @@ -0,0 +1,123 @@ +package edge + +import ( + "context" + "encoding/json" + "testing" + + "github.com/gorilla/websocket" + + "github.com/codeswhat/portwing/internal/protocol" +) + +// runReadPump starts the read pump against the test client and returns a cancel +// func. The pump exits when the context is cancelled or the conn closes (test +// cleanup closes both ends). +func runReadPump(t *testing.T, c *Client) context.CancelFunc { + t.Helper() + ctx, cancel := context.WithCancel(context.Background()) + go func() { _ = c.readPump(ctx) }() + t.Cleanup(cancel) + return cancel +} + +// sendEnvelope marshals data into an envelope and sends it from the controller +// to the agent. +func sendEnvelope(t *testing.T, ctrl *websocket.Conn, msgType string, data any) { + t.Helper() + env := protocol.Envelope{Type: msgType} + if data != nil { + raw, err := json.Marshal(data) + if err != nil { + t.Fatalf("marshal %s: %v", msgType, err) + } + env.Data = raw + } + if err := ctrl.WriteJSON(env); err != nil { + t.Fatalf("write %s: %v", msgType, err) + } +} + +// A ping from the controller is answered with a pong that echoes the timestamp. 
+func TestReadPumpAnswersPingWithPong(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + runReadPump(t, c) + + sendEnvelope(t, ctrl, protocol.TypePing, protocol.PingMessage{Timestamp: 12345}) + + var pong protocol.PongMessage + decodeData(t, expectType(t, ctrl, protocol.TypePong), &pong) + if pong.Timestamp != 12345 { + t.Errorf("pong timestamp = %d, want 12345", pong.Timestamp) + } +} + +// A malformed envelope is skipped and the read loop keeps serving โ€” proven by a +// subsequent ping still drawing a pong. +func TestReadPumpSkipsMalformedEnvelopeAndKeepsServing(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + runReadPump(t, c) + + if err := ctrl.WriteMessage(websocket.TextMessage, []byte("{ not valid json")); err != nil { + t.Fatalf("write garbage: %v", err) + } + + sendEnvelope(t, ctrl, protocol.TypePing, protocol.PingMessage{Timestamp: 7}) + expectType(t, ctrl, protocol.TypePong) +} + +// Once the in-flight request semaphore is saturated, further requests are +// rejected with an error rather than blocking the read loop โ€” the backpressure +// guarantee for tunneled request fan-out. +func TestReadPumpRejectsRequestsWhenStreamLimitReached(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + + // Saturate the stream semaphore so the dispatch hits the default (reject) + // branch without ever reaching the Docker-backed request handler. 
+ for i := 0; i < maxStreams; i++ { + c.streamSem <- struct{}{} + } + + runReadPump(t, c) + + sendEnvelope(t, ctrl, protocol.TypeRequest, protocol.RequestMessage{ + RequestID: "req-1", + Method: "GET", + Path: "/containers/json", + }) + + var errMsg protocol.ErrorMessage + decodeData(t, expectType(t, ctrl, protocol.TypeError), &errMsg) + if errMsg.RequestID != "req-1" { + t.Errorf("error RequestID = %q, want req-1", errMsg.RequestID) + } + if errMsg.Message == "" { + t.Error("rejection carried no message") + } +} + +// Exec control messages for an unknown session are dispatched without crashing +// the read loop; liveness is confirmed by a following ping/pong. +func TestReadPumpDispatchesExecControlForUnknownSession(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + runReadPump(t, c) + + sendEnvelope(t, ctrl, protocol.TypeExecInput, protocol.ExecInputMessage{ExecID: "ghost", Data: "Zm9v"}) + sendEnvelope(t, ctrl, protocol.TypeExecResize, protocol.ExecResizeMessage{ExecID: "ghost", Cols: 80, Rows: 24}) + sendEnvelope(t, ctrl, protocol.TypeExecEnd, protocol.ExecEndMessage{ExecID: "ghost"}) + + sendEnvelope(t, ctrl, protocol.TypePing, protocol.PingMessage{Timestamp: 99}) + var pong protocol.PongMessage + decodeData(t, expectType(t, ctrl, protocol.TypePong), &pong) + if pong.Timestamp != 99 { + t.Errorf("pong timestamp = %d, want 99", pong.Timestamp) + } +} diff --git a/internal/edge/exec_docker_test.go b/internal/edge/exec_docker_test.go new file mode 100644 index 0000000..89ce639 --- /dev/null +++ b/internal/edge/exec_docker_test.go @@ -0,0 +1,158 @@ +package edge + +import ( + "context" + "encoding/base64" + "errors" + "io" + "reflect" + "testing" + + "github.com/codeswhat/portwing/internal/protocol" +) + +// On a successful start, StartExec creates and starts the Docker exec, applies +// the requested terminal size, announces exec_ready, then streams output and a +// terminal exec_end as the session drains. 
+func TestStartExecSuccessStreamsOutput(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + execConn := &fakeConn{reads: [][]byte{[]byte("hi\n")}, readErr: io.EOF} + fd := &fakeDocker{createExecID: "docker-1", startConn: execConn} + c.dockerClient = fd + + c.StartExec(context.Background(), protocol.ExecStartMessage{ + ExecID: "e1", + ContainerID: "c1", + Cmd: []string{"sh", "-c", "echo hi"}, + User: "root", + Cols: 120, + Rows: 40, + }) + + var ready protocol.ExecReadyMessage + decodeData(t, expectType(t, ctrl, protocol.TypeExecReady), &ready) + if ready.ExecID != "e1" { + t.Errorf("exec_ready ExecID = %q, want e1", ready.ExecID) + } + + var out protocol.ExecOutputMessage + decodeData(t, expectType(t, ctrl, protocol.TypeExecOutput), &out) + if decoded, _ := base64.StdEncoding.DecodeString(out.Data); string(decoded) != "hi\n" { + t.Errorf("streamed output = %q, want %q", decoded, "hi\n") + } + + var end protocol.ExecEndMessage + decodeData(t, expectType(t, ctrl, protocol.TypeExecEnd), &end) + if end.Reason != "exited" { + t.Errorf("exec_end reason = %q, want exited", end.Reason) + } + + // The exec was created with the requested command/user, and the initial + // resize targeted the Docker-issued exec id (not the protocol exec id). + create := fd.createCallList() + if len(create) != 1 || create[0].containerID != "c1" || create[0].user != "root" || + !reflect.DeepEqual(create[0].cmd, []string{"sh", "-c", "echo hi"}) { + t.Errorf("CreateExec calls = %+v, want one call for c1/root/[sh -c echo hi]", create) + } + resizes := fd.resizeCallList() + if len(resizes) != 1 || resizes[0] != (resizeCall{"docker-1", 120, 40}) { + t.Errorf("resize calls = %+v, want one {docker-1 120 40}", resizes) + } +} + +// A zero-size terminal request skips the initial resize entirely. 
+func TestStartExecSkipsResizeWhenNoSize(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + execConn := &fakeConn{readErr: io.EOF} + fd := &fakeDocker{createExecID: "docker-1", startConn: execConn} + c.dockerClient = fd + + c.StartExec(context.Background(), protocol.ExecStartMessage{ + ExecID: "e1", ContainerID: "c1", Cmd: []string{"sh"}, + }) + + // Drain the lifecycle so the session goroutine completes. + expectType(t, ctrl, protocol.TypeExecReady) + expectType(t, ctrl, protocol.TypeExecEnd) + + if got := fd.resizeCallList(); len(got) != 0 { + t.Errorf("resize calls = %+v, want none for a zero-size request", got) + } +} + +// A CreateExec failure is reported as a terminal exec_end and no session is +// registered or started. +func TestStartExecCreateFailure(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + fd := &fakeDocker{createExecErr: errors.New("boom")} + c.dockerClient = fd + + c.StartExec(context.Background(), protocol.ExecStartMessage{ExecID: "e1", ContainerID: "c1"}) + + var end protocol.ExecEndMessage + decodeData(t, expectType(t, ctrl, protocol.TypeExecEnd), &end) + if end.ExecID != "e1" || end.Reason != "create exec failed: boom" { + t.Errorf("exec_end = %+v, want e1 / create exec failed: boom", end) + } + if _, ok := c.execSessions.Load("e1"); ok { + t.Error("a session was registered despite CreateExec failing") + } +} + +// A StartExec failure (after a successful create) is likewise terminal. 
+func TestStartExecStartFailure(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + fd := &fakeDocker{createExecID: "docker-1", startExecErr: errors.New("nope")} + c.dockerClient = fd + + c.StartExec(context.Background(), protocol.ExecStartMessage{ExecID: "e1", ContainerID: "c1"}) + + var end protocol.ExecEndMessage + decodeData(t, expectType(t, ctrl, protocol.TypeExecEnd), &end) + if end.Reason != "start exec failed: nope" { + t.Errorf("exec_end reason = %q, want start exec failed: nope", end.Reason) + } + if _, ok := c.execSessions.Load("e1"); ok { + t.Error("a session was registered despite StartExec failing") + } +} + +// HandleResize forwards a single successful resize to the Docker client. +func TestHandleResizeForwardsToDocker(t *testing.T) { + t.Parallel() + + c, _ := newTestClient(t) + fd := &fakeDocker{} + c.dockerClient = fd + newExecSession(c, "e1", &fakeConn{}) + + c.HandleResize(context.Background(), protocol.ExecResizeMessage{ExecID: "e1", Cols: 100, Rows: 30}) + + if got := fd.resizeCallList(); len(got) != 1 || got[0] != (resizeCall{"e1", 100, 30}) { + t.Errorf("resize calls = %+v, want one {e1 100 30}", got) + } +} + +// A transient resize error is retried until it succeeds. 
+func TestHandleResizeRetriesUntilSuccess(t *testing.T) { + t.Parallel() + + c, _ := newTestClient(t) + fd := &fakeDocker{resizeFailFirst: 1} + c.dockerClient = fd + newExecSession(c, "e1", &fakeConn{}) + + c.HandleResize(context.Background(), protocol.ExecResizeMessage{ExecID: "e1", Cols: 80, Rows: 24}) + + if fd.resizeAttempts != 2 { + t.Errorf("resize attempts = %d, want 2 (one failure then success)", fd.resizeAttempts) + } +} diff --git a/internal/edge/harness_test.go b/internal/edge/harness_test.go new file mode 100644 index 0000000..9bd184b --- /dev/null +++ b/internal/edge/harness_test.go @@ -0,0 +1,379 @@ +package edge + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "io" + "net" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/gorilla/websocket" + + "github.com/codeswhat/portwing/internal/audit" + "github.com/codeswhat/portwing/internal/config" + "github.com/codeswhat/portwing/internal/protocol" +) + +// readTimeout bounds every controller-side read so a missing reply fails the +// test fast instead of hanging the suite. +const readTimeout = 2 * time.Second + +// newWSPair stands up an in-memory WebSocket and returns both ends. agent is +// the connection the edge Client writes to (assigned to Client.conn); ctrl is +// the controller side the test drives โ€” it sends frames to the agent and reads +// back whatever the agent emits. Both ends are closed at test cleanup. 
+func newWSPair(t *testing.T) (agent, ctrl *websocket.Conn) { + t.Helper() + + upgrader := websocket.Upgrader{} + srvCh := make(chan *websocket.Conn, 1) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + srvCh <- c + })) + t.Cleanup(srv.Close) + + wsURL := "ws" + strings.TrimPrefix(srv.URL, "http") + agent, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) + if resp != nil { + _ = resp.Body.Close() + } + if err != nil { + t.Fatalf("dial controller: %v", err) + } + + select { + case ctrl = <-srvCh: + case <-time.After(readTimeout): + t.Fatal("controller never upgraded the connection") + } + + t.Cleanup(func() { + _ = agent.Close() + _ = ctrl.Close() + }) + return agent, ctrl +} + +// newTestClient builds a Client wired to an in-memory controller. The returned +// ctrl conn is the controller side. The dockerClient and adapter are left nil +// by default — most paths this harness exercises stop before touching them; +// tests that drive the Docker-backed exec paths (CreateExec/StartExec/ResizeExec) +// assign a scripted fakeDocker to c.dockerClient after construction. +func newTestClient(t *testing.T) (*Client, *websocket.Conn) { + t.Helper() + + agent, ctrl := newWSPair(t) + + // "" disables the audit logger (zero overhead beyond a nil check), so the + // readPump exec_start audit call is a safe no-op. + auditor, closeAudit, err := audit.New("") + if err != nil { + t.Fatalf("audit.New: %v", err) + } + t.Cleanup(closeAudit) + + c := &Client{ + cfg: &config.Config{}, + auditor: auditor, + streamSem: make(chan struct{}, maxStreams), + conn: agent, + } + return c, ctrl +} + +// expectEnvelope reads one envelope from the controller, failing if none +// arrives within readTimeout. 
+func expectEnvelope(t *testing.T, ctrl *websocket.Conn) protocol.Envelope { + t.Helper() + + if err := ctrl.SetReadDeadline(time.Now().Add(readTimeout)); err != nil { + t.Fatalf("set read deadline: %v", err) + } + _, raw, err := ctrl.ReadMessage() + if err != nil { + t.Fatalf("read from agent: %v", err) + } + var env protocol.Envelope + if err := json.Unmarshal(raw, &env); err != nil { + t.Fatalf("unmarshal envelope %q: %v", raw, err) + } + return env +} + +// expectType reads one envelope and asserts its type, returning the inner data +// for further decoding. +func expectType(t *testing.T, ctrl *websocket.Conn, want string) json.RawMessage { + t.Helper() + + env := expectEnvelope(t, ctrl) + if env.Type != want { + t.Fatalf("envelope type = %q, want %q (data=%s)", env.Type, want, env.Data) + } + return env.Data +} + +// decodeData unmarshals an envelope's data payload into v. +func decodeData(t *testing.T, data json.RawMessage, v any) { + t.Helper() + if err := json.Unmarshal(data, v); err != nil { + t.Fatalf("decode %T: %v", v, err) + } +} + +// newExecSession constructs a minimal registered ExecSession over conn, with no +// input writer running. Use for tests that drive readLoop/Close/EndExec +// directly and don't exercise the input queue. +func newExecSession(c *Client, execID string, conn net.Conn) *ExecSession { + s := &ExecSession{ + execID: execID, + conn: conn, + client: c, + connReady: make(chan struct{}), + inbox: make(chan []byte, execInputQueue), + done: make(chan struct{}), + } + c.execSessions.Store(execID, s) + return s +} + +// newReadySession registers a fully live exec session: conn wired, connReady +// closed, and the inputWriter goroutine running. Use for tests that send input +// through HandleInput and expect it written to conn (the writer drains +// asynchronously, so assert with waitFor). 
+func newReadySession(c *Client, execID string, conn net.Conn) *ExecSession { + s := newExecSession(c, execID, conn) + close(s.connReady) + go s.inputWriter() + return s +} + +// waitFor polls cond until it returns true or readTimeout elapses, failing the +// test on timeout. Use to await asynchronous effects (e.g. the input writer +// draining the queue to conn). +func waitFor(t *testing.T, what string, cond func() bool) { + t.Helper() + deadline := time.Now().Add(readTimeout) + for time.Now().Before(deadline) { + if cond() { + return + } + time.Sleep(2 * time.Millisecond) + } + t.Fatalf("timed out waiting for %s", what) +} + +// fakeConn is a scripted net.Conn. Read drains the queued chunks then returns +// readErr (io.EOF by default). Write records bytes unless writeErr is set, in +// which case it always fails โ€” used to drive the write-retry path. +// +// Set blockRead to a channel that Read will block on (after draining any +// scripted chunks). When the channel is closed, Read returns io.EOF. This lets +// tests that care only about writes keep the readLoop alive so it doesn't race +// with the inputWriter via the done channel. +type fakeConn struct { + mu sync.Mutex + reads [][]byte + readErr error + writeErr error + writes bytes.Buffer + closed bool + blockRead chan struct{} // when non-nil, Read blocks until this is closed +} + +func (c *fakeConn) Read(p []byte) (int, error) { + c.mu.Lock() + if len(c.reads) > 0 { + chunk := c.reads[0] + c.reads = c.reads[1:] + n := copy(p, chunk) + // If the chunk didn't fit, requeue the remainder. + if n < len(chunk) { + c.reads = append([][]byte{chunk[n:]}, c.reads...) 
+ } + c.mu.Unlock() + return n, nil + } + block := c.blockRead + readErr := c.readErr + c.mu.Unlock() + + if block != nil { + <-block + return 0, io.EOF + } + if readErr != nil { + return 0, readErr + } + return 0, io.EOF +} + +func (c *fakeConn) Write(p []byte) (int, error) { + c.mu.Lock() + defer c.mu.Unlock() + if c.writeErr != nil { + return 0, c.writeErr + } + return c.writes.Write(p) +} + +func (c *fakeConn) written() []byte { + c.mu.Lock() + defer c.mu.Unlock() + return append([]byte(nil), c.writes.Bytes()...) +} + +func (c *fakeConn) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + c.closed = true + return nil +} + +// isClosed reports whether Close has been called. Mutex-safe. +func (c *fakeConn) isClosed() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.closed +} + +func (c *fakeConn) LocalAddr() net.Addr { return fakeAddr{} } +func (c *fakeConn) RemoteAddr() net.Addr { return fakeAddr{} } +func (c *fakeConn) SetDeadline(_ time.Time) error { return nil } +func (c *fakeConn) SetReadDeadline(_ time.Time) error { return nil } +func (c *fakeConn) SetWriteDeadline(_ time.Time) error { return nil } + +type fakeAddr struct{} + +func (fakeAddr) Network() string { return "fake" } +func (fakeAddr) String() string { return "fake" } + +// fakeDocker is a scripted dockerAPI: it records calls and returns canned +// values, standing in for *docker.Client so the exec sessions and request +// fan-out can be driven without a live Docker daemon. +type fakeDocker struct { + mu sync.Mutex + + version string + + createExecID string + createExecErr error + createCalls []createExecCall + + startConn net.Conn + startExecErr error + startCalls []string // exec ids passed to StartExec + // startGate, when non-nil, blocks StartExec until the channel is closed or + // receives โ€” lets a test hold the exec bring-up open while it sends input + // that must be buffered in order rather than dropped. 
+ startGate chan struct{} + + // resizeFailFirst makes the first N ResizeExec calls fail before succeeding, + // to exercise the retry path. + resizeFailFirst int + resizeErr error + resizeAttempts int + resizeCalls []resizeCall + + doResp *http.Response + doErr error + streamResp *http.Response + streamErr error + doCalls []doCall +} + +type createExecCall struct { + containerID string + cmd []string + user string +} + +type resizeCall struct { + execID string + cols int + rows int +} + +type doCall struct { + method string + path string + stream bool +} + +func (f *fakeDocker) GetVersion(context.Context) (string, error) { + if f.version == "" { + return "test-docker", nil + } + return f.version, nil +} + +func (f *fakeDocker) CreateExec(_ context.Context, containerID string, cmd []string, user string, _ bool) (string, error) { + f.mu.Lock() + defer f.mu.Unlock() + f.createCalls = append(f.createCalls, createExecCall{containerID, cmd, user}) + return f.createExecID, f.createExecErr +} + +func (f *fakeDocker) StartExec(_ context.Context, execID string, _ bool) (net.Conn, error) { + f.mu.Lock() + f.startCalls = append(f.startCalls, execID) + gate := f.startGate + startErr := f.startExecErr + conn := f.startConn + f.mu.Unlock() + + if gate != nil { + <-gate + } + if startErr != nil { + return nil, startErr + } + return conn, nil +} + +func (f *fakeDocker) ResizeExec(_ context.Context, execID string, cols, rows int) error { + f.mu.Lock() + defer f.mu.Unlock() + f.resizeAttempts++ + f.resizeCalls = append(f.resizeCalls, resizeCall{execID, cols, rows}) + if f.resizeAttempts <= f.resizeFailFirst { + return errors.New("resize busy") + } + return f.resizeErr +} + +func (f *fakeDocker) Do(_ context.Context, method, path string, _ io.Reader) (*http.Response, error) { + f.mu.Lock() + f.doCalls = append(f.doCalls, doCall{method, path, false}) + f.mu.Unlock() + return f.doResp, f.doErr +} + +func (f *fakeDocker) DoStream(_ context.Context, method, path string, _ io.Reader) 
(*http.Response, error) { + f.mu.Lock() + f.doCalls = append(f.doCalls, doCall{method, path, true}) + f.mu.Unlock() + return f.streamResp, f.streamErr +} + +func (f *fakeDocker) resizeCallList() []resizeCall { + f.mu.Lock() + defer f.mu.Unlock() + return append([]resizeCall(nil), f.resizeCalls...) +} + +func (f *fakeDocker) createCallList() []createExecCall { + f.mu.Lock() + defer f.mu.Unlock() + return append([]createExecCall(nil), f.createCalls...) +} diff --git a/internal/edge/hello_test.go b/internal/edge/hello_test.go new file mode 100644 index 0000000..f559b7a --- /dev/null +++ b/internal/edge/hello_test.go @@ -0,0 +1,113 @@ +package edge + +import ( + "context" + "crypto/ed25519" + "crypto/sha256" + "encoding/base64" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/codeswhat/portwing/internal/auth" + "github.com/codeswhat/portwing/internal/config" + "github.com/codeswhat/portwing/internal/protocol" +) + +func newHello() protocol.HelloMessage { + return protocol.HelloMessage{ + Version: protocol.AgentVersion, + Protocol: protocol.ProtocolString, + AgentID: "agent-1", + AgentName: "test-agent", + } +} + +// setTokenHash derives the SHA-256 hex of the configured token; an empty token +// leaves the field blank. +func TestSetTokenHash(t *testing.T) { + t.Parallel() + + c := &Client{cfg: &config.Config{Token: "s3cr3t"}} + var hello = newHello() + c.setTokenHash(&hello) + + want := fmt.Sprintf("%x", sha256.Sum256([]byte("s3cr3t"))) + if hello.TokenHash != want { + t.Errorf("TokenHash = %q, want %q", hello.TokenHash, want) + } + + c.cfg.Token = "" + hello = newHello() + c.setTokenHash(&hello) + if hello.TokenHash != "" { + t.Errorf("TokenHash = %q for empty token, want empty", hello.TokenHash) + } +} + +// signHello must populate the Ed25519 fields with a signature that verifies +// against the canonical WebSocket-upgrade message, and must clear TokenHash so +// the two auth modes never coexist. 
+func TestSignHelloProducesVerifiableSignature(t *testing.T) { + t.Parallel() + + privPEM, _, err := auth.GenerateKeyPair("test") + if err != nil { + t.Fatalf("GenerateKeyPair: %v", err) + } + priv, err := auth.ParsePrivateKeyPEM(privPEM) + if err != nil { + t.Fatalf("ParsePrivateKeyPEM: %v", err) + } + pub := priv.Public().(ed25519.PublicKey) + + keyPath := filepath.Join(t.TempDir(), "agent.key") + if err := os.WriteFile(keyPath, privPEM, 0o600); err != nil { + t.Fatalf("write key: %v", err) + } + + c := &Client{cfg: &config.Config{PrivateKeyFile: keyPath, Token: "ignored-when-signing"}} + hello := newHello() + hello.TokenHash = "should-be-cleared" + + before := time.Now().Unix() + if err := c.signHello(context.Background(), &hello); err != nil { + t.Fatalf("signHello: %v", err) + } + after := time.Now().Unix() + + if hello.PubKeyID != auth.KeyIDForPublicKey(pub) { + t.Errorf("PubKeyID = %q, want %q", hello.PubKeyID, auth.KeyIDForPublicKey(pub)) + } + if hello.TokenHash != "" { + t.Errorf("TokenHash = %q, want cleared when signing", hello.TokenHash) + } + if hello.Nonce == "" { + t.Error("Nonce was not set") + } + if hello.Timestamp < before || hello.Timestamp > after { + t.Errorf("Timestamp = %d, want within [%d,%d]", hello.Timestamp, before, after) + } + + sig, err := base64.RawURLEncoding.DecodeString(hello.Signature) + if err != nil { + t.Fatalf("signature not base64url: %v", err) + } + canonical := auth.CanonicalMessage("GET", "/api/portwing/ws", auth.BodyHashHex(nil), hello.Timestamp, hello.Nonce) + if !ed25519.Verify(pub, canonical, sig) { + t.Error("signature did not verify against the canonical upgrade message") + } +} + +// A missing key file is a hard error the caller falls back from (to token auth). 
+func TestSignHelloFailsOnMissingKey(t *testing.T) { + t.Parallel() + + c := &Client{cfg: &config.Config{PrivateKeyFile: filepath.Join(t.TempDir(), "absent.key")}} + hello := newHello() + if err := c.signHello(context.Background(), &hello); err == nil { + t.Error("signHello succeeded with a missing key file, want error") + } +} diff --git a/internal/edge/ordered_io_test.go b/internal/edge/ordered_io_test.go new file mode 100644 index 0000000..270cb8f --- /dev/null +++ b/internal/edge/ordered_io_test.go @@ -0,0 +1,193 @@ +package edge + +import ( + "context" + "encoding/base64" + "testing" + + "github.com/codeswhat/portwing/internal/protocol" +) + +// TestOrderedIOEarlyInputBufferedDuringBringUp is the core regression test for +// the ordered-exec-I/O fix. Input that arrives while the Docker exec is still +// being brought up must be buffered in order and delivered once the exec is live, +// not dropped because the session "isn't ready yet." +func TestOrderedIOEarlyInputBufferedDuringBringUp(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + + // startGate holds StartExec open so we can send exec_input before the exec + // connection is live. blockRead keeps the readLoop alive while inputWriter + // drains, so done isn't raced closed before the write completes. + gate := make(chan struct{}) + readBlock := make(chan struct{}) + execConn := &fakeConn{blockRead: readBlock} + fd := &fakeDocker{ + createExecID: "docker-1", + startConn: execConn, + startGate: gate, + } + c.dockerClient = fd + t.Cleanup(func() { close(readBlock) }) // unblock readLoop so it exits cleanly + + runReadPump(t, c) + + // Send exec_start. StartExec registers the session synchronously and spawns + // bringUpExec, which will block on startGate. + sendEnvelope(t, ctrl, protocol.TypeExecStart, protocol.ExecStartMessage{ + ExecID: "ord-1", + ContainerID: "ctr-1", + Cmd: []string{"sh"}, + }) + + // Send exec_input while the exec is still being brought up. 
The session is + // already registered, so HandleInput can enqueue it immediately. + payload := "echo hi\n" + sendEnvelope(t, ctrl, protocol.TypeExecInput, protocol.ExecInputMessage{ + ExecID: "ord-1", + Data: base64.StdEncoding.EncodeToString([]byte(payload)), + }) + + // Release the gate โ€” bringUpExec can now complete. + close(gate) + + // exec_ready must arrive once the exec is live. + var ready protocol.ExecReadyMessage + decodeData(t, expectType(t, ctrl, protocol.TypeExecReady), &ready) + if ready.ExecID != "ord-1" { + t.Errorf("exec_ready ExecID = %q, want ord-1", ready.ExecID) + } + + // The buffered input must have been written to the exec conn in order. + waitFor(t, "early input to be written to exec conn", func() bool { + return string(execConn.written()) == payload + }) +} + +// TestOrderedIOFIFOMultipleEarlyInputs sends three input frames before the exec +// is live and verifies they arrive at the conn concatenated in the original order. +func TestOrderedIOFIFOMultipleEarlyInputs(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + + gate := make(chan struct{}) + // blockRead keeps readLoop alive while inputWriter drains all three chunks so + // done isn't closed mid-drain and the select in inputWriter picks the wrong branch. + readBlock := make(chan struct{}) + execConn := &fakeConn{blockRead: readBlock} + fd := &fakeDocker{ + createExecID: "docker-fifo", + startConn: execConn, + startGate: gate, + } + c.dockerClient = fd + t.Cleanup(func() { close(readBlock) }) + + runReadPump(t, c) + + sendEnvelope(t, ctrl, protocol.TypeExecStart, protocol.ExecStartMessage{ + ExecID: "ord-fifo", + ContainerID: "ctr-fifo", + Cmd: []string{"sh"}, + }) + + // Three frames enqueued before the gate is released. 
+ for _, chunk := range []string{"a", "b", "c"} { + sendEnvelope(t, ctrl, protocol.TypeExecInput, protocol.ExecInputMessage{ + ExecID: "ord-fifo", + Data: base64.StdEncoding.EncodeToString([]byte(chunk)), + }) + } + + close(gate) + + var ready protocol.ExecReadyMessage + decodeData(t, expectType(t, ctrl, protocol.TypeExecReady), &ready) + if ready.ExecID != "ord-fifo" { + t.Errorf("exec_ready ExecID = %q, want ord-fifo", ready.ExecID) + } + + waitFor(t, "all three chunks written in order", func() bool { + return string(execConn.written()) == "abc" + }) +} + +// TestOrderedIOInputAfterCloseIsDropped confirms that HandleInput on a closed +// session is a no-op: the conn's written bytes don't grow and no panic occurs. +func TestOrderedIOInputAfterCloseIsDropped(t *testing.T) { + t.Parallel() + + c, _ := newTestClient(t) + // blockRead so the readLoop doesn't interfere with the session state we're testing. + readBlock := make(chan struct{}) + conn := &fakeConn{blockRead: readBlock} + session := newReadySession(c, "ord-closed", conn) + t.Cleanup(func() { close(readBlock) }) + + // Close the session, then try to send more input. + session.Close() + + // written() baseline before the post-close HandleInput. + before := len(conn.written()) + + c.HandleInput(protocol.ExecInputMessage{ + ExecID: "ord-closed", + Data: base64.StdEncoding.EncodeToString([]byte("should not arrive")), + }) + + // Session is deregistered, so HandleInput finds no session and is a no-op. + after := len(conn.written()) + if after != before { + t.Errorf("conn grew by %d bytes after close; want 0", after-before) + } +} + +// TestOrderedIOTeardownDuringBringUpClosesOrphanedConn verifies that if a +// session is torn down while bringUpExec is still in-flight, the hijacked +// Docker conn returned by StartExec is closed immediately by activate and the +// readLoop never starts. Uses StartExec/EndExec directly (bypassing the read +// pump) for deterministic ordering. 
+func TestOrderedIOTeardownDuringBringUpClosesOrphanedConn(t *testing.T) { + t.Parallel() + + c, _ := newTestClient(t) + + gate := make(chan struct{}) + // orphanConn is the conn that StartExec will return AFTER the gate opens. + // activate must close it because the session is already marked closed. + orphanConn := &fakeConn{} + fd := &fakeDocker{ + createExecID: "docker-orphan", + startConn: orphanConn, + startGate: gate, + } + c.dockerClient = fd + + // StartExec registers the session synchronously and spawns bringUpExec, + // which will block inside fakeDocker.StartExec at the gate. + c.StartExec(context.Background(), protocol.ExecStartMessage{ + ExecID: "ord-orphan", + ContainerID: "ctr-orphan", + Cmd: []string{"sh"}, + }) + + // Tear the session down before releasing the gate. + c.EndExec(protocol.ExecEndMessage{ExecID: "ord-orphan"}) + + // Confirm the session is deregistered (Close() ran its once.Do, which sets + // closed=true before deleting from the map, so activate will see closed=true). + if _, ok := c.execSessions.Load("ord-orphan"); ok { + t.Fatal("session still registered after EndExec; cannot proceed with gate release") + } + + // Release the gate. bringUpExec resumes, gets orphanConn, calls activate. + // activate must detect closed=true and close the conn rather than wire it up. + close(gate) + + // The orphaned conn must be closed by activate. 
+ waitFor(t, "orphaned exec conn to be closed by activate", func() bool { + return orphanConn.isClosed() + }) +} diff --git a/internal/edge/request_test.go b/internal/edge/request_test.go new file mode 100644 index 0000000..9c40984 --- /dev/null +++ b/internal/edge/request_test.go @@ -0,0 +1,126 @@ +package edge + +import ( + "context" + "encoding/base64" + "errors" + "io" + "net/http" + "strings" + "testing" + + "github.com/codeswhat/portwing/internal/protocol" +) + +func mkResp(status int, contentType, body string) *http.Response { + h := http.Header{} + if contentType != "" { + h.Set("Content-Type", contentType) + } + return &http.Response{ + StatusCode: status, + Header: h, + Body: io.NopCloser(strings.NewReader(body)), + } +} + +// A non-streaming request is proxied via Do and returned as a single response +// envelope carrying the status, content type, and body. +func TestHandleRequestNonStream(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + //nolint:bodyclose // the response body is consumed and closed by handleRequest, the code under test. 
+ fd := &fakeDocker{doResp: mkResp(http.StatusCreated, "application/json", `{"ok":true}`)} + c.dockerClient = fd + + c.handleRequest(context.Background(), protocol.RequestMessage{ + RequestID: "r1", + Method: "POST", + Path: "/containers/create", + }) + + var resp protocol.ResponseMessage + decodeData(t, expectType(t, ctrl, protocol.TypeResponse), &resp) + if resp.RequestID != "r1" { + t.Errorf("RequestID = %q, want r1", resp.RequestID) + } + if resp.StatusCode != http.StatusCreated { + t.Errorf("StatusCode = %d, want %d", resp.StatusCode, http.StatusCreated) + } + if resp.IsStream { + t.Error("IsStream = true, want false for a unary request") + } + if string(resp.Body) != `{"ok":true}` { + t.Errorf("Body = %s, want {\"ok\":true}", resp.Body) + } + if resp.ContentType != "application/json" { + t.Errorf("ContentType = %q, want application/json", resp.ContentType) + } +} + +// A request that fails at the Docker client is reported as an error envelope +// tagged with the originating request id. +func TestHandleRequestError(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + fd := &fakeDocker{doErr: errors.New("dial fail")} + c.dockerClient = fd + + c.handleRequest(context.Background(), protocol.RequestMessage{ + RequestID: "r2", + Method: "GET", + Path: "/info", + }) + + var em protocol.ErrorMessage + decodeData(t, expectType(t, ctrl, protocol.TypeError), &em) + if em.RequestID != "r2" { + t.Errorf("error RequestID = %q, want r2", em.RequestID) + } + if em.Message != "dial fail" { + t.Errorf("error Message = %q, want dial fail", em.Message) + } +} + +// A streaming request is proxied via DoStream and tunneled as a stream-header +// response, one or more base64 stream chunks, and a terminal stream_end. +func TestHandleRequestStream(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + //nolint:bodyclose // the response body is consumed and closed by handleRequest, the code under test. 
+ fd := &fakeDocker{streamResp: mkResp(http.StatusOK, "application/octet-stream", "chunk-data")} + c.dockerClient = fd + + c.handleRequest(context.Background(), protocol.RequestMessage{ + RequestID: "r3", + Method: "GET", + Path: "/containers/abc/logs?follow=1", + }) + + var resp protocol.ResponseMessage + decodeData(t, expectType(t, ctrl, protocol.TypeResponse), &resp) + if !resp.IsStream { + t.Error("IsStream = false, want true for a streaming path") + } + if resp.StatusCode != http.StatusOK { + t.Errorf("StatusCode = %d, want 200", resp.StatusCode) + } + + var chunk protocol.StreamMessage + decodeData(t, expectType(t, ctrl, protocol.TypeStream), &chunk) + if chunk.RequestID != "r3" { + t.Errorf("stream RequestID = %q, want r3", chunk.RequestID) + } + if decoded, _ := base64.StdEncoding.DecodeString(chunk.Data); string(decoded) != "chunk-data" { + t.Errorf("stream payload = %q, want chunk-data", decoded) + } + + var end protocol.StreamEndMessage + decodeData(t, expectType(t, ctrl, protocol.TypeStreamEnd), &end) + if end.RequestID != "r3" || end.Reason != "complete" { + t.Errorf("stream_end = %+v, want r3 / complete", end) + } +} diff --git a/internal/edge/tunnel.go b/internal/edge/tunnel.go index 517a5c1..afc46de 100644 --- a/internal/edge/tunnel.go +++ b/internal/edge/tunnel.go @@ -14,18 +14,42 @@ import ( "github.com/codeswhat/portwing/internal/protocol" ) +// execInputQueue bounds the per-session input backlog. Input is decoded on the +// read loop and handed to a single writer goroutine, so this buffers the burst +// that can arrive before the Docker exec is live (and any momentary write +// stall) without ever blocking the read pump. +const execInputQueue = 256 + // ExecSession represents an active exec session tunneled over WebSocket. 
+// +// Input ordering is the session's core invariant: a single inputWriter +// goroutine drains inbox in arrival order, so keystrokes that race ahead of the +// Docker exec coming up are buffered and replayed in order rather than dropped. type ExecSession struct { execID string containerID string - conn net.Conn client *Client - done chan struct{} - once sync.Once + + // conn is the hijacked Docker exec stream. It is nil until the exec is + // brought up; readers synchronize through connReady (or the mu-guarded + // closed flag during teardown). + conn net.Conn + connReady chan struct{} // closed once conn is live and ordered I/O may flow + + // inbox carries decoded input in arrival order for inputWriter to drain. + inbox chan []byte + + done chan struct{} + once sync.Once + + mu sync.Mutex + closed bool } -// StartExec creates and starts a Docker exec session, then begins streaming -// output back over the WebSocket. +// StartExec registers the exec session synchronously, then brings the Docker +// exec up asynchronously. Registering up front is what makes input ordered: +// exec_input that arrives immediately after exec_start finds the session and is +// queued, instead of racing the bring-up and being dropped. func (c *Client) StartExec(ctx context.Context, msg protocol.ExecStartMessage) { // Check concurrent session limit. var count int @@ -43,15 +67,28 @@ func (c *Client) StartExec(ctx context.Context, msg protocol.ExecStartMessage) { return } + session := &ExecSession{ + execID: msg.ExecID, + containerID: msg.ContainerID, + client: c, + connReady: make(chan struct{}), + inbox: make(chan []byte, execInputQueue), + done: make(chan struct{}), + } + c.execSessions.Store(msg.ExecID, session) + + go session.inputWriter() + go c.bringUpExec(ctx, msg, session) +} + +// bringUpExec performs the Docker round-trips for an already-registered session +// and, on success, wires the live connection and starts streaming. 
+func (c *Client) bringUpExec(ctx context.Context, msg protocol.ExecStartMessage, session *ExecSession) { // Create exec instance. execID, err := c.dockerClient.CreateExec(ctx, msg.ContainerID, msg.Cmd, msg.User, true) if err != nil { slog.Error("failed to create exec", "container", msg.ContainerID, "error", err) - // Best-effort error reply; connection loss will surface on the read pump. - _ = c.sendTypedMessage(protocol.TypeExecEnd, protocol.ExecEndMessage{ - ExecID: msg.ExecID, - Reason: fmt.Sprintf("create exec failed: %v", err), - }) + session.failStart(fmt.Sprintf("create exec failed: %v", err)) return } @@ -59,11 +96,7 @@ func (c *Client) StartExec(ctx context.Context, msg protocol.ExecStartMessage) { conn, err := c.dockerClient.StartExec(ctx, execID, true) if err != nil { slog.Error("failed to start exec", "execID", execID, "error", err) - // Best-effort error reply; connection loss will surface on the read pump. - _ = c.sendTypedMessage(protocol.TypeExecEnd, protocol.ExecEndMessage{ - ExecID: msg.ExecID, - Reason: fmt.Sprintf("start exec failed: %v", err), - }) + session.failStart(fmt.Sprintf("start exec failed: %v", err)) return } @@ -74,17 +107,13 @@ func (c *Client) StartExec(ctx context.Context, msg protocol.ExecStartMessage) { } } - session := &ExecSession{ - execID: msg.ExecID, - containerID: msg.ContainerID, - conn: conn, - client: c, - done: make(chan struct{}), + // Wire the connection. If the session was already torn down while we were + // bringing the exec up, activate closes the orphaned conn and we stop here. + if !session.activate(conn) { + return } - c.execSessions.Store(msg.ExecID, session) - - // Send exec_ready; best-effort โ€” connection loss will surface on the read pump. + // Announce readiness; best-effort โ€” connection loss surfaces on the read pump. 
_ = c.sendTypedMessage(protocol.TypeExecReady, protocol.ExecReadyMessage{ ExecID: msg.ExecID, }) @@ -93,7 +122,9 @@ func (c *Client) StartExec(ctx context.Context, msg protocol.ExecStartMessage) { go session.readLoop() } -// HandleInput writes decoded input data to an active exec session. +// HandleInput decodes input and enqueues it for ordered delivery. The enqueue +// is non-blocking: the read pump must keep servicing pings and other sessions, +// so a full queue drops the input with a warning rather than stalling. func (c *Client) HandleInput(msg protocol.ExecInputMessage) { val, ok := c.execSessions.Load(msg.ExecID) if !ok { @@ -109,18 +140,54 @@ func (c *Client) HandleInput(msg protocol.ExecInputMessage) { return } - // Write with retry (up to 10 attempts, 50ms intervals). + select { + case session.inbox <- data: + case <-session.done: + slog.Debug("exec input for closed session", "execID", msg.ExecID) + default: + slog.Warn("exec input queue full, dropping", "execID", msg.ExecID) + } +} + +// inputWriter is the session's single input writer. It waits for the exec to go +// live, then drains inbox in order, writing each chunk to the connection. Being +// the only writer is what guarantees input ordering. +func (s *ExecSession) inputWriter() { + select { + case <-s.connReady: + case <-s.done: + return + } + + for { + select { + case data := <-s.inbox: + s.writeInput(data) + case <-s.done: + return + } + } +} + +// writeInput writes one chunk to the exec connection, retrying transient +// failures (up to 10 attempts, 50ms apart). A session that can't be written to +// is closed. 
+func (s *ExecSession) writeInput(data []byte) { for attempt := 0; attempt < 10; attempt++ { - _, err := session.conn.Write(data) - if err == nil { + if _, err := s.conn.Write(data); err == nil { return + } else { + slog.Debug("exec write retry", "execID", s.execID, "attempt", attempt+1, "error", err) + } + select { + case <-s.done: + return + case <-time.After(50 * time.Millisecond): } - slog.Debug("exec write retry", "execID", msg.ExecID, "attempt", attempt+1, "error", err) - time.Sleep(50 * time.Millisecond) } - slog.Warn("failed to write exec input after retries", "execID", msg.ExecID) - session.Close() + slog.Warn("failed to write exec input after retries", "execID", s.execID) + s.Close() } // HandleResize changes the TTY dimensions for an active exec session. @@ -159,6 +226,36 @@ func (c *Client) EndExec(msg protocol.ExecEndMessage) { session.Close() } +// activate wires the live connection and unblocks inputWriter. It returns false +// if the session was already closed during bring-up, in which case the caller +// must not start the read loop and activate has closed the orphaned conn. +func (s *ExecSession) activate(conn net.Conn) bool { + s.mu.Lock() + if s.closed { + s.mu.Unlock() + if err := conn.Close(); err != nil { + slog.Debug("closing orphaned exec conn", "exec_id", s.execID, "error", err) + } + return false + } + s.conn = conn + s.mu.Unlock() + + close(s.connReady) + return true +} + +// failStart tears the session down and reports a terminal exec_end. It closes +// first so the session is deregistered before the controller sees the failure. +func (s *ExecSession) failStart(reason string) { + s.Close() + // Best-effort error reply; connection loss will surface on the read pump. + _ = s.client.sendTypedMessage(protocol.TypeExecEnd, protocol.ExecEndMessage{ + ExecID: s.execID, + Reason: reason, + }) +} + // readLoop reads output from the exec session's connection and sends it back // as exec_output messages. 
On error or EOF, it sends exec_end and cleans up. func (s *ExecSession) readLoop() { @@ -209,11 +306,21 @@ func (s *ExecSession) readLoop() { } } -// Close shuts down the exec session. It is safe to call multiple times. +// Close shuts down the exec session. It is safe to call multiple times and +// safe to race against bring-up: it records the closed state under mu and +// closes whatever connection is currently wired (none, if the exec never went +// live). func (s *ExecSession) Close() { s.once.Do(func() { - if err := s.conn.Close(); err != nil { - slog.Debug("closing exec session", "exec_id", s.execID, "error", err) + s.mu.Lock() + s.closed = true + conn := s.conn + s.mu.Unlock() + + if conn != nil { + if err := conn.Close(); err != nil { + slog.Debug("closing exec session", "exec_id", s.execID, "error", err) + } } close(s.done) s.client.execSessions.Delete(s.execID) diff --git a/internal/edge/tunnel_test.go b/internal/edge/tunnel_test.go new file mode 100644 index 0000000..c69381b --- /dev/null +++ b/internal/edge/tunnel_test.go @@ -0,0 +1,236 @@ +package edge + +import ( + "context" + "encoding/base64" + "errors" + "io" + "strconv" + "testing" + + "github.com/codeswhat/portwing/internal/protocol" +) + +// readLoop should base64-frame each chunk of exec output as an exec_output +// message and, on EOF, emit a terminal exec_end with reason "exited". 
+func TestReadLoopStreamsOutputThenExecEndOnEOF(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + conn := &fakeConn{reads: [][]byte{[]byte("hello\n")}, readErr: io.EOF} + session := newExecSession(c, "e1", conn) + + go session.readLoop() + + var out protocol.ExecOutputMessage + decodeData(t, expectType(t, ctrl, protocol.TypeExecOutput), &out) + if out.ExecID != "e1" { + t.Errorf("exec_output ExecID = %q, want e1", out.ExecID) + } + decoded, err := base64.StdEncoding.DecodeString(out.Data) + if err != nil { + t.Fatalf("exec_output data not base64: %v", err) + } + if string(decoded) != "hello\n" { + t.Errorf("exec_output payload = %q, want %q", decoded, "hello\n") + } + + var end protocol.ExecEndMessage + decodeData(t, expectType(t, ctrl, protocol.TypeExecEnd), &end) + if end.ExecID != "e1" { + t.Errorf("exec_end ExecID = %q, want e1", end.ExecID) + } + if end.Reason != "exited" { + t.Errorf("exec_end reason = %q, want exited", end.Reason) + } + + // readLoop's deferred Close must deregister the session. The deferred Close + // runs after readLoop returns (after exec_end is sent), so use waitFor to + // avoid a race between message delivery and the deregistration. + waitFor(t, "session to be deregistered after readLoop", func() bool { + _, ok := c.execSessions.Load("e1") + return !ok + }) +} + +// A non-EOF read error should surface as the exec_end reason verbatim, so the +// controller can distinguish a clean exit from a transport failure. 
+func TestReadLoopExecEndCarriesErrorReason(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + conn := &fakeConn{readErr: errors.New("connection reset")} + session := newExecSession(c, "e2", conn) + + go session.readLoop() + + var end protocol.ExecEndMessage + decodeData(t, expectType(t, ctrl, protocol.TypeExecEnd), &end) + if end.Reason != "connection reset" { + t.Errorf("exec_end reason = %q, want %q", end.Reason, "connection reset") + } +} + +// Close must be idempotent (sync.Once), close the done channel exactly once, +// shut the underlying conn, and deregister the session. +func TestCloseIsIdempotent(t *testing.T) { + t.Parallel() + + c, _ := newTestClient(t) + conn := &fakeConn{} + session := newExecSession(c, "e3", conn) + + session.Close() + session.Close() // must not panic on a second close of done. + + select { + case <-session.done: + default: + t.Error("done channel was not closed") + } + if !conn.closed { + t.Error("underlying conn was not closed") + } + if _, ok := c.execSessions.Load("e3"); ok { + t.Error("session still registered after Close") + } +} + +// HandleInput decodes the base64 payload and writes the raw bytes to the exec +// connection. The write is async (handled by the inputWriter goroutine), so we +// use waitFor to assert the bytes land. +func TestHandleInputWritesDecodedData(t *testing.T) { + t.Parallel() + + c, _ := newTestClient(t) + conn := &fakeConn{} + newReadySession(c, "e4", conn) + + payload := []byte("ls -la\n") + c.HandleInput(protocol.ExecInputMessage{ + ExecID: "e4", + Data: base64.StdEncoding.EncodeToString(payload), + }) + + waitFor(t, "input to be written to conn", func() bool { + return string(conn.written()) == string(payload) + }) +} + +// A malformed base64 payload is dropped without writing anything or closing +// the session. 
+func TestHandleInputRejectsBadBase64(t *testing.T) { + t.Parallel() + + c, _ := newTestClient(t) + conn := &fakeConn{} + newExecSession(c, "e5", conn) + + c.HandleInput(protocol.ExecInputMessage{ExecID: "e5", Data: "!!!not-base64!!!"}) + + if got := conn.written(); len(got) != 0 { + t.Errorf("wrote %q on bad input, want nothing", got) + } + if _, ok := c.execSessions.Load("e5"); !ok { + t.Error("session was torn down on a decode error; should be left intact") + } +} + +// Input for an unknown exec id is a silent no-op. +func TestHandleInputUnknownSessionNoop(t *testing.T) { + t.Parallel() + + c, _ := newTestClient(t) + // Must not panic with no session registered. + c.HandleInput(protocol.ExecInputMessage{ + ExecID: "ghost", + Data: base64.StdEncoding.EncodeToString([]byte("x")), + }) +} + +// When every write attempt fails, the inputWriter exhausts its retries and +// tears the session down. HandleInput enqueues the data and returns immediately; +// the writer goroutine (started by newReadySession) does the retries async. +func TestHandleInputClosesSessionAfterWriteFailure(t *testing.T) { + t.Parallel() + + c, _ := newTestClient(t) + conn := &fakeConn{writeErr: errors.New("broken pipe")} + session := newReadySession(c, "e6", conn) + + c.HandleInput(protocol.ExecInputMessage{ + ExecID: "e6", + Data: base64.StdEncoding.EncodeToString([]byte("data")), + }) + + // 10 retries ร— 50ms = ~500ms max. waitFor polls until done is closed. + waitFor(t, "session to be closed after write retries exhausted", func() bool { + select { + case <-session.done: + return true + default: + return false + } + }) + if _, ok := c.execSessions.Load("e6"); ok { + t.Error("session still registered after write-failure teardown") + } +} + +// EndExec closes the named session. 
+func TestEndExecClosesSession(t *testing.T) { + t.Parallel() + + c, _ := newTestClient(t) + conn := &fakeConn{} + session := newExecSession(c, "e7", conn) + + c.EndExec(protocol.ExecEndMessage{ExecID: "e7"}) + + select { + case <-session.done: + default: + t.Error("EndExec did not close the session") + } + if !conn.closed { + t.Error("EndExec did not close the underlying conn") + } +} + +// EndExec / HandleResize for an unknown id must not panic. +func TestEndExecAndResizeUnknownSessionNoop(t *testing.T) { + t.Parallel() + + c, _ := newTestClient(t) + c.EndExec(protocol.ExecEndMessage{ExecID: "ghost"}) + c.HandleResize(context.Background(), protocol.ExecResizeMessage{ExecID: "ghost", Cols: 80, Rows: 24}) +} + +// StartExec must refuse to open a session once maxExecSessions are already +// live, replying with a terminal exec_end rather than creating a Docker exec. +func TestStartExecRejectsBeyondSessionLimit(t *testing.T) { + t.Parallel() + + c, ctrl := newTestClient(t) + + // Saturate the registry. The limit check only counts entries, so bare + // sentinels are enough โ€” no Docker connection is touched on this path. 
+ for i := 0; i < maxExecSessions; i++ { + c.execSessions.Store("limit-"+strconv.Itoa(i), &ExecSession{}) + } + + c.StartExec(context.Background(), protocol.ExecStartMessage{ + ExecID: "overflow", + ContainerID: "c1", + Cmd: []string{"sh"}, + }) + + var end protocol.ExecEndMessage + decodeData(t, expectType(t, ctrl, protocol.TypeExecEnd), &end) + if end.ExecID != "overflow" { + t.Errorf("exec_end ExecID = %q, want overflow", end.ExecID) + } + if end.Reason != "session limit reached" { + t.Errorf("exec_end reason = %q, want %q", end.Reason, "session limit reached") + } +} From 6fdc18e85722ae863003f3b56d9c9a62660c96d2 Mon Sep 17 00:00:00 2001 From: scttbnsn <80784472+scttbnsn@users.noreply.github.com> Date: Tue, 16 Jun 2026 15:12:51 -0400 Subject: [PATCH 2/2] =?UTF-8?q?=F0=9F=93=9D=20docs:=20mark=20edge=20tunnel?= =?UTF-8?q?=20robustness=20shipped=20in=20ROADMAP=20and=20CHANGELOG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All three sub-items (ordered exec I/O, outbound backpressure, dedicated test harness) have landed, so flip the roadmap line to shipped and record the two fixes plus the harness under [Unreleased]. --- CHANGELOG.md | 6 ++++++ ROADMAP.md | 11 ++++++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c6d8ae..1359050 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Tier-3 monthly deep fuzz**: `quality-fuzz-monthly.yml` gives each of the five fuzz targets a 1-hour budget on the first of the month (dispatchable to longer budgets before a release), completing the smoke → nightly → monthly fuzz tiering. Crash corpora retain for 180 days. 
- **Weekly soak test**: `quality-soak-weekly.yml` runs the agent (generic adapter) against a mock Docker upstream under a sustained loadgen mix — inventory/version/proxy reads plus SSE subscriber connect/hold/disconnect churn — and fails if its resident set grows past a configurable budget (64 MiB default) over a multi-hour soak. New harness under `benchmarks/cmd/{mockdocker,loadgen}` driven by `scripts/soak.sh`. Catches the long-lived-agent leak profile the unit/integration/fuzz tiers don't. - **Monthly benchmark tracking**: Go benchmarks on the per-request hot paths (auth middleware, Argon2id verify — cold derivation vs. warm SHA-256 cache, client-IP extraction, rate limiter) and the parse paths (PHC, image-ref, Drydock labels, trusted-proxy CIDRs, MCP dispatch). `quality-bench-monthly.yml` reruns them with `-benchmem -count=5` on the first of each month and retains the results for 90 days so a ns/op or allocs/op regression shows up month over month. Completes the test-posture parity with sockguard. +- **Edge tunnel test harness**: a dedicated unit-test harness for the edge WebSocket tunnel — an in-memory controller/agent WebSocket pair plus a consumer-side `dockerAPI` seam so the exec sessions and request fan-out run against a scripted fake Docker daemon with no live socket. Covers hello signing, request dispatch and concurrency rejection, the exec-session lifecycle (start/input/resize/output/end), ordered input replay, and send-path eviction. Lifts `internal/edge` exec/dispatch coverage from effectively zero to ~54%. + +### Fixed + +- **Edge exec input ordering**: `exec_input` that arrived immediately after `exec_start` could be dropped, because the session was only registered after the Docker `CreateExec`/`StartExec` round-trip completed. 
The session is now registered synchronously up front and early input is buffered in arrival order by a single per-session writer goroutine, then replayed once the exec connection is live — keystrokes typed before the shell comes up are no longer lost or reordered. +- **Edge outbound backpressure**: every sender (exec output, request/stream responses, metrics, pings) previously wrote the WebSocket directly under one mutex with no write deadline, so a single slow or wedged controller could head-of-line-block every session, stall the read pump, and hang the agent indefinitely. Outbound frames now funnel through a single `sendPump` goroutine fronting a bounded queue with a per-frame write deadline; a controller that can't keep up is evicted and reconnected rather than dropping frames (which would hang a request or corrupt a stream). ## [0.3.0] - 2026-06-15 diff --git a/ROADMAP.md b/ROADMAP.md index dc08bd8..229fa63 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -43,9 +43,14 @@ over new surface area. `/api/portwing/ws` controller endpoint (Ed25519-only), so the agent can dial out and manage NAT'd / firewalled hosts with no inbound port. Both Drydock 1.5 and the paired Portwing release are pre-release. -- **Edge tunnel robustness** — ordered exec I/O, backpressure under load, and a - dedicated test harness for the tunnel (auth hello, request fan-out, exec - sessions). Ongoing. +- **Edge tunnel robustness** — *shipped.* Ordered exec I/O (input that races + ahead of the Docker exec coming up is buffered in arrival order and replayed, + not dropped), outbound backpressure (a single writer goroutine fronts a + bounded send queue with a per-frame write deadline and evicts a controller + that can't keep up, so one slow consumer can't head-of-line-block every + session or stall the read pump), and a dedicated unit test harness for the + tunnel (auth hello, request fan-out, exec sessions) built on a consumer-side + Docker seam. 
- **Reproducible base images** — *shipped.* Both `Dockerfile` and `Dockerfile.release` pin every base image by digest (`wolfi-base`, `alpine`, `golang`), and Dependabot tracks the `docker` ecosystem weekly for updates.