diff --git a/cli/commands/activity.go b/cli/commands/activity.go index bed3d88..07c1f56 100644 --- a/cli/commands/activity.go +++ b/cli/commands/activity.go @@ -154,20 +154,17 @@ func printActivityEvent(data string) { } // streamSSE issues a GET that expects a text/event-stream response. The -// returned http.Response is left open — the caller reads resp.Body. +// returned http.Response is left open — the caller reads resp.Body. Routed +// through client.Send so every SSE consumer shares one hardened path: no +// total-duration cap (long-lived streams survive) but a 45s idle deadline so +// a stream that goes silent after the headers can't hang the CLI forever. func streamSSE(client *cli.Client, path string) (*http.Response, error) { - req, err := http.NewRequest(http.MethodGet, client.BaseURL+path, nil) - if err != nil { - return nil, err - } - req.Header.Set("Accept", "text/event-stream") - if client.APIKey != "" { - req.Header.Set("X-Orva-API-Key", client.APIKey) - } - // SSE streams are long-lived; bypass the client's default 120s timeout - // by using a fresh client with no timeout for streaming reads. 
- streamingClient := &http.Client{Timeout: 0} - resp, err := streamingClient.Do(req) + resp, err := client.Send(cli.Request{ + Path: path, + Accept: "text/event-stream", + NoTimeout: true, + IdleTimeout: cli.DefaultStreamIdleTimeout, + }) if err != nil { return nil, err } diff --git a/cli/commands/chat.go b/cli/commands/chat.go index 3f7b2ae..4ee33b8 100644 --- a/cli/commands/chat.go +++ b/cli/commands/chat.go @@ -494,6 +494,7 @@ func (s *chatSession) postChat(ctx context.Context, content string) (*http.Respo Accept: "text/event-stream", ContentType: "application/json", NoTimeout: true, + IdleTimeout: cli.DefaultStreamIdleTimeout, Ctx: ctx, Body: bytes.NewReader(j), }) @@ -617,6 +618,14 @@ func (s *chatSession) drive(resp *http.Response) (turnResult, error) { } return false, nil }) + // A clean EOF with no terminal frame (done / awaiting_approval / error) + // means the stream was cut mid-turn — the server or provider dropped the + // connection. Surface it as an error instead of silently accepting a + // truncated turn as success. Guarded on err==nil so it never masks a + // Ctrl-C (context.Canceled) or a real transport error. 
+ if err == nil && !res.done && !res.awaiting && res.errMsg == "" { + return res, errors.New("chat stream ended unexpectedly — the server or provider dropped the connection (no completion received)") + } return res, err } @@ -691,7 +700,8 @@ func (s *chatSession) handleApprovals(ctx context.Context, pending []pendingTool } path := "/api/v1/ai/tool-calls/" + url.PathEscape(tc.ID) + "/" + verb resp, err := s.client.Send(cli.Request{ - Method: http.MethodPost, Path: path, Accept: "text/event-stream", NoTimeout: true, Ctx: ctx, + Method: http.MethodPost, Path: path, Accept: "text/event-stream", + NoTimeout: true, IdleTimeout: cli.DefaultStreamIdleTimeout, Ctx: ctx, }) if err != nil { return last, err diff --git a/cli/commands/chat_test.go b/cli/commands/chat_test.go index dd9a365..174bf4d 100644 --- a/cli/commands/chat_test.go +++ b/cli/commands/chat_test.go @@ -9,6 +9,7 @@ import ( "net/http/httptest" "strings" "testing" + "time" "github.com/Harsh-2002/Orva/cli/commands/theme" cli "github.com/Harsh-2002/Orva/internal/client" @@ -81,6 +82,67 @@ func TestDriveSSE(t *testing.T) { } } +// TestDrivePrematureEOF asserts the PR-C guard: a stream that ends (clean EOF) +// without a terminal frame (done / awaiting_approval / error) is surfaced as an +// error, not silently accepted as a successful (truncated) turn. +func TestDrivePrematureEOF(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + sseFrame(w, "message_start", `{"message_id":"m1","role":"assistant"}`) + sseFrame(w, "delta", `{"text":"partial answer"}`) + // Handler returns here — connection closes mid-turn, NO done/error frame. 
+ })) + defer srv.Close() + + s, _, _ := newTestSession(cli.NewClient(srv.URL, "k")) + resp, err := s.postChat(context.Background(), "hi") + if err != nil { + t.Fatalf("postChat: %v", err) + } + _, err = s.drive(resp) + if err == nil { + t.Fatal("expected an error for a stream that ended without a terminal frame, got nil") + } + if !strings.Contains(err.Error(), "ended unexpectedly") { + t.Errorf("error = %v, want it to mention the stream ending unexpectedly", err) + } +} + +// TestClientIdleTimeout asserts the PR-C client idle deadline: a stream that +// goes silent after the headers is cancelled within the idle window instead of +// hanging forever. Reading blocks until the idle timer cancels the request ctx. +func TestClientIdleTimeout(t *testing.T) { + release := make(chan struct{}) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + fmt.Fprint(w, ": ping\n\n") + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + <-release // stall: never send more, never close, until the test ends + })) + defer srv.Close() + defer close(release) + + c := cli.NewClient(srv.URL, "k") + resp, err := c.Send(cli.Request{Path: "/", NoTimeout: true, IdleTimeout: 200 * time.Millisecond}) + if err != nil { + t.Fatalf("send: %v", err) + } + defer resp.Body.Close() + + start := time.Now() + buf := make([]byte, 4096) + for { + if _, rerr := resp.Body.Read(buf); rerr != nil { + break // idle timer cancelled the ctx → Read unblocks with an error + } + } + if elapsed := time.Since(start); elapsed > 3*time.Second { + t.Errorf("read did not unblock within the idle window; took %v (idle was 200ms)", elapsed) + } +} + // TestApprovalFailClosedNonTTY ensures a tool requiring approval, in a // non-interactive context without --auto-approve, fails closed and never issues // an approve/reject POST. 
diff --git a/cli/commands/deployments.go b/cli/commands/deployments.go index f83dd93..66ac84f 100644 --- a/cli/commands/deployments.go +++ b/cli/commands/deployments.go @@ -292,11 +292,21 @@ func followDeploymentLogs(cmd *cobra.Command, client *cli.Client, id string) err infof(cmd, "following build log for %s — Ctrl-C to stop", id) + terminal := false if err := consumeSSE(resp, func(event, data string) (bool, error) { - return handleStreamFrame(cmd, event, data) + stop, ferr := handleStreamFrame(cmd, event, data) + if stop { + terminal = true + } + return stop, ferr }); err != nil { return fmt.Errorf("follow: %w", err) } + // A clean EOF with no terminal event means the build stream was cut before + // reporting a result — don't pass it off as success (mirrors watchBuild). + if !terminal { + return fmt.Errorf("build stream ended before a result was reported; check `orva deployments get %s`", id) + } return nil } diff --git a/cli/commands/invoke.go b/cli/commands/invoke.go index 5b32b8b..22fff16 100644 --- a/cli/commands/invoke.go +++ b/cli/commands/invoke.go @@ -131,6 +131,11 @@ func runInvoke(cmd *cobra.Command, args []string) error { Accept: "*/*", NoTimeout: stream, } + if stream { + // No total cap on a streamed invocation, but an idle deadline so a + // stalled stream can't hang the CLI forever. + req.IdleTimeout = cli.DefaultStreamIdleTimeout + } if len(body) > 0 { req.Body = bytes.NewReader(body) } diff --git a/internal/client/client.go b/internal/client/client.go index 50aaab4..8898f11 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -29,6 +29,14 @@ func NewClient(baseURL, apiKey string) *Client { } } +// DefaultStreamIdleTimeout is the recommended IdleTimeout for SSE/streaming +// requests. The server emits a heartbeat every 15s, so 45s (three missed +// heartbeats) means "the connection has genuinely gone silent" — enough to +// break a hung stream without tripping on a momentary pause. 
It is NOT a +// total-duration cap: the timer resets on every byte received, so a healthy +// long-lived stream runs indefinitely. +const DefaultStreamIdleTimeout = 45 * time.Second + // Request is a flexible HTTP request descriptor for callers that need more // control than the Get/Post/Put/Delete helpers give — custom method, raw // body, extra headers, query string, or unbounded streaming. The simple @@ -42,6 +50,7 @@ type Request struct { Accept string // Accept header; defaults to application/json Headers map[string]string // extra headers (override the above) NoTimeout bool // skip the 120s client timeout (for streaming) + IdleTimeout time.Duration // if >0, cancel when no bytes arrive for this long (streaming) Ctx context.Context // optional request context (for cancellation) } @@ -62,8 +71,19 @@ func (c *Client) Send(r Request) (*http.Response, error) { if ctx == nil { ctx = context.Background() } + // An idle deadline cancels the request context when no bytes arrive for + // IdleTimeout, so a stream that goes silent after the headers can't hang + // the caller forever. Derived before building the request so the request + // carries the cancellable context. + var idleCancel context.CancelFunc + if r.IdleTimeout > 0 { + ctx, idleCancel = context.WithCancel(ctx) + } req, err := http.NewRequestWithContext(ctx, method, u, r.Body) if err != nil { + if idleCancel != nil { + idleCancel() + } return nil, fmt.Errorf("create request: %w", err) } @@ -91,11 +111,50 @@ func (c *Client) Send(r Request) (*http.Response, error) { resp, err := httpClient.Do(req) if err != nil { + if idleCancel != nil { + idleCancel() + } return nil, fmt.Errorf("request failed: %w", err) } + if idleCancel != nil { + resp.Body = newIdleReadCloser(resp.Body, r.IdleTimeout, idleCancel) + } return resp, nil } +// idleReadCloser wraps a streaming response body with a per-read idle timer. 
+// The timer runs only while a Read is blocked: if no data arrives within idle,
+// it fires the request-context cancel, which unblocks the stuck Read with a
+// context error. Time the caller spends between Reads is not counted, so a slow
+// consumer is never mistaken for a dead stream, and there is no total-duration
+// cap. Close stops the timer and cancels, releasing the request context.
+type idleReadCloser struct {
+	rc     io.ReadCloser
+	idle   time.Duration
+	timer  *time.Timer
+	cancel context.CancelFunc
+}
+
+func newIdleReadCloser(rc io.ReadCloser, idle time.Duration, cancel context.CancelFunc) *idleReadCloser {
+	timer := time.AfterFunc(idle, cancel)
+	timer.Stop() // created disarmed; Read arms it for each blocking wait
+	return &idleReadCloser{rc: rc, idle: idle, timer: timer, cancel: cancel}
+}
+
+func (i *idleReadCloser) Read(p []byte) (int, error) {
+	// Arm the timer for this blocking wait only; stopping it on return keeps
+	// time the caller spends between Reads from counting as stream idleness.
+	i.timer.Reset(i.idle)
+	defer i.timer.Stop()
+	return i.rc.Read(p)
+}
+
+func (i *idleReadCloser) Close() error {
+	i.timer.Stop()
+	i.cancel()
+	return i.rc.Close()
+}
+
 // Do sends an HTTP request with an optional JSON-encoded body.
 func (c *Client) Do(method, path string, body any) (*http.Response, error) {
 	var r Request
@@ -138,7 +197,15 @@ func (c *Client) Delete(path string) (*http.Response, error) {
 }
 
 // Stream issues a GET that is expected to stream (SSE, chunked) and returns
-// the live response with no client-side timeout. The caller owns resp.Body.
+// the live response. No total-duration cap (NoTimeout), but an idle deadline
+// guards against a stream that goes silent after the headers. The caller owns
+// resp.Body.
 func (c *Client) Stream(path string, query url.Values) (*http.Response, error) {
-	return c.Send(Request{Path: path, Query: query, Accept: "text/event-stream", NoTimeout: true})
+	return c.Send(Request{
+		Path:        path,
+		Query:       query,
+		Accept:      "text/event-stream",
+		NoTimeout:   true,
+		IdleTimeout: DefaultStreamIdleTimeout,
+	})
 }