Skip to content

Commit bfb6d6a

Browse files
committed
Fix JSONL trace exporter review issues
1 parent bb4f3c2 commit bfb6d6a

7 files changed

Lines changed: 333 additions & 148 deletions

File tree

agentscope-core/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -140,5 +140,11 @@
140140
<groupId>com.networknt</groupId>
141141
<artifactId>json-schema-validator</artifactId>
142142
</dependency>
143+
144+
<dependency>
145+
<groupId>io.opentelemetry</groupId>
146+
<artifactId>opentelemetry-api</artifactId>
147+
<scope>test</scope>
148+
</dependency>
143149
</dependencies>
144150
</project>

agentscope-core/src/main/java/io/agentscope/core/hook/recorder/JsonlTraceExporter.java

Lines changed: 185 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@
3636
import java.io.IOException;
3737
import java.io.PrintWriter;
3838
import java.io.StringWriter;
39+
import java.lang.reflect.Method;
3940
import java.nio.charset.StandardCharsets;
4041
import java.nio.file.Files;
4142
import java.nio.file.OpenOption;
@@ -47,12 +48,21 @@
4748
import java.util.Objects;
4849
import java.util.Set;
4950
import java.util.UUID;
50-
import java.util.concurrent.ConcurrentHashMap;
51+
import java.util.WeakHashMap;
52+
import java.util.concurrent.CompletableFuture;
53+
import java.util.concurrent.ExecutionException;
54+
import java.util.concurrent.ExecutorService;
55+
import java.util.concurrent.Executors;
56+
import java.util.concurrent.Future;
57+
import java.util.concurrent.RejectedExecutionException;
58+
import java.util.concurrent.ThreadFactory;
59+
import java.util.concurrent.TimeUnit;
60+
import java.util.concurrent.TimeoutException;
61+
import java.util.concurrent.atomic.AtomicBoolean;
5162
import java.util.function.Predicate;
5263
import org.slf4j.Logger;
5364
import org.slf4j.LoggerFactory;
5465
import reactor.core.publisher.Mono;
55-
import reactor.core.scheduler.Schedulers;
5666

