Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **Tier-3 monthly deep fuzz**: `quality-fuzz-monthly.yml` gives each of the five fuzz targets a 1-hour budget on the first of the month (dispatchable to longer budgets before a release), completing the smoke → nightly → monthly fuzz tiering. Crash corpora retain for 180 days.
- **Weekly soak test**: `quality-soak-weekly.yml` runs the agent (generic adapter) against a mock Docker upstream under a sustained loadgen mix — inventory/version/proxy reads plus SSE subscriber connect/hold/disconnect churn — and fails if its resident set grows past a configurable budget (64 MiB default) over a multi-hour soak. New harness under `benchmarks/cmd/{mockdocker,loadgen}` driven by `scripts/soak.sh`. Catches the long-lived-agent leak profile the unit/integration/fuzz tiers don't.
- **Monthly benchmark tracking**: Go benchmarks on the per-request hot paths (auth middleware, Argon2id verify — cold derivation vs. warm SHA-256 cache, client-IP extraction, rate limiter) and the parse paths (PHC, image-ref, Drydock labels, trusted-proxy CIDRs, MCP dispatch). `quality-bench-monthly.yml` reruns them with `-benchmem -count=5` on the first of each month and retains the results for 90 days so a ns/op or allocs/op regression shows up month over month. Completes the test-posture parity with sockguard.
- **Edge tunnel test harness**: a dedicated unit-test harness for the edge WebSocket tunnel — an in-memory controller/agent WebSocket pair plus a consumer-side `dockerAPI` seam so the exec sessions and request fan-out run against a scripted fake Docker daemon with no live socket. Covers hello signing, request dispatch and concurrency rejection, the exec-session lifecycle (start/input/resize/output/end), ordered input replay, and send-path eviction. Lifts `internal/edge` exec/dispatch coverage from effectively zero to ~54%.

### Fixed

- **Edge exec input ordering**: `exec_input` that arrived immediately after `exec_start` could be dropped, because the session was only registered after the Docker `CreateExec`/`StartExec` round-trip completed. The session is now registered synchronously up front and early input is buffered in arrival order by a single per-session writer goroutine, then replayed once the exec connection is live — keystrokes typed before the shell comes up are no longer lost or reordered.
- **Edge outbound backpressure**: every sender (exec output, request/stream responses, metrics, pings) previously wrote the WebSocket directly under one mutex with no write deadline, so a single slow or wedged controller could head-of-line-block every session, stall the read pump, and hang the agent indefinitely. Outbound frames now funnel through a single `sendPump` goroutine fronting a bounded queue with a per-frame write deadline; a controller that can't keep up is evicted and reconnected rather than dropping frames (which would hang a request or corrupt a stream).

## [0.3.0] - 2026-06-15

Expand Down
11 changes: 8 additions & 3 deletions ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,14 @@ over new surface area.
`/api/portwing/ws` controller endpoint (Ed25519-only), so the agent can dial
out and manage NAT'd / firewalled hosts with no inbound port. Both Drydock 1.5
and the paired Portwing release are pre-release.
- **Edge tunnel robustness** — ordered exec I/O, backpressure under load, and a
dedicated test harness for the tunnel (auth hello, request fan-out, exec
sessions). Ongoing.
- **Edge tunnel robustness** — *shipped.* Ordered exec I/O (input that races
ahead of the Docker exec coming up is buffered in arrival order and replayed,
not dropped), outbound backpressure (a single writer goroutine fronts a
bounded send queue with a per-frame write deadline and evicts a controller
that can't keep up, so one slow consumer can't head-of-line-block every
session or stall the read pump), and a dedicated unit test harness for the
tunnel (auth hello, request fan-out, exec sessions) built on a consumer-side
Docker seam.
- **Reproducible base images** — *shipped.* Both `Dockerfile` and
`Dockerfile.release` pin every base image by digest (`wolfi-base`, `alpine`,
`golang`), and Dependabot tracks the `docker` ecosystem weekly for updates.
Expand Down
143 changes: 143 additions & 0 deletions internal/edge/backpressure_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package edge

import (
"context"
"testing"
"time"

"github.com/gorilla/websocket"

"github.com/codeswhat/portwing/internal/protocol"
)

