From b17fb8bb0b69b2635831a513fe03658d051cdbd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jesse=20Tu=C4=9Flu?= Date: Mon, 11 May 2026 16:53:21 -0700 Subject: [PATCH] fix: emit query/node/{bytes/time} metrics even on query failure to data node --- .../druid/client/DirectDruidClient.java | 27 +++++--- .../druid/client/DirectDruidClientTest.java | 63 ++++++++++++++++++- 2 files changed, 80 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index ec4ab6f289ae..3c746e410701 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -181,6 +181,7 @@ public Sequence run(final QueryPlus queryPlus, final ResponseContext conte private final AtomicLong channelSuspendedTime = new AtomicLong(0); private final BlockingQueue queue = new LinkedBlockingQueue<>(); private final AtomicBoolean done = new AtomicBoolean(false); + private final AtomicBoolean nodeMetricsEmitted = new AtomicBoolean(false); private final AtomicReference fail = new AtomicReference<>(); private final AtomicReference trafficCopRef = new AtomicReference<>(); @@ -359,15 +360,7 @@ public ClientResponse done(ClientResponse clientRespon // Floating math; division by zero will yield Inf, not exception totalByteCount.get() / (0.001 * nodeTimeMs) ); - QueryMetrics> responseMetrics = acquireResponseMetrics(); - responseMetrics.reportNodeTime(nodeTimeNs); - responseMetrics.reportNodeBytes(totalByteCount.get()); - - if (usingBackpressure) { - responseMetrics.reportBackPressureTime(channelSuspendedTime.get()); - } - - responseMetrics.emit(emitter); + emitNodeMetrics(nodeTimeNs); synchronized (done) { try { // An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out @@ -400,6 +393,7 @@ public void exceptionCaught(final ClientResponse clientResponse, fi private void setupResponseReadFailure(String msg, Throwable th) { + emitNodeMetrics(System.nanoTime() - requestStartTimeNs); fail.set(msg); queue.clear(); queue.offer( @@ -422,6 +416,21 @@ public int read() throws IOException ); } + // Emit exactly once, regardless of whether we reach this via done() or setupResponseReadFailure(). + private void emitNodeMetrics(long nodeTimeNs) + { + if (!nodeMetricsEmitted.compareAndSet(false, true)) { + return; + } + QueryMetrics> responseMetrics = acquireResponseMetrics(); + responseMetrics.reportNodeTime(nodeTimeNs); + responseMetrics.reportNodeBytes(totalByteCount.get()); + if (usingBackpressure) { + responseMetrics.reportBackPressureTime(channelSuspendedTime.get()); + } + responseMetrics.emit(emitter); + } + // Returns remaining timeout or throws exception if timeout already elapsed. private long checkQueryTimeout() { diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index 5452e7636bdc..fa835b009fdc 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -29,8 +29,10 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.Druids; import org.apache.druid.query.NestedDataTestUtils; import org.apache.druid.query.QueryContexts; @@ -322,6 +324,60 @@ public void testConnectionCountAfterException() Assert.assertEquals(0, client.getNumOpenConnections()); } + @Test + public void testNodeMetricsEmittedOnSuccess() + { + StubServiceEmitter stubEmitter = StubServiceEmitter.createStarted(); + DirectDruidClient client = makeDirectDruidClient(initHttpClientWithSuccessfulQuery(), stubEmitter); + + client.run(getQueryPlus(), responseContext).toList(); + + Assert.assertEquals(1, stubEmitter.getMetricEventCount("query/node/time")); + Assert.assertEquals(1, stubEmitter.getMetricEventCount("query/node/bytes")); + } + + @Test + public void testNodeMetricsEmittedOnError() + { + // Only setupResponseReadFailure fires (checkQueryTimeout during handleResponse) — done() is never called. + StubServiceEmitter stubEmitter = StubServiceEmitter.createStarted(); + final TestHttpClient testHttpClient = new TestHttpClient(objectMapper, 110); + DirectDruidClient client = makeDirectDruidClient(initHttpClientFromExistingClient(testHttpClient, false), stubEmitter); + + final QueryPlus queryPlus = getQueryPlus(Map.of( + DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 50 + )); + + Assert.assertThrows(QueryTimeoutException.class, () -> client.run(queryPlus, responseContext)); + + Assert.assertEquals(1, stubEmitter.getMetricEventCount("query/node/time")); + Assert.assertEquals(1, stubEmitter.getMetricEventCount("query/node/bytes")); + } + + @Test + public void testNodeMetricsEmittedExactlyOnceWhenDoneAndTimeoutBothFire() throws InterruptedException + { + // done() fires synchronously during run(), then results.toList() calls checkQueryTimeout() after the + // timeout has already expired, triggering setupResponseReadFailure(). The compareAndSet guard must + // prevent the second emitNodeMetrics() call from emitting. + StubServiceEmitter stubEmitter = StubServiceEmitter.createStarted(); + DirectDruidClient client = makeDirectDruidClient(initHttpClientWithSuccessfulQuery(), stubEmitter); + + // Timeout far enough in the future that handleResponse + done() complete during run(), but we sleep + // past it before consuming the sequence so that checkQueryTimeout() fires during toList(). + final QueryPlus queryPlus = getQueryPlus(Map.of( + DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 500 + )); + + Sequence results = client.run(queryPlus, responseContext); + Thread.sleep(600); + + Assert.assertThrows(QueryTimeoutException.class, results::toList); + + Assert.assertEquals(1, stubEmitter.getMetricEventCount("query/node/time")); + Assert.assertEquals(1, stubEmitter.getMetricEventCount("query/node/bytes")); + } + @Test public void testResourceLimitExceededException() { @@ -346,6 +402,11 @@ public void testResourceLimitExceededException() } private DirectDruidClient makeDirectDruidClient(HttpClient httpClient) + { + return makeDirectDruidClient(httpClient, new NoopServiceEmitter()); + } + + private DirectDruidClient makeDirectDruidClient(HttpClient httpClient, ServiceEmitter emitter) { return new DirectDruidClient( conglomerateRule.getConglomerate(), @@ -354,7 +415,7 @@ private DirectDruidClient makeDirectDruidClient(HttpClient httpClient) httpClient, "http", hostName, - new NoopServiceEmitter(), + emitter, queryCancellationExecutor ); }