Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions cli/commands/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,20 +154,17 @@ func printActivityEvent(data string) {
}

// streamSSE issues a GET that expects a text/event-stream response. The
// returned http.Response is left open — the caller reads resp.Body.
// returned http.Response is left open — the caller reads resp.Body. Routed
// through client.Send so every SSE consumer shares one hardened path: no
// total-duration cap (long-lived streams survive) but a 45s idle deadline so
// a stream that goes silent after the headers can't hang the CLI forever.
func streamSSE(client *cli.Client, path string) (*http.Response, error) {
req, err := http.NewRequest(http.MethodGet, client.BaseURL+path, nil)
if err != nil {
return nil, err
}
req.Header.Set("Accept", "text/event-stream")
if client.APIKey != "" {
req.Header.Set("X-Orva-API-Key", client.APIKey)
}
// SSE streams are long-lived; bypass the client's default 120s timeout
// by using a fresh client with no timeout for streaming reads.
streamingClient := &http.Client{Timeout: 0}
resp, err := streamingClient.Do(req)
resp, err := client.Send(cli.Request{
Path: path,
Accept: "text/event-stream",
NoTimeout: true,
IdleTimeout: cli.DefaultStreamIdleTimeout,
})
if err != nil {
return nil, err
}
Expand Down
12 changes: 11 additions & 1 deletion cli/commands/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ func (s *chatSession) postChat(ctx context.Context, content string) (*http.Respo
Accept: "text/event-stream",
ContentType: "application/json",
NoTimeout: true,
IdleTimeout: cli.DefaultStreamIdleTimeout,
Ctx: ctx,
Body: bytes.NewReader(j),
})
Expand Down Expand Up @@ -617,6 +618,14 @@ func (s *chatSession) drive(resp *http.Response) (turnResult, error) {
}
return false, nil
})
// A clean EOF with no terminal frame (done / awaiting_approval / error)
// means the stream was cut mid-turn — the server or provider dropped the
// connection. Surface it as an error instead of silently accepting a
// truncated turn as success. Guarded on err==nil so it never masks a
// Ctrl-C (context.Canceled) or a real transport error.
if err == nil && !res.done && !res.awaiting && res.errMsg == "" {
return res, errors.New("chat stream ended unexpectedly — the server or provider dropped the connection (no completion received)")
}
return res, err
}

Expand Down Expand Up @@ -691,7 +700,8 @@ func (s *chatSession) handleApprovals(ctx context.Context, pending []pendingTool
}
path := "/api/v1/ai/tool-calls/" + url.PathEscape(tc.ID) + "/" + verb
resp, err := s.client.Send(cli.Request{
Method: http.MethodPost, Path: path, Accept: "text/event-stream", NoTimeout: true, Ctx: ctx,
Method: http.MethodPost, Path: path, Accept: "text/event-stream",
NoTimeout: true, IdleTimeout: cli.DefaultStreamIdleTimeout, Ctx: ctx,
})
if err != nil {
return last, err
Expand Down
62 changes: 62 additions & 0 deletions cli/commands/chat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/Harsh-2002/Orva/cli/commands/theme"
cli "github.com/Harsh-2002/Orva/internal/client"
Expand Down Expand Up @@ -81,6 +82,67 @@ func TestDriveSSE(t *testing.T) {
}
}

// TestDrivePrematureEOF asserts the PR-C guard: a stream that ends (clean EOF)
// without a terminal frame (done / awaiting_approval / error) is surfaced as an
// error, not silently accepted as a successful (truncated) turn.
func TestDrivePrematureEOF(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
sseFrame(w, "message_start", `{"message_id":"m1","role":"assistant"}`)
sseFrame(w, "delta", `{"text":"partial answer"}`)
// Handler returns here — connection closes mid-turn, NO done/error frame.
}))
defer srv.Close()

s, _, _ := newTestSession(cli.NewClient(srv.URL, "k"))
resp, err := s.postChat(context.Background(), "hi")
if err != nil {
t.Fatalf("postChat: %v", err)
}
_, err = s.drive(resp)
if err == nil {
t.Fatal("expected an error for a stream that ended without a terminal frame, got nil")
}
if !strings.Contains(err.Error(), "ended unexpectedly") {
t.Errorf("error = %v, want it to mention the stream ending unexpectedly", err)
}
}

// TestClientIdleTimeout asserts the PR-C client idle deadline: a stream that
// goes silent after the headers is cancelled within the idle window instead of
// hanging forever. Reading blocks until the idle timer cancels the request ctx.
func TestClientIdleTimeout(t *testing.T) {
release := make(chan struct{})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
fmt.Fprint(w, ": ping\n\n")
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
<-release // stall: never send more, never close, until the test ends
}))
defer srv.Close()
defer close(release)

c := cli.NewClient(srv.URL, "k")
resp, err := c.Send(cli.Request{Path: "/", NoTimeout: true, IdleTimeout: 200 * time.Millisecond})
if err != nil {
t.Fatalf("send: %v", err)
}
defer resp.Body.Close()

start := time.Now()
buf := make([]byte, 4096)
for {
if _, rerr := resp.Body.Read(buf); rerr != nil {
break // idle timer cancelled the ctx → Read unblocks with an error
}
}
if elapsed := time.Since(start); elapsed > 3*time.Second {
t.Errorf("read did not unblock within the idle window; took %v (idle was 200ms)", elapsed)
}
}