// runSendPump creates the per-connection send queue and starts the sendPump
// against the test client, returning the channel so a test can observe/fill it.
// The pump is torn down via context cancellation registered as a test cleanup.
func runSendPump(t *testing.T, c *Client) chan protocol.Envelope {
t.Helper()
ch := make(chan protocol.Envelope, sendQueueSize)
c.connMu.Lock()
c.sendCh = ch
conn := c.conn
c.connMu.Unlock()

ctx, cancel := context.WithCancel(context.Background())
go c.sendPump(ctx, conn, ch)
t.Cleanup(cancel)
return ch
}

// TestSendPumpDeliversQueuedFrame proves that the queued send path (sendCh set)
// delivers a frame end-to-end: sendTypedMessage enqueues the envelope, the
// sendPump dequeues and writes it over the WebSocket, and the controller reads
// back the exact content.
func TestSendPumpDeliversQueuedFrame(t *testing.T) {
t.Parallel()

c, ctrl := newTestClient(t)
runSendPump(t, c)

const ts = int64(42)
if err := c.sendTypedMessage(protocol.TypePong, protocol.PongMessage{Timestamp: ts}); err != nil {
t.Fatalf("sendTypedMessage: %v", err)
}

var pong protocol.PongMessage
decodeData(t, expectType(t, ctrl, protocol.TypePong), &pong)
if pong.Timestamp != ts {
t.Errorf("pong.Timestamp = %d, want %d", pong.Timestamp, ts)
}
}

// TestSendMessageEvictsConnectionWhenQueueFull pins the core backpressure
// invariant: when sendCh is full and no pump is draining it, the next
// sendMessage call takes the default branch and calls failConn, which closes
// the agent-side WebSocket. The controller observes the close as a read error,
// proving eviction rather than silent frame drop or deadlock.
func TestSendMessageEvictsConnectionWhenQueueFull(t *testing.T) {
t.Parallel()

c, ctrl := newTestClient(t)

// Install a capacity-1 queue (no pump running — nobody drains it).
c.connMu.Lock()
c.sendCh = make(chan protocol.Envelope, 1)
c.connMu.Unlock()

// Fill the queue to capacity so the next send hits the default branch.
c.sendCh <- protocol.Envelope{Type: protocol.TypePing}

// This send must not block; it must call failConn and close the agent conn.
c.sendMessage(protocol.Envelope{Type: protocol.TypePing})

// The controller should see the connection torn down. Give it up to
// readTimeout to propagate.
if err := ctrl.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
t.Fatalf("set ctrl read deadline: %v", err)
}
_, _, err := ctrl.ReadMessage()
if err == nil {
t.Fatal("expected read error after eviction, got nil")
}
}

// TestSendPumpEvictsOnWriteFailure verifies that a write error inside the
// sendPump causes failConn to be called and the agent-side connection to be
// closed. We induce the write failure by closing the controller end first; the
// sendPump's WriteJSON then fails, which must trigger failConn. The test
// confirms eviction by waiting until the agent conn itself becomes unusable.
func TestSendPumpEvictsOnWriteFailure(t *testing.T) {
t.Parallel()

c, ctrl := newTestClient(t)
runSendPump(t, c)

// Capture the agent conn before eviction so we can probe it afterwards.
c.connMu.Lock()
agentConn := c.conn
c.connMu.Unlock()

// Closing the controller makes every subsequent WriteJSON from the agent
// fail because the peer has gone away.
if err := ctrl.Close(); err != nil {
t.Fatalf("close ctrl: %v", err)
}

// Enqueue a frame; the sendPump will try to write it and fail.
if err := c.sendTypedMessage(protocol.TypePong, protocol.PongMessage{Timestamp: 1}); err != nil {
t.Fatalf("sendTypedMessage: %v", err)
}

// Wait until failConn has propagated and the agent connection is unusable.
waitFor(t, "agent conn evicted", func() bool {
err := agentConn.WriteControl(
websocket.PingMessage,
nil,
time.Now().Add(10*time.Millisecond),
)
return err != nil
})
}

