From 4298da4eaba8a66bba7bbd85f9cdc2ba43580675 Mon Sep 17 00:00:00 2001 From: Trent Nelson Date: Thu, 26 Mar 2026 10:17:54 -0700 Subject: [PATCH 1/9] feat: model codex subagent workflows --- frontend/src/lib/utils/messages.test.ts | 1 + frontend/src/lib/utils/messages.ts | 1 + internal/parser/codex.go | 214 +++++++++++++++++++++++- internal/parser/codex_parser_test.go | 105 ++++++++++++ internal/parser/taxonomy.go | 2 + internal/testjsonl/testjsonl.go | 38 +++++ 6 files changed, 359 insertions(+), 2 deletions(-) diff --git a/frontend/src/lib/utils/messages.test.ts b/frontend/src/lib/utils/messages.test.ts index 648e3585..0dd78edc 100644 --- a/frontend/src/lib/utils/messages.test.ts +++ b/frontend/src/lib/utils/messages.test.ts @@ -49,6 +49,7 @@ describe("isSystemMessage", () => { ["continuation", "This session is being continued from a previous..."], ["interrupted", "[Request interrupted by user]"], ["task-notification", "done"], + ["subagent-notification", "{\"agent_id\":\"abc\"}"], ["command-message", "commit"], ["command-name", "/commit"], ["local-command", "ok"], diff --git a/frontend/src/lib/utils/messages.ts b/frontend/src/lib/utils/messages.ts index 18d9ba95..85faa124 100644 --- a/frontend/src/lib/utils/messages.ts +++ b/frontend/src/lib/utils/messages.ts @@ -4,6 +4,7 @@ const SYSTEM_MSG_PREFIXES = [ "This session is being continued", "[Request interrupted", "", + "", "", "", " 1 + for agentID, entry := range entries { + text := firstNonEmpty( + entry.Get("completed").Str, + entry.Get("errored").Str, + entry.Get("running").Str, + ) + if text == "" { + continue + } + if !multi { + parts = append(parts, text) + continue + } + label := agentID + if name := strings.TrimSpace(agentNames[agentID]); name != "" { + label = fmt.Sprintf("%s (%s)", name, agentID) + } + parts = append(parts, label+":\n"+text) + } + + return strings.Join(parts, "\n\n") +} + // extractCodexContent joins all text blocks from a Codex // response item's content array. func extractCodexContent(payload gjson.Result) string { @@ -571,6 +733,8 @@ func ParseCodexSession( fmt.Errorf("reading codex %s: %w", path, err) } + annotateSubagentSessions(b.messages, b.subagentMap) + sessionID := b.sessionID if sessionID == "" { sessionID = strings.TrimSuffix( @@ -619,15 +783,23 @@ func ParseCodexSessionFrom( ) ([]ParsedMessage, time.Time, int64, error) { b := newCodexSessionBuilder(includeExec) b.ordinal = startOrdinal + var fallbackErr error consumed, err := readJSONLFrom( path, offset, func(line string) { + if fallbackErr != nil { + return + } // Skip session_meta — already processed in // the initial full parse. if gjson.Get(line, "type").Str == codexTypeSessionMeta { return } + if codexIncrementalNeedsFullParse(line) { + fallbackErr = errCodexIncrementalNeedsFullParse + return + } b.processLine(line) }, ) @@ -637,6 +809,11 @@ func ParseCodexSessionFrom( path, offset, err, ) } + if fallbackErr != nil { + return nil, time.Time{}, 0, fallbackErr + } + + annotateSubagentSessions(b.messages, b.subagentMap) return b.messages, b.endedAt, consumed, nil } @@ -644,5 +821,38 @@ func ParseCodexSessionFrom( func isCodexSystemMessage(content string) bool { return strings.HasPrefix(content, "# AGENTS.md") || strings.HasPrefix(content, "") || - strings.HasPrefix(content, "") + strings.HasPrefix(content, "") || + isCodexSubagentNotification(content) +} + +func isCodexSubagentNotification(content string) bool { + return strings.HasPrefix( + strings.TrimSpace(content), + "", + ) +} + +func codexIncrementalNeedsFullParse(line string) bool { + if gjson.Get(line, "type").Str != codexTypeResponseItem { + return false + } + + payload := gjson.Get(line, "payload") + switch payload.Get("type").Str { + case "function_call_output": + output, _ := parseCodexFunctionOutput(payload) + if !output.Exists() { + return false + } + return strings.TrimSpace(output.Get("agent_id").Str) != "" || + output.Get("status").Exists() + default: + role := payload.Get("role").Str + if role != "user" { + return false + } + return isCodexSubagentNotification( + extractCodexContent(payload), + ) + } } diff --git a/internal/parser/codex_parser_test.go b/internal/parser/codex_parser_test.go index 5670c56f..efe11c15 100644 --- a/internal/parser/codex_parser_test.go +++ b/internal/parser/codex_parser_test.go @@ -129,6 +129,54 @@ func TestParseCodexSession_FunctionCalls(t *testing.T) { assertToolCalls(t, msgs[1].ToolCalls, []ParsedToolCall{{ToolName: "Agent", Category: "Task"}}) }) + t.Run("spawn_agent links child session and wait output becomes tool result", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + waitSummary := "Exit code: `1`\n\nFull output:\n```text\nTraceback...\n```" + notification := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Exit code: `1`\\n\\nFull output:\\n```text\\nTraceback...\\n```\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{childID}, + "timeout_ms": 600000, + }, tsLateS5), + testjsonl.CodexFunctionCallOutputJSON("call_wait", "{\"status\":{\""+childID+"\":{\"completed\":\"Exit code: `1`\\n\\nFull output:\\n```text\\nTraceback...\\n```\"}}}", "2024-01-01T10:01:06Z"), + testjsonl.CodexMsgJSON("user", notification, "2024-01-01T10:01:07Z"), + testjsonl.CodexMsgJSON("assistant", "continuing", "2024-01-01T10:01:08Z"), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 5, len(msgs)) + assert.Equal(t, RoleAssistant, msgs[1].Role) + assertToolCalls(t, msgs[1].ToolCalls, []ParsedToolCall{{ + ToolUseID: "call_spawn", + ToolName: "spawn_agent", + Category: "Task", + SubagentSessionID: "codex:" + childID, + }}) + assert.Equal(t, RoleAssistant, msgs[2].Role) + assertToolCalls(t, msgs[2].ToolCalls, []ParsedToolCall{{ + ToolUseID: "call_wait", + ToolName: "wait", + Category: "Other", + }}) + assert.Equal(t, RoleUser, msgs[3].Role) + assert.Empty(t, msgs[3].Content) + require.Len(t, msgs[3].ToolResults, 1) + assert.Equal(t, "call_wait", msgs[3].ToolResults[0].ToolUseID) + assert.Equal(t, waitSummary, DecodeContent(msgs[3].ToolResults[0].ContentRaw)) + assert.Equal(t, RoleAssistant, msgs[4].Role) + assert.Equal(t, "continuing", msgs[4].Content) + }) + t.Run("function call no name skipped", func(t *testing.T) { content := testjsonl.JoinJSONL( testjsonl.CodexSessionMetaJSON("fc-2", "/tmp", "user", tsEarly), @@ -518,3 +566,60 @@ func TestParseCodexSessionFrom_NoNewData(t *testing.T) { assert.Equal(t, 0, len(newMsgs)) assert.True(t, endedAt.IsZero()) } + +func TestParseCodexSessionFrom_SubagentOutputRequiresFullParse(t *testing.T) { + t.Parallel() + + initial := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("inc-sub", "/tmp", "codex_cli_rs", tsEarly), + testjsonl.CodexMsgJSON("user", "run child", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "run it", + }, tsEarlyS5), + ) + path := createTestFile(t, "codex-subagent-inc.jsonl", initial) + + info, err := os.Stat(path) + require.NoError(t, err) + offset := info.Size() + + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0o644) + require.NoError(t, err) + _, err = f.WriteString(testjsonl.JoinJSONL( + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"019c9c96-6ee7-77c0-ba4c-380f844289d5","nickname":"Fennel"}`, tsLate), + )) + require.NoError(t, err) + require.NoError(t, f.Close()) + + _, _, _, err = ParseCodexSessionFrom(path, offset, 2, false) + require.Error(t, err) + assert.Contains(t, err.Error(), "full parse") +} + +func TestParseCodexSessionFrom_SystemMessageDoesNotRequireFullParse(t *testing.T) { + t.Parallel() + + initial := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("inc-system", "/tmp", "codex_cli_rs", tsEarly), + testjsonl.CodexMsgJSON("user", "hello", tsEarlyS1), + ) + path := createTestFile(t, "codex-system-inc.jsonl", initial) + + info, err := os.Stat(path) + require.NoError(t, err) + offset := info.Size() + + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0o644) + require.NoError(t, err) + _, err = f.WriteString(testjsonl.JoinJSONL( + testjsonl.CodexMsgJSON("user", "# AGENTS.md\nsome instructions", tsLate), + )) + require.NoError(t, err) + require.NoError(t, f.Close()) + + newMsgs, endedAt, _, err := ParseCodexSessionFrom(path, offset, 1, false) + require.NoError(t, err) + assert.Equal(t, 0, len(newMsgs)) + assert.False(t, endedAt.IsZero()) +} diff --git a/internal/parser/taxonomy.go b/internal/parser/taxonomy.go index 657a9d6a..6e0d07c0 100644 --- a/internal/parser/taxonomy.go +++ b/internal/parser/taxonomy.go @@ -31,6 +31,8 @@ func NormalizeToolCategory(rawName string) string { return "Bash" case "apply_patch": return "Edit" + case "spawn_agent": + return "Task" // Gemini tools case "read_file", "list_directory": diff --git a/internal/testjsonl/testjsonl.go b/internal/testjsonl/testjsonl.go index ba1b5fc6..1ace7861 100644 --- a/internal/testjsonl/testjsonl.go +++ b/internal/testjsonl/testjsonl.go @@ -217,6 +217,44 @@ func CodexFunctionCallFieldsJSON( return mustMarshal(m) } +// CodexFunctionCallWithCallIDJSON returns a Codex function_call +// response_item with an explicit call_id. +func CodexFunctionCallWithCallIDJSON( + name, callID string, arguments any, timestamp string, +) string { + payload := map[string]any{ + "type": "function_call", + "name": name, + "call_id": callID, + } + if arguments != nil { + payload["arguments"] = arguments + } + m := map[string]any{ + "type": "response_item", + "timestamp": timestamp, + "payload": payload, + } + return mustMarshal(m) +} + +// CodexFunctionCallOutputJSON returns a Codex +// function_call_output response_item. +func CodexFunctionCallOutputJSON( + callID string, output any, timestamp string, +) string { + m := map[string]any{ + "type": "response_item", + "timestamp": timestamp, + "payload": map[string]any{ + "type": "function_call_output", + "call_id": callID, + "output": output, + }, + } + return mustMarshal(m) +} + // CodexTurnContextJSON returns a Codex turn_context entry as a // JSON string with the given model. func CodexTurnContextJSON(model, timestamp string) string { From b289b7f38b22442027ccd0e333c3a76a7f8da9bf Mon Sep 17 00:00:00 2001 From: Trent Nelson Date: Thu, 26 Mar 2026 10:23:15 -0700 Subject: [PATCH 2/9] fix: preserve codex notification fallbacks --- internal/parser/codex.go | 89 ++++++++++++++++++++++++++-- internal/parser/codex_parser_test.go | 33 +++++++++++ 2 files changed, 117 insertions(+), 5 deletions(-) diff --git a/internal/parser/codex.go b/internal/parser/codex.go index b461e128..adf980c7 100644 --- a/internal/parser/codex.go +++ b/internal/parser/codex.go @@ -39,17 +39,21 @@ type codexSessionBuilder struct { callNames map[string]string subagentMap map[string]string agentNames map[string]string + agentCalls map[string]string + agentResults map[string]bool } func newCodexSessionBuilder( includeExec bool, ) *codexSessionBuilder { return &codexSessionBuilder{ - project: "unknown", - includeExec: includeExec, - callNames: make(map[string]string), - subagentMap: make(map[string]string), - agentNames: make(map[string]string), + project: "unknown", + includeExec: includeExec, + callNames: make(map[string]string), + subagentMap: make(map[string]string), + agentNames: make(map[string]string), + agentCalls: make(map[string]string), + agentResults: make(map[string]bool), } } @@ -128,6 +132,10 @@ func (b *codexSessionBuilder) handleResponseItem( return } + if role == "user" && b.handleSubagentNotification(content, ts) { + return + } + if role == "user" && isCodexSystemMessage(content) { return } @@ -202,6 +210,7 @@ func (b *codexSessionBuilder) handleFunctionCallOutput( return } b.subagentMap[callID] = "codex:" + agentID + b.agentCalls[agentID] = callID if nickname := strings.TrimSpace(output.Get("nickname").Str); nickname != "" { b.agentNames[agentID] = nickname } @@ -210,6 +219,12 @@ func (b *codexSessionBuilder) handleFunctionCallOutput( if text == "" { return } + status := output.Get("status") + if status.Exists() && status.IsObject() { + for agentID := range status.Map() { + b.agentResults[agentID] = true + } + } b.messages = append(b.messages, ParsedMessage{ Ordinal: b.ordinal, Role: RoleUser, @@ -226,6 +241,46 @@ func (b *codexSessionBuilder) handleFunctionCallOutput( } } +func (b *codexSessionBuilder) handleSubagentNotification( + content string, ts time.Time, +) bool { + agentID, text := parseCodexSubagentNotification(content) + if agentID == "" || text == "" { + return false + } + if b.agentResults[agentID] { + return true + } + callID := b.agentCalls[agentID] + if callID == "" { + b.messages = append(b.messages, ParsedMessage{ + Ordinal: b.ordinal, + Role: RoleUser, + Content: text, + Timestamp: ts, + Model: b.currentModel, + ContentLength: len(text), + }) + b.ordinal++ + return true + } + b.agentResults[agentID] = true + b.messages = append(b.messages, ParsedMessage{ + Ordinal: b.ordinal, + Role: RoleUser, + Content: "", + Timestamp: ts, + Model: b.currentModel, + ToolResults: []ParsedToolResult{{ + ToolUseID: callID, + ContentLength: len(text), + ContentRaw: strconv.Quote(text), + }}, + }) + b.ordinal++ + return true +} + func formatCodexFunctionCall( name string, payload gjson.Result, ) string { @@ -677,6 +732,30 @@ func formatCodexWaitOutput( return strings.Join(parts, "\n\n") } +func parseCodexSubagentNotification( + content string, +) (agentID, text string) { + if !isCodexSubagentNotification(content) { + return "", "" + } + body := strings.TrimSpace(content) + body = strings.TrimPrefix(body, "") + body = strings.TrimSuffix(body, "") + body = strings.TrimSpace(body) + if !gjson.Valid(body) { + return "", "" + } + parsed := gjson.Parse(body) + agentID = strings.TrimSpace(parsed.Get("agent_id").Str) + status := parsed.Get("status") + text = firstNonEmpty( + status.Get("completed").Str, + status.Get("errored").Str, + status.Get("running").Str, + ) + return agentID, text +} + // extractCodexContent joins all text blocks from a Codex // response item's content array. func extractCodexContent(payload gjson.Result) string { diff --git a/internal/parser/codex_parser_test.go b/internal/parser/codex_parser_test.go index efe11c15..bdebd03f 100644 --- a/internal/parser/codex_parser_test.go +++ b/internal/parser/codex_parser_test.go @@ -177,6 +177,39 @@ func TestParseCodexSession_FunctionCalls(t *testing.T) { assert.Equal(t, "continuing", msgs[4].Content) }) + t.Run("subagent notification without wait result falls back to spawn_agent output", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + summary := "Exit code: `1`\n\nFull output:\n```text\nTraceback...\n```" + notification := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Exit code: `1`\\n\\nFull output:\\n```text\\nTraceback...\\n```\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-notify", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexMsgJSON("user", notification, tsLateS5), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 3, len(msgs)) + assertToolCalls(t, msgs[1].ToolCalls, []ParsedToolCall{{ + ToolUseID: "call_spawn", + ToolName: "spawn_agent", + Category: "Task", + SubagentSessionID: "codex:" + childID, + }}) + assert.Equal(t, RoleUser, msgs[2].Role) + assert.Empty(t, msgs[2].Content) + require.Len(t, msgs[2].ToolResults, 1) + assert.Equal(t, "call_spawn", msgs[2].ToolResults[0].ToolUseID) + assert.Equal(t, summary, DecodeContent(msgs[2].ToolResults[0].ContentRaw)) + }) + t.Run("function call no name skipped", func(t *testing.T) { content := testjsonl.JoinJSONL( testjsonl.CodexSessionMetaJSON("fc-2", "/tmp", "user", tsEarly), From d8c0e435dc540df5be228393477478c0675d03a2 Mon Sep 17 00:00:00 2001 From: Trent Nelson Date: Thu, 26 Mar 2026 10:26:53 -0700 Subject: [PATCH 3/9] fix: ignore nonterminal codex notifications --- internal/parser/codex.go | 15 +++++++------- internal/parser/codex_parser_test.go | 30 ++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/internal/parser/codex.go b/internal/parser/codex.go index adf980c7..faea7545 100644 --- a/internal/parser/codex.go +++ b/internal/parser/codex.go @@ -710,11 +710,7 @@ func formatCodexWaitOutput( parts := make([]string, 0, len(entries)) multi := len(entries) > 1 for agentID, entry := range entries { - text := firstNonEmpty( - entry.Get("completed").Str, - entry.Get("errored").Str, - entry.Get("running").Str, - ) + text := codexTerminalSubagentStatus(entry) if text == "" { continue } @@ -748,12 +744,15 @@ func parseCodexSubagentNotification( parsed := gjson.Parse(body) agentID = strings.TrimSpace(parsed.Get("agent_id").Str) status := parsed.Get("status") - text = firstNonEmpty( + text = codexTerminalSubagentStatus(status) + return agentID, text +} + +func codexTerminalSubagentStatus(status gjson.Result) string { + return firstNonEmpty( status.Get("completed").Str, status.Get("errored").Str, - status.Get("running").Str, ) - return agentID, text } // extractCodexContent joins all text blocks from a Codex diff --git a/internal/parser/codex_parser_test.go b/internal/parser/codex_parser_test.go index bdebd03f..ce19d9b7 100644 --- a/internal/parser/codex_parser_test.go +++ b/internal/parser/codex_parser_test.go @@ -210,6 +210,36 @@ func TestParseCodexSession_FunctionCalls(t *testing.T) { assert.Equal(t, summary, DecodeContent(msgs[2].ToolResults[0].ContentRaw)) }) + t.Run("running subagent notification does not suppress later completion", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + running := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"running\":\"Still working\"}}\n" + + "" + completed := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Finished successfully\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-running", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexMsgJSON("user", running, tsLateS5), + testjsonl.CodexMsgJSON("user", completed, "2024-01-01T10:01:06Z"), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 3, len(msgs)) + assert.Equal(t, RoleUser, msgs[2].Role) + assert.Empty(t, msgs[2].Content) + require.Len(t, msgs[2].ToolResults, 1) + assert.Equal(t, "call_spawn", msgs[2].ToolResults[0].ToolUseID) + assert.Equal(t, "Finished successfully", DecodeContent(msgs[2].ToolResults[0].ContentRaw)) + }) + t.Run("function call no name skipped", func(t *testing.T) { content := testjsonl.JoinJSONL( testjsonl.CodexSessionMetaJSON("fc-2", "/tmp", "user", tsEarly), From 79dbf9d64b0ae0b4ea80a9bf85520a20a69ba52f Mon Sep 17 00:00:00 2001 From: Trent Nelson Date: Thu, 26 Mar 2026 10:32:16 -0700 Subject: [PATCH 4/9] fix: trigger resync for codex subagent rollout --- internal/db/db.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/db/db.go b/internal/db/db.go index 189e4d60..0b07ddb8 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -23,7 +23,7 @@ import ( // formatting changes). Old databases with a lower user_version // trigger a non-destructive re-sync (mtime reset + skip cache // clear) so existing session data is preserved. -const dataVersion = 6 +const dataVersion = 7 //go:embed schema.sql var schemaSQL string From 8856c1bba8b0cdb64185c75d4fee5df7a1e5de67 Mon Sep 17 00:00:00 2001 From: Trent Nelson Date: Thu, 26 Mar 2026 11:24:00 -0700 Subject: [PATCH 5/9] fix: thread codex subagent results correctly --- internal/parser/codex.go | 42 ++++++++++++++-- internal/parser/codex_parser_test.go | 74 ++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 3 deletions(-) diff --git a/internal/parser/codex.go b/internal/parser/codex.go index faea7545..4fb57d24 100644 --- a/internal/parser/codex.go +++ b/internal/parser/codex.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "sort" "strconv" "strings" "time" @@ -171,6 +172,12 @@ func (b *codexSessionBuilder) handleFunctionCall( content := formatCodexFunctionCall(name, payload) inputJSON := extractCodexInputJSON(payload) + if name == "wait" && callID != "" { + args, _ := parseCodexFunctionArgs(payload) + for _, agentID := range codexWaitAgentIDs(args) { + b.agentCalls[agentID] = callID + } + } b.messages = append(b.messages, ParsedMessage{ Ordinal: b.ordinal, @@ -221,8 +228,10 @@ func (b *codexSessionBuilder) handleFunctionCallOutput( } status := output.Get("status") if status.Exists() && status.IsObject() { - for agentID := range status.Map() { - b.agentResults[agentID] = true + for agentID, entry := range status.Map() { + if codexTerminalSubagentStatus(entry) != "" { + b.agentResults[agentID] = true + } } } b.messages = append(b.messages, ParsedMessage{ @@ -253,6 +262,7 @@ func (b *codexSessionBuilder) handleSubagentNotification( } callID := b.agentCalls[agentID] if callID == "" { + b.agentResults[agentID] = true b.messages = append(b.messages, ParsedMessage{ Ordinal: b.ordinal, Role: RoleUser, @@ -708,8 +718,14 @@ func formatCodexWaitOutput( } parts := make([]string, 0, len(entries)) + ids := make([]string, 0, len(entries)) + for agentID := range entries { + ids = append(ids, agentID) + } + sort.Strings(ids) multi := len(entries) > 1 - for agentID, entry := range entries { + for _, agentID := range ids { + entry := entries[agentID] text := codexTerminalSubagentStatus(entry) if text == "" { continue @@ -728,6 +744,26 @@ func formatCodexWaitOutput( return strings.Join(parts, "\n\n") } +func codexWaitAgentIDs(args gjson.Result) []string { + if !args.Exists() { + return nil + } + ids := args.Get("ids") + if !ids.Exists() || !ids.IsArray() { + return nil + } + + var out []string + for _, item := range ids.Array() { + id := strings.TrimSpace(item.Str) + if id == "" { + continue + } + out = append(out, id) + } + return out +} + func parseCodexSubagentNotification( content string, ) (agentID, text string) { diff --git a/internal/parser/codex_parser_test.go b/internal/parser/codex_parser_test.go index ce19d9b7..fc6c1628 100644 --- a/internal/parser/codex_parser_test.go +++ b/internal/parser/codex_parser_test.go @@ -240,6 +240,80 @@ func TestParseCodexSession_FunctionCalls(t *testing.T) { assert.Equal(t, "Finished successfully", DecodeContent(msgs[2].ToolResults[0].ContentRaw)) }) + t.Run("notification after wait binds to wait call", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + completed := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Finished successfully\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-wait-bind", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{childID}, + }, tsLateS5), + testjsonl.CodexMsgJSON("user", completed, "2024-01-01T10:01:06Z"), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 4, len(msgs)) + require.Len(t, msgs[3].ToolResults, 1) + assert.Equal(t, "call_wait", msgs[3].ToolResults[0].ToolUseID) + assert.Equal(t, "Finished successfully", DecodeContent(msgs[3].ToolResults[0].ContentRaw)) + }) + + t.Run("mixed wait status preserves later completion for running agent", func(t *testing.T) { + completedID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + runningID := "019c9c96-6ee7-77c0-ba4c-380f844289d6" + laterCompleted := "\n" + + "{\"agent_id\":\"" + runningID + "\",\"status\":{\"completed\":\"Second agent finished\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-mixed-wait", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run child agents", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{completedID, runningID}, + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_wait", + "{\"status\":{\""+completedID+"\":{\"completed\":\"First agent finished\"},\""+runningID+"\":{\"running\":\"Still working\"}}}", + tsLate, + ), + testjsonl.CodexMsgJSON("user", laterCompleted, tsLateS5), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 4, len(msgs)) + require.Len(t, msgs[2].ToolResults, 1) + assert.Equal(t, "call_wait", msgs[2].ToolResults[0].ToolUseID) + assert.Contains(t, DecodeContent(msgs[2].ToolResults[0].ContentRaw), "First agent finished") + require.Len(t, msgs[3].ToolResults, 1) + assert.Equal(t, "call_wait", msgs[3].ToolResults[0].ToolUseID) + assert.Equal(t, "Second agent finished", DecodeContent(msgs[3].ToolResults[0].ContentRaw)) + }) + + t.Run("orphaned terminal notifications dedupe", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + completed := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Finished successfully\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-orphan", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", completed, tsEarlyS1), + testjsonl.CodexMsgJSON("user", completed, tsEarlyS5), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 1, len(msgs)) + assert.Equal(t, "Finished successfully", msgs[0].Content) + }) + t.Run("function call no name skipped", func(t *testing.T) { content := testjsonl.JoinJSONL( testjsonl.CodexSessionMetaJSON("fc-2", "/tmp", "user", tsEarly), From a1690427802d0207a12ea0f38ac9db8be11b4bf0 Mon Sep 17 00:00:00 2001 From: Trent Nelson Date: Thu, 26 Mar 2026 11:54:54 -0700 Subject: [PATCH 6/9] fix: dedupe codex wait and notification results --- internal/parser/codex.go | 27 +++++++++++++----------- internal/parser/codex_parser_test.go | 31 ++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 12 deletions(-) diff --git a/internal/parser/codex.go b/internal/parser/codex.go index 4fb57d24..032d8954 100644 --- a/internal/parser/codex.go +++ b/internal/parser/codex.go @@ -222,17 +222,14 @@ func (b *codexSessionBuilder) handleFunctionCallOutput( b.agentNames[agentID] = nickname } case "wait": - text := formatCodexWaitOutput(output, b.agentNames) + text, resolved := formatCodexWaitOutput( + output, b.agentNames, b.agentResults, + ) if text == "" { return } - status := output.Get("status") - if status.Exists() && status.IsObject() { - for agentID, entry := range status.Map() { - if codexTerminalSubagentStatus(entry) != "" { - b.agentResults[agentID] = true - } - } + for _, agentID := range resolved { + b.agentResults[agentID] = true } b.messages = append(b.messages, ParsedMessage{ Ordinal: b.ordinal, @@ -706,18 +703,20 @@ func parseCodexFunctionOutput( func formatCodexWaitOutput( output gjson.Result, agentNames map[string]string, -) string { + seen map[string]bool, +) (string, []string) { status := output.Get("status") if !status.Exists() || !status.IsObject() { - return "" + return "", nil } entries := status.Map() if len(entries) == 0 { - return "" + return "", nil } parts := make([]string, 0, len(entries)) + var resolved []string ids := make([]string, 0, len(entries)) for agentID := range entries { ids = append(ids, agentID) @@ -726,10 +725,14 @@ func formatCodexWaitOutput( multi := len(entries) > 1 for _, agentID := range ids { entry := entries[agentID] + if seen[agentID] { + continue + } text := codexTerminalSubagentStatus(entry) if text == "" { continue } + resolved = append(resolved, agentID) if !multi { parts = append(parts, text) continue @@ -741,7 +744,7 @@ func formatCodexWaitOutput( parts = append(parts, label+":\n"+text) } - return strings.Join(parts, "\n\n") + return strings.Join(parts, "\n\n"), resolved } func codexWaitAgentIDs(args gjson.Result) []string { diff --git a/internal/parser/codex_parser_test.go b/internal/parser/codex_parser_test.go index fc6c1628..2d0d8ce0 100644 --- a/internal/parser/codex_parser_test.go +++ b/internal/parser/codex_parser_test.go @@ -267,6 +267,37 @@ func TestParseCodexSession_FunctionCalls(t *testing.T) { assert.Equal(t, "Finished successfully", DecodeContent(msgs[3].ToolResults[0].ContentRaw)) }) + t.Run("wait output does not duplicate terminal notification result", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + completed := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Finished successfully\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-wait-dedupe", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{childID}, + }, tsLateS5), + testjsonl.CodexMsgJSON("user", completed, "2024-01-01T10:01:06Z"), + testjsonl.CodexFunctionCallOutputJSON("call_wait", + "{\"status\":{\""+childID+"\":{\"completed\":\"Finished successfully\"}}}", + "2024-01-01T10:01:07Z", + ), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 4, len(msgs)) + require.Len(t, msgs[3].ToolResults, 1) + assert.Equal(t, "call_wait", msgs[3].ToolResults[0].ToolUseID) + assert.Equal(t, "Finished successfully", DecodeContent(msgs[3].ToolResults[0].ContentRaw)) + }) + t.Run("mixed wait status preserves later completion for running agent", func(t *testing.T) { completedID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" runningID := "019c9c96-6ee7-77c0-ba4c-380f844289d6" From d2f4db90b899b47f2dab8fda9792deadc9990864 Mon Sep 17 00:00:00 2001 From: Trent Nelson Date: Thu, 26 Mar 2026 13:00:12 -0700 Subject: [PATCH 7/9] fix: rebind pending codex wait results --- internal/parser/codex.go | 214 +++++++++++++++++---------- internal/parser/codex_parser_test.go | 41 ++++- 2 files changed, 172 insertions(+), 83 deletions(-) diff --git a/internal/parser/codex.go b/internal/parser/codex.go index 032d8954..893e6a75 100644 --- a/internal/parser/codex.go +++ b/internal/parser/codex.go @@ -28,33 +28,44 @@ var errCodexIncrementalNeedsFullParse = errors.New( // codexSessionBuilder accumulates state while scanning a Codex // JSONL session file line by line. type codexSessionBuilder struct { - messages []ParsedMessage - firstMessage string - startedAt time.Time - endedAt time.Time - sessionID string - project string - ordinal int - includeExec bool - currentModel string - callNames map[string]string - subagentMap map[string]string - agentNames map[string]string - agentCalls map[string]string - agentResults map[string]bool + messages []ParsedMessage + firstMessage string + startedAt time.Time + endedAt time.Time + sessionID string + project string + ordinal int + includeExec bool + currentModel string + callNames map[string]string + subagentMap map[string]string + agentNames map[string]string + agentCalls map[string]string + agentResults map[string]bool + pendingAgentResults map[string]codexPendingResult + callResults map[string]map[string]codexPendingResult + callResultIndex map[string]int +} + +type codexPendingResult struct { + text string + timestamp time.Time } func newCodexSessionBuilder( includeExec bool, ) *codexSessionBuilder { return &codexSessionBuilder{ - project: "unknown", - includeExec: includeExec, - callNames: make(map[string]string), - subagentMap: make(map[string]string), - agentNames: make(map[string]string), - agentCalls: make(map[string]string), - agentResults: make(map[string]bool), + project: "unknown", + includeExec: includeExec, + callNames: make(map[string]string), + subagentMap: make(map[string]string), + agentNames: make(map[string]string), + agentCalls: make(map[string]string), + agentResults: make(map[string]bool), + pendingAgentResults: make(map[string]codexPendingResult), + callResults: make(map[string]map[string]codexPendingResult), + callResultIndex: make(map[string]int), } } @@ -172,11 +183,10 @@ func (b *codexSessionBuilder) handleFunctionCall( content := formatCodexFunctionCall(name, payload) inputJSON := extractCodexInputJSON(payload) + waitAgentIDs := []string(nil) if name == "wait" && callID != "" { args, _ := parseCodexFunctionArgs(payload) - for _, agentID := range codexWaitAgentIDs(args) { - b.agentCalls[agentID] = callID - } + waitAgentIDs = codexWaitAgentIDs(args) } b.messages = append(b.messages, ParsedMessage{ @@ -195,6 +205,17 @@ func (b *codexSessionBuilder) handleFunctionCall( }}, }) b.ordinal++ + + if name == "wait" && callID != "" { + for _, agentID := range waitAgentIDs { + b.agentCalls[agentID] = callID + if pending, ok := b.pendingAgentResults[agentID]; ok && !b.agentResults[agentID] { + b.setCallResult(callID, agentID, pending.text, pending.timestamp) + b.agentResults[agentID] = true + delete(b.pendingAgentResults, agentID) + } + } + } } func (b *codexSessionBuilder) handleFunctionCallOutput( @@ -222,28 +243,21 @@ func (b *codexSessionBuilder) handleFunctionCallOutput( b.agentNames[agentID] = nickname } case "wait": - text, resolved := formatCodexWaitOutput( - output, b.agentNames, b.agentResults, - ) - if text == "" { + status := output.Get("status") + if !status.Exists() || !status.IsObject() { return } - for _, agentID := range resolved { + for agentID, entry := range status.Map() { + if b.agentResults[agentID] { + continue + } + text := codexTerminalSubagentStatus(entry) + if text == "" { + continue + } + b.setCallResult(callID, agentID, text, ts) b.agentResults[agentID] = true } - b.messages = append(b.messages, ParsedMessage{ - Ordinal: b.ordinal, - Role: RoleUser, - Content: "", - Timestamp: ts, - Model: b.currentModel, - ToolResults: []ParsedToolResult{{ - ToolUseID: callID, - ContentLength: len(text), - ContentRaw: strconv.Quote(text), - }}, - }) - b.ordinal++ } } @@ -258,20 +272,45 @@ func (b *codexSessionBuilder) handleSubagentNotification( return true } callID := b.agentCalls[agentID] - if callID == "" { + if callID != "" && b.callNames[callID] == "wait" { + b.setCallResult(callID, agentID, text, ts) b.agentResults[agentID] = true - b.messages = append(b.messages, ParsedMessage{ - Ordinal: b.ordinal, - Role: RoleUser, - Content: text, - Timestamp: ts, - Model: b.currentModel, - ContentLength: len(text), - }) - b.ordinal++ return true } - b.agentResults[agentID] = true + b.pendingAgentResults[agentID] = codexPendingResult{ + text: text, + timestamp: ts, + } + return true +} + +func (b *codexSessionBuilder) setCallResult( + callID, agentID, text string, ts time.Time, +) { + if callID == "" { + return + } + if b.callResults[callID] == nil { + b.callResults[callID] = make(map[string]codexPendingResult) + } + b.callResults[callID][agentID] = codexPendingResult{ + text: text, + timestamp: ts, + } + + formatted := formatCodexCallResults( + b.callResults[callID], b.agentNames, + ) + if idx, ok := b.callResultIndex[callID]; ok { + tr := &b.messages[idx].ToolResults[0] + tr.ContentLength = len(formatted) + tr.ContentRaw = strconv.Quote(formatted) + if ts.After(b.messages[idx].Timestamp) { + b.messages[idx].Timestamp = ts + } + return + } + b.messages = append(b.messages, ParsedMessage{ Ordinal: b.ordinal, Role: RoleUser, @@ -280,12 +319,45 @@ func (b *codexSessionBuilder) handleSubagentNotification( Model: b.currentModel, ToolResults: []ParsedToolResult{{ ToolUseID: callID, - ContentLength: len(text), - ContentRaw: strconv.Quote(text), + ContentLength: len(formatted), + ContentRaw: strconv.Quote(formatted), }}, }) + b.callResultIndex[callID] = len(b.messages) - 1 b.ordinal++ - return true +} + +func (b *codexSessionBuilder) flushPendingAgentResults() { + if len(b.pendingAgentResults) == 0 { + return + } + agentIDs := make([]string, 0, len(b.pendingAgentResults)) + for agentID := range b.pendingAgentResults { + agentIDs = append(agentIDs, agentID) + } + sort.Strings(agentIDs) + + for _, agentID := range agentIDs { + if b.agentResults[agentID] { + continue + } + pending := b.pendingAgentResults[agentID] + callID := b.agentCalls[agentID] + if callID != "" { + b.setCallResult(callID, agentID, pending.text, pending.timestamp) + } else { + b.messages = append(b.messages, ParsedMessage{ + Ordinal: b.ordinal, + Role: RoleUser, + Content: pending.text, + Timestamp: pending.timestamp, + Model: b.currentModel, + ContentLength: len(pending.text), + }) + b.ordinal++ + } + b.agentResults[agentID] = true + } } func formatCodexFunctionCall( @@ -700,23 +772,15 @@ func parseCodexFunctionOutput( } } -func formatCodexWaitOutput( - output gjson.Result, +func formatCodexCallResults( + entries map[string]codexPendingResult, agentNames map[string]string, - seen map[string]bool, -) (string, []string) { - status := output.Get("status") - if !status.Exists() || !status.IsObject() { - return "", nil - } - - entries := status.Map() +) string { if len(entries) == 0 { - return "", nil + return "" } parts := make([]string, 0, len(entries)) - var resolved []string ids := make([]string, 0, len(entries)) for agentID := range entries { ids = append(ids, agentID) @@ -724,15 +788,7 @@ func formatCodexWaitOutput( sort.Strings(ids) multi := len(entries) > 1 for _, agentID := range ids { - entry := entries[agentID] - if seen[agentID] { - continue - } - text := codexTerminalSubagentStatus(entry) - if text == "" { - continue - } - resolved = append(resolved, agentID) + text := entries[agentID].text if !multi { parts = append(parts, text) continue @@ -744,7 +800,7 @@ func formatCodexWaitOutput( parts = append(parts, label+":\n"+text) } - return strings.Join(parts, "\n\n"), resolved + return strings.Join(parts, "\n\n") } func codexWaitAgentIDs(args gjson.Result) []string { @@ -850,6 +906,7 @@ func ParseCodexSession( fmt.Errorf("reading codex %s: %w", path, err) } + b.flushPendingAgentResults() annotateSubagentSessions(b.messages, b.subagentMap) sessionID := b.sessionID @@ -930,6 +987,7 @@ func ParseCodexSessionFrom( return nil, time.Time{}, 0, fallbackErr } + b.flushPendingAgentResults() annotateSubagentSessions(b.messages, b.subagentMap) return b.messages, b.endedAt, consumed, nil diff --git a/internal/parser/codex_parser_test.go b/internal/parser/codex_parser_test.go index 2d0d8ce0..3eea4967 100644 --- a/internal/parser/codex_parser_test.go +++ b/internal/parser/codex_parser_test.go @@ -260,6 +260,38 @@ func TestParseCodexSession_FunctionCalls(t *testing.T) { ) sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + require.NotNil(t, sess) + assert.Equal(t, 4, len(msgs)) + assertToolCalls(t, msgs[2].ToolCalls, []ParsedToolCall{{ + ToolUseID: "call_wait", + ToolName: "wait", + Category: "Other", + }}) + require.Len(t, msgs[3].ToolResults, 1) + assert.Equal(t, "call_wait", msgs[3].ToolResults[0].ToolUseID) + assert.Equal(t, "Finished successfully", DecodeContent(msgs[3].ToolResults[0].ContentRaw)) + }) + + t.Run("notification before wait binds to later wait call", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + completed := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Finished successfully\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-wait-rebind", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexMsgJSON("user", completed, tsLateS5), + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{childID}, + }, "2024-01-01T10:01:06Z"), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + require.NotNil(t, sess) assert.Equal(t, 4, len(msgs)) require.Len(t, msgs[3].ToolResults, 1) @@ -319,13 +351,12 @@ func TestParseCodexSession_FunctionCalls(t *testing.T) { sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) require.NotNil(t, sess) - assert.Equal(t, 4, len(msgs)) + assert.Equal(t, 3, len(msgs)) require.Len(t, msgs[2].ToolResults, 1) assert.Equal(t, "call_wait", msgs[2].ToolResults[0].ToolUseID) - assert.Contains(t, DecodeContent(msgs[2].ToolResults[0].ContentRaw), "First agent finished") - require.Len(t, msgs[3].ToolResults, 1) - assert.Equal(t, "call_wait", msgs[3].ToolResults[0].ToolUseID) - assert.Equal(t, "Second agent finished", DecodeContent(msgs[3].ToolResults[0].ContentRaw)) + decoded := DecodeContent(msgs[2].ToolResults[0].ContentRaw) + assert.Contains(t, decoded, "First agent finished") + assert.Contains(t, decoded, "Second agent finished") }) t.Run("orphaned terminal notifications dedupe", func(t *testing.T) { From 63bae3b616618ecc4bf676f74603981b2a24ce9a Mon Sep 17 00:00:00 2001 From: Trent Nelson Date: Thu, 26 Mar 2026 14:51:54 -0700 Subject: [PATCH 8/9] fix: preserve codex subagent chronology --- internal/parser/codex.go | 63 +++++++++++++++++++++++----- internal/parser/codex_parser_test.go | 28 +++++++++++++ 2 files changed, 81 insertions(+), 10 deletions(-) diff --git a/internal/parser/codex.go b/internal/parser/codex.go index 893e6a75..76e7453e 100644 --- a/internal/parser/codex.go +++ b/internal/parser/codex.go @@ -50,6 +50,7 @@ type codexSessionBuilder struct { type codexPendingResult struct { text string timestamp time.Time + ordinal int } func newCodexSessionBuilder( @@ -280,12 +281,20 @@ func (b *codexSessionBuilder) handleSubagentNotification( b.pendingAgentResults[agentID] = codexPendingResult{ text: text, timestamp: ts, + ordinal: b.ordinal, } + b.ordinal++ return true } func (b *codexSessionBuilder) setCallResult( callID, agentID, text string, ts time.Time, +) { + b.setCallResultAt(callID, agentID, text, ts, -1) +} + +func (b *codexSessionBuilder) setCallResultAt( + callID, agentID, text string, ts time.Time, ordinal int, ) { if callID == "" { return @@ -311,8 +320,13 @@ func (b *codexSessionBuilder) setCallResult( return } - b.messages = append(b.messages, ParsedMessage{ - Ordinal: b.ordinal, + if ordinal < 0 { + ordinal = b.ordinal + b.ordinal++ + } + + msg := ParsedMessage{ + Ordinal: ordinal, Role: RoleUser, Content: "", Timestamp: ts, @@ -322,9 +336,9 @@ func (b *codexSessionBuilder) setCallResult( ContentLength: len(formatted), ContentRaw: strconv.Quote(formatted), }}, - }) - b.callResultIndex[callID] = len(b.messages) - 1 - b.ordinal++ + } + idx := b.insertMessage(msg) + b.callResultIndex[callID] = idx } func (b *codexSessionBuilder) flushPendingAgentResults() { @@ -335,7 +349,14 @@ func (b *codexSessionBuilder) flushPendingAgentResults() { for agentID := range b.pendingAgentResults { agentIDs = append(agentIDs, agentID) } - sort.Strings(agentIDs) + sort.Slice(agentIDs, func(i, j int) bool { + pi := b.pendingAgentResults[agentIDs[i]] + pj := b.pendingAgentResults[agentIDs[j]] + if pi.ordinal == pj.ordinal { + return agentIDs[i] < agentIDs[j] + } + return pi.ordinal < pj.ordinal + }) for _, agentID := range agentIDs { if b.agentResults[agentID] { @@ -344,22 +365,44 @@ func (b *codexSessionBuilder) flushPendingAgentResults() { pending := b.pendingAgentResults[agentID] callID := b.agentCalls[agentID] if callID != "" { - b.setCallResult(callID, agentID, pending.text, pending.timestamp) + b.setCallResultAt( + callID, agentID, + pending.text, pending.timestamp, + pending.ordinal, + ) } else { - b.messages = append(b.messages, ParsedMessage{ - Ordinal: b.ordinal, + b.insertMessage(ParsedMessage{ + Ordinal: pending.ordinal, Role: RoleUser, Content: pending.text, Timestamp: pending.timestamp, Model: b.currentModel, ContentLength: len(pending.text), }) - b.ordinal++ } b.agentResults[agentID] = true } } +func (b *codexSessionBuilder) insertMessage(msg ParsedMessage) int { + idx := len(b.messages) + for i, existing := range b.messages { + if existing.Ordinal > msg.Ordinal { + idx = i + break + } + } + b.messages = append(b.messages, ParsedMessage{}) + copy(b.messages[idx+1:], b.messages[idx:]) + b.messages[idx] = msg + for callID, cur := range b.callResultIndex { + if cur >= idx { + b.callResultIndex[callID] = cur + 1 + } + } + return idx +} + func formatCodexFunctionCall( name string, payload gjson.Result, ) string { diff --git a/internal/parser/codex_parser_test.go b/internal/parser/codex_parser_test.go index 3eea4967..492a1c1d 100644 --- a/internal/parser/codex_parser_test.go +++ b/internal/parser/codex_parser_test.go @@ -210,6 +210,34 @@ func TestParseCodexSession_FunctionCalls(t *testing.T) { assert.Equal(t, summary, DecodeContent(msgs[2].ToolResults[0].ContentRaw)) }) + t.Run("no-wait fallback preserves chronology before later messages", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + summary := "Exit code: `1`\n\nFull output:\n```text\nTraceback...\n```" + notification := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Exit code: `1`\\n\\nFull output:\\n```text\\nTraceback...\\n```\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-notify-order", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexMsgJSON("user", notification, tsLateS5), + testjsonl.CodexMsgJSON("assistant", "continuing", "2024-01-01T10:01:06Z"), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 4, len(msgs)) + require.Len(t, msgs[2].ToolResults, 1) + assert.Equal(t, "call_spawn", msgs[2].ToolResults[0].ToolUseID) + assert.Equal(t, summary, DecodeContent(msgs[2].ToolResults[0].ContentRaw)) + assert.Equal(t, RoleAssistant, msgs[3].Role) + assert.Equal(t, "continuing", msgs[3].Content) + }) + t.Run("running subagent notification does not suppress later completion", func(t *testing.T) { childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" running := "\n" + From 63dc0563d40bedb864f35a1cee4d11f153f738de Mon Sep 17 00:00:00 2001 From: Trent Nelson Date: Thu, 26 Mar 2026 15:42:04 -0700 Subject: [PATCH 9/9] fix: reparse late codex wait bindings --- internal/parser/codex.go | 5 +++ internal/parser/codex_parser_test.go | 67 ++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/internal/parser/codex.go b/internal/parser/codex.go index 76e7453e..9725b226 100644 --- a/internal/parser/codex.go +++ b/internal/parser/codex.go @@ -278,6 +278,9 @@ func (b *codexSessionBuilder) handleSubagentNotification( b.agentResults[agentID] = true return true } + if _, ok := b.pendingAgentResults[agentID]; ok { + return true + } b.pendingAgentResults[agentID] = codexPendingResult{ text: text, timestamp: ts, @@ -1057,6 +1060,8 @@ func codexIncrementalNeedsFullParse(line string) bool { payload := gjson.Get(line, "payload") switch payload.Get("type").Str { + case "function_call": + return payload.Get("name").Str == "wait" case "function_call_output": output, _ := parseCodexFunctionOutput(payload) if !output.Exists() { diff --git a/internal/parser/codex_parser_test.go b/internal/parser/codex_parser_test.go index 492a1c1d..cd6c0451 100644 --- a/internal/parser/codex_parser_test.go +++ b/internal/parser/codex_parser_test.go @@ -238,6 +238,35 @@ func TestParseCodexSession_FunctionCalls(t *testing.T) { assert.Equal(t, "continuing", msgs[3].Content) }) + t.Run("duplicate pending notification preserves earliest chronology", func(t *testing.T) { + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + summary := "Exit code: `1`\n\nFull output:\n```text\nTraceback...\n```" + notification := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Exit code: `1`\\n\\nFull output:\\n```text\\nTraceback...\\n```\"}}\n" + + "" + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("fc-subagent-notify-dupe-order", "/tmp", "user", tsEarly), + testjsonl.CodexMsgJSON("user", "run a child agent", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "Run the compile smoke test", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexMsgJSON("user", notification, tsLateS5), + testjsonl.CodexMsgJSON("assistant", "continuing", "2024-01-01T10:01:06Z"), + testjsonl.CodexMsgJSON("user", notification, "2024-01-01T10:01:07Z"), + ) + sess, msgs := runCodexParserTest(t, "test.jsonl", content, false) + + require.NotNil(t, sess) + assert.Equal(t, 4, len(msgs)) + require.Len(t, msgs[2].ToolResults, 1) + assert.Equal(t, "call_spawn", msgs[2].ToolResults[0].ToolUseID) + assert.Equal(t, summary, DecodeContent(msgs[2].ToolResults[0].ContentRaw)) + assert.Equal(t, RoleAssistant, msgs[3].Role) + assert.Equal(t, "continuing", msgs[3].Content) + }) + t.Run("running subagent notification does not suppress later completion", func(t *testing.T) { childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" running := "\n" + @@ -824,6 +853,44 @@ func TestParseCodexSessionFrom_SubagentOutputRequiresFullParse(t *testing.T) { assert.Contains(t, err.Error(), "full parse") } +func TestParseCodexSessionFrom_WaitCallRequiresFullParse(t *testing.T) { + t.Parallel() + + childID := "019c9c96-6ee7-77c0-ba4c-380f844289d5" + notification := "\n" + + "{\"agent_id\":\"" + childID + "\",\"status\":{\"completed\":\"Finished successfully\"}}\n" + + "" + initial := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON("inc-wait", "/tmp", "codex_cli_rs", tsEarly), + testjsonl.CodexMsgJSON("user", "run child", tsEarlyS1), + testjsonl.CodexFunctionCallWithCallIDJSON("spawn_agent", "call_spawn", map[string]any{ + "agent_type": "awaiter", + "message": "run it", + }, tsEarlyS5), + testjsonl.CodexFunctionCallOutputJSON("call_spawn", `{"agent_id":"`+childID+`","nickname":"Fennel"}`, tsLate), + testjsonl.CodexMsgJSON("user", notification, tsLateS5), + ) + path := createTestFile(t, "codex-wait-inc.jsonl", initial) + + info, err := os.Stat(path) + require.NoError(t, err) + offset := info.Size() + + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0o644) + require.NoError(t, err) + _, err = f.WriteString(testjsonl.JoinJSONL( + testjsonl.CodexFunctionCallWithCallIDJSON("wait", "call_wait", map[string]any{ + "ids": []string{childID}, + }, "2024-01-01T10:01:06Z"), + )) + require.NoError(t, err) + require.NoError(t, f.Close()) + + _, _, _, err = ParseCodexSessionFrom(path, offset, 4, false) + require.Error(t, err) + assert.Contains(t, err.Error(), "full parse") +} + func TestParseCodexSessionFrom_SystemMessageDoesNotRequireFullParse(t *testing.T) { t.Parallel()