Skip to content

Commit 577a66f

Browse files
committed
feat: Enhance agent orchestration and implement session management
- Added logic to treat manual runs as approval handoffs in AgentOrchestrator. - Refactored LLM timeout handling in ImplementSessionManager to allow configuration. - Introduced methods to manage runner feedback and LLM request completion. - Updated OpenAI responses clients to support threading for retries. - Improved test coverage for agent orchestration and session management, including handling stale feedback and chaining retries. - Adjusted responses client to include previous response IDs for better tracking.
1 parent 5f3edcf commit 577a66f

10 files changed

Lines changed: 838 additions & 133 deletions

ISSUES.md

Lines changed: 283 additions & 60 deletions
Large diffs are not rendered by default.

src/core/agents/agentOrchestrator.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,26 @@ export class AgentOrchestrator {
4040
}
4141

4242
if (current.currentNode !== nodeId) {
43-
await this.runtime.jumpToNode(runId, nodeId, "force", "manual node run");
43+
if (shouldTreatManualRunAsApprovalHandoff(current, nodeId)) {
44+
await this.runtime.approveCurrent(runId, { continueAfterApprove: false });
45+
} else {
46+
await this.runtime.jumpToNode(runId, nodeId, "force", "manual node run");
47+
}
4448
}
4549