// TestSendMessageDirectWriteWhenNoQueue documents that the handshake (nil
// sendCh) code path remains intact: with sendCh left nil (as newTestClient
// always leaves it), sendTypedMessage writes directly to the WebSocket and the
// controller receives the frame. Every existing dispatch test relies on this
// behaviour implicitly; this test makes it an explicit contract.
func TestSendMessageDirectWriteWhenNoQueue(t *testing.T) {
t.Parallel()

c, ctrl := newTestClient(t)
// sendCh is nil — newTestClient does not set it.

const ts = int64(7)
if err := c.sendTypedMessage(protocol.TypePong, protocol.PongMessage{Timestamp: ts}); err != nil {
t.Fatalf("sendTypedMessage: %v", err)
}

var pong protocol.PongMessage
decodeData(t, expectType(t, ctrl, protocol.TypePong), &pong)
if pong.Timestamp != ts {
t.Errorf("pong.Timestamp = %d, want %d", pong.Timestamp, ts)
}
}
129 changes: 118 additions & 11 deletions internal/edge/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"io"
"log/slog"
"math/big"
"net"
"net/http"
"os"
"strings"
Expand All @@ -35,8 +36,30 @@ const (
maxResponseBody = 100 * 1024 * 1024 // 100 MB — proxied response body cap
maxExecSessions = 100 // concurrent exec sessions
maxStreams = 100 // concurrent in-flight tunneled requests

// sendQueueSize bounds outbound frames buffered for the sendPump. A full
// queue means the controller can't keep up, so the connection is evicted
// (slow-consumer backpressure) rather than letting the backlog grow.
sendQueueSize = 256
// writeWait bounds a single WebSocket write. A controller that can't accept
// a frame within this window is treated as wedged and the connection is
// evicted, instead of blocking the writer forever.
writeWait = 10 * time.Second
)

// dockerAPI is the subset of *docker.Client the edge Client depends on. It is
// defined on the consumer side so the tunnel's exec sessions and the request
// fan-out can be exercised against a fake Docker daemon without a live socket.
// *docker.Client satisfies it structurally.
type dockerAPI interface {
GetVersion(ctx context.Context) (string, error)
Do(ctx context.Context, method, path string, body io.Reader) (*http.Response, error)
DoStream(ctx context.Context, method, path string, body io.Reader) (*http.Response, error)
CreateExec(ctx context.Context, containerID string, cmd []string, user string, tty bool) (string, error)
StartExec(ctx context.Context, execID string, tty bool) (net.Conn, error)
ResizeExec(ctx context.Context, execID string, cols, rows int) error
}