// TestApprovalFailClosedNonTTY ensures a tool requiring approval, in a
// non-interactive context without --auto-approve, fails closed and never issues
// an approve/reject POST.
Expand Down
12 changes: 11 additions & 1 deletion cli/commands/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,21 @@ func followDeploymentLogs(cmd *cobra.Command, client *cli.Client, id string) err

infof(cmd, "following build log for %s — Ctrl-C to stop", id)

terminal := false
if err := consumeSSE(resp, func(event, data string) (bool, error) {
return handleStreamFrame(cmd, event, data)
stop, ferr := handleStreamFrame(cmd, event, data)
if stop {
terminal = true
}
return stop, ferr
}); err != nil {
return fmt.Errorf("follow: %w", err)
}
// A clean EOF with no terminal event means the build stream was cut before
// reporting a result — don't pass it off as success (mirrors watchBuild).
if !terminal {
return fmt.Errorf("build stream ended before a result was reported; check `orva deployments get %s`", id)
}
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions cli/commands/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ func runInvoke(cmd *cobra.Command, args []string) error {
Accept: "*/*",
NoTimeout: stream,
}
if stream {
// No total cap on a streamed invocation, but an idle deadline so a
// stalled stream can't hang the CLI forever.
req.IdleTimeout = cli.DefaultStreamIdleTimeout
}
if len(body) > 0 {
req.Body = bytes.NewReader(body)
}
Expand Down
71 changes: 69 additions & 2 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ func NewClient(baseURL, apiKey string) *Client {
}
}

// DefaultStreamIdleTimeout is the recommended IdleTimeout for SSE/streaming
// requests. The server emits a heartbeat every 15s, so 45s (three missed
// heartbeats) means "the connection has genuinely gone silent" — enough to
// break a hung stream without tripping on a momentary pause. It is NOT a
// total-duration cap: the timer resets on every byte received, so a healthy
// long-lived stream runs indefinitely.
const DefaultStreamIdleTimeout = 45 * time.Second

// Request is a flexible HTTP request descriptor for callers that need more
// control than the Get/Post/Put/Delete helpers give — custom method, raw
// body, extra headers, query string, or unbounded streaming. The simple
Expand All @@ -42,6 +50,7 @@ type Request struct {
Accept string // Accept header; defaults to application/json
Headers map[string]string // extra headers (override the above)
NoTimeout bool // skip the 120s client timeout (for streaming)
IdleTimeout time.Duration // if >0, cancel when no bytes arrive for this long (streaming)
Ctx context.Context // optional request context (for cancellation)
}

Expand All @@ -62,8 +71,19 @@ func (c *Client) Send(r Request) (*http.Response, error) {
if ctx == nil {
ctx = context.Background()
}
// An idle deadline cancels the request context when no bytes arrive for
// IdleTimeout, so a stream that goes silent after the headers can't hang
// the caller forever. Derived before building the request so the request
// carries the cancellable context.
var idleCancel context.CancelFunc
if r.IdleTimeout > 0 {
ctx, idleCancel = context.WithCancel(ctx)
}
req, err := http.NewRequestWithContext(ctx, method, u, r.Body)
if err != nil {
if idleCancel != nil {
idleCancel()
}
return nil, fmt.Errorf("create request: %w", err)
}

Expand Down Expand Up @@ -91,11 +111,50 @@ func (c *Client) Send(r Request) (*http.Response, error) {

resp, err := httpClient.Do(req)
if err != nil {
if idleCancel != nil {
idleCancel()
}
return nil, fmt.Errorf("request failed: %w", err)
}
if idleCancel != nil {
resp.Body = newIdleReadCloser(resp.Body, r.IdleTimeout, idleCancel)
}
return resp, nil
}

// idleReadCloser wraps a streaming response body with a per-read idle timer.
// The timer is reset before each Read (so it spans the blocking wait); if no
// data arrives within idle, it fires the request-context cancel, which unblocks
// the stuck Read with a context error. Close stops the timer and cancels (so a
// caller that stops reading early releases the context). Reset-on-every-byte
// means it is an idle timeout, never a total-duration cap.
type idleReadCloser struct {
rc io.ReadCloser
idle time.Duration
timer *time.Timer
cancel context.CancelFunc
}

func newIdleReadCloser(rc io.ReadCloser, idle time.Duration, cancel context.CancelFunc) *idleReadCloser {
return &idleReadCloser{
rc: rc,
idle: idle,
timer: time.AfterFunc(idle, cancel),
cancel: cancel,
}
}

func (i *idleReadCloser) Read(p []byte) (int, error) {
i.timer.Reset(i.idle)
return i.rc.Read(p)
}

func (i *idleReadCloser) Close() error {
i.timer.Stop()
i.cancel()
return i.rc.Close()
}

// Do sends an HTTP request with an optional JSON-encoded body.
func (c *Client) Do(method, path string, body any) (*http.Response, error) {
var r Request
Expand Down Expand Up @@ -138,7 +197,15 @@ func (c *Client) Delete(path string) (*http.Response, error) {
}

// Stream issues a GET that is expected to stream (SSE, chunked) and returns
// the live response with no client-side timeout. The caller owns resp.Body.
// the live response. No total-duration cap (NoTimeout), but an idle deadline
// guards against a stream that goes silent after the headers. The caller owns
// resp.Body.
func (c *Client) Stream(path string, query url.Values) (*http.Response, error) {
return c.Send(Request{Path: path, Query: query, Accept: "text/event-stream", NoTimeout: true})
return c.Send(Request{
Path: path,
Query: query,
Accept: "text/event-stream",
NoTimeout: true,
IdleTimeout: DefaultStreamIdleTimeout,
})
}
Loading