50+
51+
function shouldTreatManualRunAsApprovalHandoff(run: RunRecord, nodeId: GraphNodeId): boolean {
52+
const recommendation = run.graph.pendingTransition;
53+
if (!recommendation || recommendation.action !== "pause_for_human") {
54+
return false;
55+
}
56+
if (recommendation.targetNode !== nodeId) {
57+
return false;
58+
}
59+
const currentIdx = GRAPH_NODE_ORDER.indexOf(run.currentNode);
60+
const targetIdx = GRAPH_NODE_ORDER.indexOf(nodeId);
61+
return currentIdx >= 0 && targetIdx === currentIdx + 1;
62+
}
4663
await this.runtime.runUntilPause(runId, {
4764
abortSignal: opts?.abortSignal,
4865
stopAfterApprovalBoundary: true,

src/core/agents/implementSessionManager.ts

Lines changed: 108 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -671,41 +671,22 @@ export class ImplementSessionManager {
671671
if (!this.deps.llm) {
672672
throw new Error("implement_experiments is configured for staged_llm mode, but no LLM client is available.");
673673
}
674-
const llmTimeoutMs = getImplementLlmTimeoutMs();
675-
const timeoutController = new AbortController();
676-
const timeoutId = setTimeout(() => timeoutController.abort(), llmTimeoutMs);
677-
const llmAbortSignal = abortSignal
678-
? AbortSignal.any([abortSignal, timeoutController.signal])
679-
: timeoutController.signal;
680-
try {
681-
const completion = await this.deps.llm.complete(attemptPrompt, {
682-
systemPrompt: attemptSystemPrompt,
683-
abortSignal: llmAbortSignal,
684-
onProgress: (event) => {
685-
const text = event.text.trim();
686-
if (!text) {
687-
return;
688-
}
689-
emitImplementObservation("codex", event.type === "delta" ? `LLM> ${text}` : text, {
690-
attempt,
691-
threadId: activeThreadId,
692-
publicDir: defaultPublicDir
693-
});
694-
}
695-
});
696-
result = {
697-
threadId: activeThreadId,
698-
finalText: completion.text,
699-
events: []
700-
};
701-
} catch (error) {
702-
if (timeoutController.signal.aborted && !abortSignal?.aborted) {
703-
throw new Error(`implement_experiments staged_llm request timed out after ${llmTimeoutMs}ms`);
704-
}
705-
throw error;
706-
} finally {
707-
clearTimeout(timeoutId);
708-
}
674+
const llmTimeoutMs = getImplementLlmTimeoutMs(this.deps.config);
675+
const completion = await this.completeStagedLlmRequest({
676+
prompt: attemptPrompt,
677+
systemPrompt: attemptSystemPrompt,
678+
timeoutMs: llmTimeoutMs,
679+
abortSignal,
680+
attempt,
681+
threadId: activeThreadId,
682+
publicDir: defaultPublicDir,
683+
emitImplementObservation
684+
});
685+
result = {
686+
threadId: completion.threadId || activeThreadId,
687+
finalText: completion.text,
688+
events: []
689+
};
709690
}
710691
} catch (error) {
711692
const errorMessage = error instanceof Error ? error.message : String(error);
@@ -1368,9 +1349,7 @@ export class ImplementSessionManager {
13681349
const previousSummary = await runContext.get<string>("implement_experiments.last_summary");
13691350
const previousRunCommand = await runContext.get<string>("implement_experiments.run_command");
13701351
const previousScript = await runContext.get<string>("implement_experiments.script");
1371-
const runnerFeedback =
1372-
(await runContext.get<RunVerifierReport>("implement_experiments.runner_feedback")) ||
1373-
(await runContext.get<RunVerifierReport>("run_experiments.feedback_for_implementer"));
1352+
const runnerFeedback = await this.loadApplicableRunnerFeedback(run, runContext);
13741353
const paperCritique = await runContext.get<{
13751354
overall_decision?: string;
13761355
manuscript_type?: string;
@@ -1461,6 +1440,33 @@ export class ImplementSessionManager {
14611440
};
14621441
}
14631442

1443+
private async loadApplicableRunnerFeedback(
1444+
run: RunRecord,
1445+
runContext: RunContextMemory
1446+
): Promise<RunVerifierReport | undefined> {
1447+
const runnerFeedback =
1448+
(await runContext.get<RunVerifierReport>("implement_experiments.runner_feedback")) ||
1449+
(await runContext.get<RunVerifierReport>("run_experiments.feedback_for_implementer"));
1450+
if (!runnerFeedback) {
1451+
return undefined;
1452+
}
1453+
if (run.graph.nodeStates.run_experiments?.status === "failed") {
1454+
return runnerFeedback;
1455+
}
1456+
const feedbackRecordedAt = Date.parse(runnerFeedback.recorded_at || "");
1457+
const designUpdatedAt = Date.parse(run.graph.nodeStates.design_experiments?.updatedAt || "");
1458+
if (
1459+
Number.isFinite(feedbackRecordedAt) &&
1460+
Number.isFinite(designUpdatedAt) &&
1461+
designUpdatedAt > feedbackRecordedAt
1462+
) {
1463+
await runContext.put("implement_experiments.runner_feedback", null);
1464+
await runContext.put("run_experiments.feedback_for_implementer", null);
1465+
return undefined;
1466+
}
1467+
return runnerFeedback;
1468+
}
1469+
14641470
private buildAttemptPrompt(params: {
14651471
taskSpec: ImplementTaskSpec;
14661472
searchLocalization: LocalizationResult;
@@ -1618,6 +1624,64 @@ export class ImplementSessionManager {
16181624
return lines.join("\n");
16191625
}
16201626

1627+
private async completeStagedLlmRequest(input: {
1628+
prompt: string;
1629+
systemPrompt: string;
1630+
timeoutMs: number;
1631+
abortSignal?: AbortSignal;
1632+
attempt: number;
1633+
threadId?: string;
1634+
publicDir: string;
1635+
emitImplementObservation: (
1636+
stage: ImplementProgressStage,
1637+
message: string,
1638+
extras?: Partial<ImplementProgressStatus>
1639+
) => void;
1640+
reasoningEffort?: string;
1641+
}): Promise<{ text: string; threadId?: string }> {
1642+
const timeoutController = input.timeoutMs > 0 ? new AbortController() : undefined;
1643+
const timeoutId = timeoutController
1644+
? setTimeout(() => timeoutController.abort(), input.timeoutMs)
1645+
: undefined;
1646+
const llmAbortSignal = timeoutController
1647+
? input.abortSignal
1648+
? AbortSignal.any([input.abortSignal, timeoutController.signal])
1649+
: timeoutController.signal
1650+
: input.abortSignal;
1651+
try {
1652+
const completion = await this.deps.llm!.complete(input.prompt, {
1653+
threadId: input.threadId,
1654+
systemPrompt: input.systemPrompt,
1655+
reasoningEffort: input.reasoningEffort,
1656+
abortSignal: llmAbortSignal,
1657+
onProgress: (event) => {
1658+
const text = event.text.trim();
1659+
if (!text) {
1660+
return;
1661+
}
1662+
input.emitImplementObservation("codex", event.type === "delta" ? `LLM> ${text}` : text, {
1663+
attempt: input.attempt,
1664+
threadId: input.threadId,
1665+
publicDir: input.publicDir
1666+
});
1667+
}
1668+
});
1669+
return {
1670+
text: completion.text,
1671+
threadId: completion.threadId
1672+
};
1673+
} catch (error) {
1674+
if (timeoutController?.signal.aborted && !input.abortSignal?.aborted) {
1675+
throw new Error(`implement_experiments staged_llm request timed out after ${input.timeoutMs}ms`);
1676+
}
1677+
throw error;
1678+
} finally {
1679+
if (timeoutId) {
1680+
clearTimeout(timeoutId);
1681+
}
1682+
}
1683+
}
1684+
16211685
private buildLocalizerInput(
16221686
taskSpec: ImplementTaskSpec,
16231687
previousAttempt: AttemptRecord | undefined,
@@ -2964,9 +3028,13 @@ function stripDryRunFlag(command: string | undefined): string | undefined {
29643028
return stripped || undefined;
29653029
}
29663030

2967-
function getImplementLlmTimeoutMs(): number {
3031+
export function getImplementLlmTimeoutMs(config: AppConfig): number {
29683032
const parsed = Number.parseInt(process.env.AUTOLABOS_IMPLEMENT_LLM_TIMEOUT_MS || "", 10);
2969-
return Number.isFinite(parsed) && parsed > 0 ? parsed : 60_000;
3033+
if (Number.isFinite(parsed) && parsed > 0) {
3034+
return parsed;
3035+
}
3036+
void config;
3037+
return 0;
29703038
}
29713039

29723040
function isDryRunMetricsRepairFeedback(report: RunVerifierReport | undefined): boolean {

src/core/llm/client.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export interface LLMCompletionUsage {
1010

1111
export interface LLMCompletion {
1212
text: string;
13+
threadId?: string;
1314
usage?: LLMCompletionUsage;
1415
}
1516

@@ -22,6 +23,8 @@ export interface LLMCompleteOptions {
2223
threadId?: string;
2324
systemPrompt?: string;
2425
inputImagePaths?: string[];
26+
model?: string;
27+
reasoningEffort?: string;
2528
onProgress?: (event: LLMProgressEvent) => void;
2629
abortSignal?: AbortSignal;
2730
}
@@ -54,8 +57,8 @@ export class CodexLLMClient implements LLMClient {
5457
inputImagePaths: opts?.inputImagePaths,
5558
sandboxMode: "read-only",
5659
approvalPolicy: "never",
57-
model: this.defaults.model,
58-
reasoningEffort: this.defaults.reasoningEffort as never,
60+
model: opts?.model || this.defaults.model,
61+
reasoningEffort: (opts?.reasoningEffort || this.defaults.reasoningEffort) as never,
5962
fastMode: this.defaults.fastMode,
6063
abortSignal: opts?.abortSignal,
6164
onEvent: (event) => {
@@ -66,6 +69,7 @@ export class CodexLLMClient implements LLMClient {
6669

6770
return {
6871
text: result.finalText,
72+
threadId: result.threadId,
6973
usage: {
7074
costUsd: undefined
7175
}
@@ -86,15 +90,17 @@ export class OpenAiResponsesLLMClient implements LLMClient {
8690
opts?.onProgress?.({ type: "status", text: "Submitting request to OpenAI Responses API." });
8791
const text = await this.openai.runForText({
8892
prompt,
93+
threadId: opts?.threadId,
8994
systemPrompt: opts?.systemPrompt,
90-
model: this.defaults.model,
91-
reasoningEffort: this.defaults.reasoningEffort,
95+
model: opts?.model || this.defaults.model,
96+
reasoningEffort: opts?.reasoningEffort || this.defaults.reasoningEffort,
9297
abortSignal: opts?.abortSignal
9398
});
9499
opts?.onProgress?.({ type: "status", text: "Received Responses API output." });
95100

96101
return {
97102
text,
103+
threadId: this.openai.lastResponseId(),
98104
usage: {
99105
costUsd: undefined
100106
}
@@ -116,7 +122,7 @@ export class OllamaLLMClient implements LLMClient {
116122
prompt: string,
117123
opts?: LLMCompleteOptions
118124
): Promise<LLMCompletion> {
119-
const model = this.defaults.model || "qwen3.5:35b-a3b";
125+
const model = opts?.model || this.defaults.model || "qwen3.5:35b-a3b";
120126
opts?.onProgress?.({ type: "status", text: `Submitting request to Ollama (${model}).` });
121127

122128
const hasImages = opts?.inputImagePaths && opts.inputImagePaths.length > 0;

src/core/nodes/implementExperiments.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ export function createImplementExperimentsNode(deps: NodeExecutionDeps): GraphNo
2424
} catch (error) {
2525
if (error instanceof ImplementSessionStopError) {
2626
return {
27-
status: "success",
27+
status: "failure",
2828
summary: error.message,
29-
needsApproval: true,
29+
error: error.message,
3030
toolCallsUsed: 1
3131
};
3232
}

src/integrations/openai/responsesPdfAnalysisClient.ts

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,16 +74,14 @@ export class ResponsesPdfAnalysisClient {
7474
};
7575
}
7676

77-
// Combine user abort signal with a 10-minute safety timeout
78-
const timeoutMs = 10 * 60 * 1000;
79-
const timeoutController = new AbortController();
80-
const timeoutId = setTimeout(() => timeoutController.abort(), timeoutMs);
81-
let combinedSignal: AbortSignal;
82-
if (args.abortSignal) {
83-
combinedSignal = AbortSignal.any([args.abortSignal, timeoutController.signal]);
84-
} else {
85-
combinedSignal = timeoutController.signal;
86-
}
77+
const timeoutMs = getOpenAiResponsesTimeoutMs();
78+
const timeoutController = timeoutMs > 0 ? new AbortController() : undefined;
79+
const timeoutId = timeoutController ? setTimeout(() => timeoutController.abort(), timeoutMs) : undefined;
80+
const combinedSignal = timeoutController
81+
? args.abortSignal
82+
? AbortSignal.any([args.abortSignal, timeoutController.signal])
83+
: timeoutController.signal
84+
: args.abortSignal;
8785

8886
let response: Response;
8987
try {
@@ -97,7 +95,9 @@ export class ResponsesPdfAnalysisClient {
9795
body: JSON.stringify(body)
9896
});
9997
} finally {
100-
clearTimeout(timeoutId);
98+
if (timeoutId) {
99+
clearTimeout(timeoutId);
100+
}
101101
}
102102

103103
if (!response.ok) {
@@ -126,6 +126,11 @@ export class ResponsesPdfAnalysisClient {
126126
}
127127
}
128128

129+
function getOpenAiResponsesTimeoutMs(): number {
130+
const parsed = Number.parseInt(process.env.AUTOLABOS_OPENAI_RESPONSES_TIMEOUT_MS || "", 10);
131+
return Number.isFinite(parsed) && parsed > 0 ? parsed : 0;
132+
}
133+
129134
function extractOutputText(payload: ResponsesApiResponse): string {
130135
const parts: string[] = [];
131136
for (const output of payload.output ?? []) {

0 commit comments

Comments (0)