// edgeMessageSender wraps the edge Client to implement adapter.MessageSender.
type edgeMessageSender struct {
client *Client
Expand All @@ -50,7 +73,7 @@ func (s *edgeMessageSender) SendTypedMessage(msgType string, data interface{}) e
// and tunnels Docker API requests over the WebSocket.
type Client struct {
cfg *config.Config
dockerClient *docker.Client
dockerClient dockerAPI
adapter adapter.EdgeAdapter
compose *docker.ComposeManager
collector *metrics.Collector
Expand All @@ -59,6 +82,13 @@ type Client struct {
conn *websocket.Conn
connMu sync.Mutex

// sendCh fronts all post-handshake writes with a single sendPump goroutine,
// so a slow controller backs up here instead of head-of-line-blocking every
// sender or stalling the read pump. It is nil outside an active connection;
// the hello/welcome handshake writes the conn directly (no concurrent
// writer exists yet). Guarded by connMu alongside conn.
sendCh chan protocol.Envelope

execSessions sync.Map

// streamSem bounds concurrent in-flight request handlers (maxStreams).
Expand Down Expand Up @@ -230,6 +260,25 @@ func (c *Client) connect(ctx context.Context) (bool, error) {

slog.Info("connected to controller")

pumpCtx, pumpCancel := context.WithCancel(ctx)
defer pumpCancel()

var wg sync.WaitGroup

// Bring the outbound send path up before any post-handshake send, so the
// adapter sync, metrics, and every pump funnel through the single sendPump
// (the only writer) instead of writing the conn concurrently.
sendCh := make(chan protocol.Envelope, sendQueueSize)
c.connMu.Lock()
c.sendCh = sendCh
c.connMu.Unlock()

wg.Add(1)
go func() {
defer wg.Done()
c.sendPump(pumpCtx, conn, sendCh)
}()

// Let adapter handle initial sync (container sync, component sync, etc.).
sender := &edgeMessageSender{client: c}
if err := c.adapter.OnConnect(ctx, sender); err != nil {
Expand All @@ -240,10 +289,6 @@ func (c *Client) connect(ctx context.Context) (bool, error) {
c.sendMetrics()

// Run pumps.
pumpCtx, pumpCancel := context.WithCancel(ctx)
defer pumpCancel()

var wg sync.WaitGroup
wg.Add(2)

var readErr error
Expand All @@ -266,6 +311,7 @@ func (c *Client) connect(ctx context.Context) (bool, error) {
closeWebSocket(c.conn, "connection loop end")
c.conn = nil
}
c.sendCh = nil
c.connMu.Unlock()

// Reaching here means the welcome handshake succeeded, so the connection
Expand Down Expand Up @@ -421,7 +467,11 @@ func (c *Client) readPump(ctx context.Context) error {
continue
}
c.auditor.ExecStart(c.cfg.DrydockURL, msg.ContainerID, msg.ExecID)
go c.StartExec(ctx, msg)
// Synchronous: StartExec only registers the session and spawns the
// Docker bring-up, so it returns immediately. Registering before the
// next message is dispatched is what keeps a following exec_input
// from racing the bring-up and being dropped (ordered exec I/O).
c.StartExec(ctx, msg)

case protocol.TypeExecInput:
var msg protocol.ExecInputMessage
Expand Down Expand Up @@ -621,17 +671,74 @@ func (c *Client) sendTypedMessage(msgType string, data interface{}) error {
return nil
}

// sendMessage performs a thread-safe WebSocket write.
// sendMessage hands an envelope to the sendPump. The enqueue is non-blocking:
// sendMessage runs on the read-pump goroutine for pongs and rejections, so it
// must never block. A full queue means the controller can't keep up — the
// connection is evicted (and Run reconnects) rather than dropping frames, which
// would hang a request or corrupt a stream.
//
// Before the send path is up (the hello/welcome handshake), sendCh is nil and
// the frame is written directly — no concurrent writer exists yet.
func (c *Client) sendMessage(env protocol.Envelope) {
c.connMu.Lock()
defer c.connMu.Unlock()
ch := c.sendCh
conn := c.conn
c.connMu.Unlock()

if c.conn == nil {
if ch == nil {
// Handshake phase: synchronous direct write, provably single-writer.
if conn == nil {
return
}
_ = conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := conn.WriteJSON(env); err != nil {
slog.Warn("websocket write failed", "type", env.Type, "error", err)
}
return
}

if err := c.conn.WriteJSON(env); err != nil {
slog.Warn("websocket write failed", "type", env.Type, "error", err)
select {
case ch <- env:
default:
c.failConn("send queue full")
}
}

// sendPump is the sole writer to the WebSocket once a connection is up.
// Fronting every send with one goroutine and a bounded queue is the tunnel's
// outbound backpressure: a slow controller backs up sendCh instead of
// head-of-line-blocking every sender or stalling the read pump, and a write
// that can't complete within writeWait evicts the connection rather than
// blocking forever.
func (c *Client) sendPump(ctx context.Context, conn *websocket.Conn, sendCh chan protocol.Envelope) {
for {
select {
case <-ctx.Done():
return
case env := <-sendCh:
if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
c.failConn("set write deadline failed")
return
}
if err := conn.WriteJSON(env); err != nil {
slog.Warn("websocket write failed", "type", env.Type, "error", err)
c.failConn("write failed")
return
}
}
}
}

// failConn evicts the active WebSocket. Closing it unblocks the read pump with
// an error, which tears the pumps down and lets Run reconnect with backoff.
// Safe to call from any goroutine and more than once.
func (c *Client) failConn(reason string) {
c.connMu.Lock()
conn := c.conn
c.connMu.Unlock()
if conn != nil {
slog.Warn("evicting controller connection", "reason", reason)
closeWebSocket(conn, reason)
}
}

Expand Down
Loading
Loading