5767
/**
5868
* A built-in, out-of-the-box JSONL trace exporter based on the Hook event system.
@@ -65,13 +75,15 @@
6575
* <ul>
6676
* <li>This exporter is best-effort by default: serialization / IO errors do not break agent
6777
* execution unless {@link Builder#failFast(boolean)} is enabled.</li>
68-
* <li>This exporter performs blocking file IO, but it runs on Reactor boundedElastic to avoid
69-
* blocking agent execution threads.</li>
78+
* <li>This exporter performs blocking file IO on a dedicated single-threaded executor so that
79+
* writes are serialized, keeping file order, step IDs, and run IDs consistent.</li>
7080
* </ul>
7181
*/
7282
public final class JsonlTraceExporter implements Hook, AutoCloseable {
7383

7484
private static final Logger log = LoggerFactory.getLogger(JsonlTraceExporter.class);
85+
private static final long CLOSE_TIMEOUT_SECONDS = 30L;
86+
private static final OpenTelemetryAccess OPEN_TELEMETRY_ACCESS = OpenTelemetryAccess.create();
7587

7688
private final Path outputFile;
7789
private final boolean flushEveryLine;
@@ -81,8 +93,10 @@ public final class JsonlTraceExporter implements Hook, AutoCloseable {
8193

8294
private final Object lock = new Object();
8395
private final BufferedWriter writer;
96+
private final ExecutorService exportExecutor;
97+
private final AtomicBoolean closed = new AtomicBoolean(false);
8498

85-
private final Map<String, RunState> runStates = new ConcurrentHashMap<>();
99+
private final Map<String, RunState> runStates = new WeakHashMap<>();
86100

87101
private JsonlTraceExporter(
88102
Path outputFile,
@@ -97,6 +111,7 @@ private JsonlTraceExporter(
97111
this.priority = priority;
98112
this.eventFilter = Objects.requireNonNull(eventFilter, "eventFilter cannot be null");
99113
this.writer = openWriter(outputFile, append);
114+
this.exportExecutor = createExportExecutor();
100115
}
101116

102117
public static Builder builder(Path outputFile) {
@@ -110,27 +125,50 @@ public int priority() {
110125

111126
@Override
112127
public <T extends HookEvent> Mono<T> onEvent(T event) {
113-
if (event == null || !eventFilter.test(event)) {
114-
return Mono.just(event);
128+
T nonNullEvent = Objects.requireNonNull(event, "event cannot be null");
129+
if (!eventFilter.test(nonNullEvent)) {
130+
return Mono.just(nonNullEvent);
115131
}
116132

117-
return Mono.fromCallable(
118-
() -> {
119-
writeEvent(event);
120-
return event;
121-
})
122-
.subscribeOn(Schedulers.boundedElastic())
133+
return Mono.defer(() -> enqueueWrite(nonNullEvent, OPEN_TELEMETRY_ACCESS.captureCurrent()))
123134
.onErrorResume(
124-
e -> {
135+
error -> {
125136
if (failFast) {
126-
return Mono.error(e);
137+
return Mono.error(error);
127138
}
128-
log.warn("Failed to export hook event to JSONL: {}", e.getMessage(), e);
129-
return Mono.just(event);
139+
log.warn(
140+
"Failed to export hook event to JSONL: {}",
141+
error.getMessage(),
142+
error);
143+
return Mono.just(nonNullEvent);
130144
});
131145
}
132146

133-
private void writeEvent(HookEvent event) throws IOException {
147+
private <T extends HookEvent> Mono<T> enqueueWrite(T event, OpenTelemetryIds openTelemetryIds) {
148+
if (closed.get()) {
149+
return Mono.error(
150+
new RejectedExecutionException(
151+
"JSONL exporter is closed: " + outputFile.toAbsolutePath()));
152+
}
153+
154+
CompletableFuture<T> future = new CompletableFuture<>();
155+
try {
156+
exportExecutor.execute(
157+
() -> {
158+
try {
159+
writeEvent(event, openTelemetryIds);
160+
future.complete(event);
161+
} catch (Throwable error) {
162+
future.completeExceptionally(error);
163+
}
164+
});
165+
} catch (RejectedExecutionException error) {
166+
future.completeExceptionally(error);
167+
}
168+
return Mono.fromFuture(future);
169+
}
170+
171+
private void writeEvent(HookEvent event, OpenTelemetryIds openTelemetryIds) throws IOException {
134172
RunState runState = getOrUpdateRunState(event);
135173

136174
Map<String, Object> record = new LinkedHashMap<>();
@@ -143,14 +181,15 @@ private void writeEvent(HookEvent event) throws IOException {
143181
record.put("turn_id", runState.turnId);
144182
record.put("step_id", runState.stepId);
145183

146-
putOpenTelemetryIdsIfPresent(record);
184+
if (openTelemetryIds != null) {
185+
openTelemetryIds.putIfPresent(record);
186+
}
147187

148188
if (event instanceof ReasoningEvent reasoningEvent) {
149189
record.put("model_name", reasoningEvent.getModelName());
150190
record.put("generate_options", reasoningEvent.getGenerateOptions());
151191
}
152192

153-
// Payload
154193
if (event instanceof PreCallEvent e) {
155194
record.put("input_messages", e.getInputMessages());
156195
} else if (event instanceof PostCallEvent e) {
@@ -172,6 +211,7 @@ private void writeEvent(HookEvent event) throws IOException {
172211
record.put("tool_use", e.getToolUse());
173212
} else if (event instanceof ActingChunkEvent e) {
174213
record.put("tool_use", e.getToolUse());
214+
record.put("incremental_chunk", e.getChunk());
175215
record.put("chunk", e.getChunk());
176216
} else if (event instanceof PostActingEvent e) {
177217
record.put("tool_use", e.getToolUse());
@@ -251,41 +291,50 @@ private static String stackTraceToString(Throwable error) {
251291
return sw.toString();
252292
}
253293

254-
private static void putOpenTelemetryIdsIfPresent(Map<String, Object> record) {
255-
// Optional integration: if OpenTelemetry is on the classpath, try to attach trace/span id.
256-
// This keeps core module free of hard dependencies on OpenTelemetry.
294+
@Override
295+
public void close() throws IOException {
296+
if (!closed.compareAndSet(false, true)) {
297+
return;
298+
}
299+
300+
boolean drained = false;
257301
try {
258-
Class<?> spanClass = Class.forName("io.opentelemetry.api.trace.Span");
259-
Object span = spanClass.getMethod("current").invoke(null);
260-
if (span == null) {
261-
return;
262-
}
263-
Object spanContext = spanClass.getMethod("getSpanContext").invoke(span);
264-
if (spanContext == null) {
265-
return;
302+
Future<?> barrier = exportExecutor.submit(() -> {});
303+
barrier.get(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
304+
drained = true;
305+
} catch (InterruptedException e) {
306+
Thread.currentThread().interrupt();
307+
throw new IOException(
308+
"Interrupted while waiting for JSONL exporter to finish pending writes", e);
309+
} catch (TimeoutException e) {
310+
throw new IOException(
311+
"Timed out while waiting for JSONL exporter to finish pending writes", e);
312+
} catch (ExecutionException e) {
313+
throw new IOException(
314+
"Failed while waiting for JSONL exporter to finish pending writes",
315+
e.getCause());
316+
} finally {
317+
if (drained) {
318+
exportExecutor.shutdown();
319+
} else {
320+
exportExecutor.shutdownNow();
266321
}
267-
268-
Class<?> spanContextClass = Class.forName("io.opentelemetry.api.trace.SpanContext");
269-
boolean valid = (boolean) spanContextClass.getMethod("isValid").invoke(spanContext);
270-
if (!valid) {
271-
return;
322+
runStates.clear();
323+
synchronized (lock) {
324+
writer.flush();
325+
writer.close();
272326
}
273-
274-
String traceId = (String) spanContextClass.getMethod("getTraceId").invoke(spanContext);
275-
String spanId = (String) spanContextClass.getMethod("getSpanId").invoke(spanContext);
276-
record.put("trace_id", traceId);
277-
record.put("span_id", spanId);
278-
} catch (Throwable ignored) {
279-
// Ignore all reflection failures.
280327
}
281328
}
282329

283-
@Override
284-
public void close() throws IOException {
285-
synchronized (lock) {
286-
writer.flush();
287-
writer.close();
288-
}
330+
private static ExecutorService createExportExecutor() {
331+
ThreadFactory threadFactory =
332+
runnable -> {
333+
Thread thread = new Thread(runnable, "agentscope-jsonl-trace-exporter");
334+
thread.setDaemon(true);
335+
return thread;
336+
};
337+
return Executors.newSingleThreadExecutor(threadFactory);
289338
}
290339

291340
private static final class RunState {
@@ -294,6 +343,93 @@ private static final class RunState {
294343
private long stepId = 0;
295344
}
296345

346+
private static final class OpenTelemetryAccess {
347+
private final Method currentMethod;
348+
private final Method getSpanContextMethod;
349+
private final Method isValidMethod;
350+
private final Method getTraceIdMethod;
351+
private final Method getSpanIdMethod;
352+
353+
private OpenTelemetryAccess(
354+
Method currentMethod,
355+
Method getSpanContextMethod,
356+
Method isValidMethod,
357+
Method getTraceIdMethod,
358+
Method getSpanIdMethod) {
359+
this.currentMethod = currentMethod;
360+
this.getSpanContextMethod = getSpanContextMethod;
361+
this.isValidMethod = isValidMethod;
362+
this.getTraceIdMethod = getTraceIdMethod;
363+
this.getSpanIdMethod = getSpanIdMethod;
364+
}
365+
366+
private static OpenTelemetryAccess create() {
367+
try {
368+
ClassLoader classLoader = JsonlTraceExporter.class.getClassLoader();
369+
Class<?> spanClass =
370+
Class.forName("io.opentelemetry.api.trace.Span", false, classLoader);
371+
Class<?> spanContextClass =
372+
Class.forName("io.opentelemetry.api.trace.SpanContext", false, classLoader);
373+
return new OpenTelemetryAccess(
374+
spanClass.getMethod("current"),
375+
spanClass.getMethod("getSpanContext"),
376+
spanContextClass.getMethod("isValid"),
377+
spanContextClass.getMethod("getTraceId"),
378+
spanContextClass.getMethod("getSpanId"));
379+
} catch (Throwable ignored) {
380+
return new OpenTelemetryAccess(null, null, null, null, null);
381+
}
382+
}
383+
384+
private void putIfPresent(Map<String, Object> record) {
385+
if (currentMethod == null) {
386+
return;
387+
}
388+
OpenTelemetryIds openTelemetryIds = captureCurrent();
389+
if (openTelemetryIds != null) {
390+
openTelemetryIds.putIfPresent(record);
391+
}
392+
}
393+
394+
private OpenTelemetryIds captureCurrent() {
395+
if (currentMethod == null) {
396+
return null;
397+
}
398+
try {
399+
Object span = currentMethod.invoke(null);
400+
if (span == null) {
401+
return null;
402+
}
403+
Object spanContext = getSpanContextMethod.invoke(span);
404+
if (spanContext == null || !(boolean) isValidMethod.invoke(spanContext)) {
405+
return null;
406+
}
407+
return new OpenTelemetryIds(
408+
(String) getTraceIdMethod.invoke(spanContext),
409+
(String) getSpanIdMethod.invoke(spanContext));
410+
} catch (Throwable ignored) {
411+
return null;
412+
}
413+
}
414+
}
415+
416+
private static final class OpenTelemetryIds {
417+
private final String traceId;
418+
private final String spanId;
419+
420+
private OpenTelemetryIds(String traceId, String spanId) {
421+
this.traceId = traceId;
422+
this.spanId = spanId;
423+
}
424+
425+
private void putIfPresent(Map<String, Object> record) {
426+
if (traceId != null && spanId != null) {
427+
record.put("trace_id", traceId);
428+
record.put("span_id", spanId);
429+
}
430+
}
431+
}
432+
297433
public static final class Builder {
298434
private final Path outputFile;
299435

0 commit comments

Comments
 (0)