Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext conte
// Accumulated channel-suspended time; reported as back-pressure time by emitNodeMetrics() when
// usingBackpressure is set.
private final AtomicLong channelSuspendedTime = new AtomicLong(0);
// Holders handed over from the response handler; setupResponseReadFailure() clears this queue and
// offers a replacement holder.
private final BlockingQueue<InputStreamHolder> queue = new LinkedBlockingQueue<>();
// Also used as the monitor object of a synchronized block in done().
private final AtomicBoolean done = new AtomicBoolean(false);
// Flipped from false to true by the first emitNodeMetrics() call (compareAndSet), so that node metrics
// are emitted at most once per response.
private final AtomicBoolean nodeMetricsEmitted = new AtomicBoolean(false);
// Failure message recorded by setupResponseReadFailure().
private final AtomicReference<String> fail = new AtomicReference<>();
// NOTE(review): usage of trafficCopRef is not visible in this hunk; document at its point of use.
private final AtomicReference<TrafficCop> trafficCopRef = new AtomicReference<>();

Expand Down Expand Up @@ -359,15 +360,7 @@ public ClientResponse<InputStream> done(ClientResponse<InputStream> clientRespon
// Floating math; division by zero will yield Inf, not exception
totalByteCount.get() / (0.001 * nodeTimeMs)
);
QueryMetrics<? super Query<T>> responseMetrics = acquireResponseMetrics();
responseMetrics.reportNodeTime(nodeTimeNs);
responseMetrics.reportNodeBytes(totalByteCount.get());

if (usingBackpressure) {
responseMetrics.reportBackPressureTime(channelSuspendedTime.get());
}

responseMetrics.emit(emitter);
emitNodeMetrics(nodeTimeNs);
synchronized (done) {
try {
// An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out
Expand Down Expand Up @@ -400,6 +393,7 @@ public void exceptionCaught(final ClientResponse<InputStream> clientResponse, fi

private void setupResponseReadFailure(String msg, Throwable th)
{
emitNodeMetrics(System.nanoTime() - requestStartTimeNs);
fail.set(msg);
queue.clear();
queue.offer(
Expand All @@ -422,6 +416,21 @@ public int read() throws IOException
);
}

/**
 * Reports the node-level metrics (node time, node bytes and, when backpressure is in use, the
 * back-pressure time) and emits them. Only the first invocation has any effect, so it does not
 * matter whether this is reached via done() or setupResponseReadFailure().
 *
 * @param nodeTimeNs value handed to {@code reportNodeTime}
 */
private void emitNodeMetrics(long nodeTimeNs)
{
  // Atomically claim the right to emit; any later caller finds the flag already set and does nothing.
  if (nodeMetricsEmitted.compareAndSet(false, true)) {
    final QueryMetrics<? super Query<T>> metrics = acquireResponseMetrics();
    metrics.reportNodeTime(nodeTimeNs);
    metrics.reportNodeBytes(totalByteCount.get());
    if (usingBackpressure) {
      metrics.reportBackPressureTime(channelSuspendedTime.get());
    }
    metrics.emit(emitter);
  }
}

// Returns remaining timeout or throws exception if timeout already elapsed.
private long checkQueryTimeout()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.Druids;
import org.apache.druid.query.NestedDataTestUtils;
import org.apache.druid.query.QueryContexts;
Expand Down Expand Up @@ -322,6 +324,60 @@ public void testConnectionCountAfterException()
Assert.assertEquals(0, client.getNumOpenConnections());
}

/**
 * Runs a query against the successful-query HTTP client and checks that each node-level metric
 * was emitted exactly once.
 */
@Test
public void testNodeMetricsEmittedOnSuccess()
{
  final StubServiceEmitter metricsSink = StubServiceEmitter.createStarted();
  final DirectDruidClient druidClient = makeDirectDruidClient(
      initHttpClientWithSuccessfulQuery(),
      metricsSink
  );

  // Run the query and materialize its results.
  druidClient.run(getQueryPlus(), responseContext).toList();

  for (String metricName : new String[]{"query/node/time", "query/node/bytes"}) {
    Assert.assertEquals(1, metricsSink.getMetricEventCount(metricName));
  }
}

/**
 * Runs a query whose fail time is 50 ms in the future, expects a {@link QueryTimeoutException} from
 * run(), and checks that each node-level metric was still emitted exactly once.
 */
@Test
public void testNodeMetricsEmittedOnError()
{
  // Only setupResponseReadFailure fires (checkQueryTimeout during handleResponse) — done() is never called.
  final StubServiceEmitter metricsSink = StubServiceEmitter.createStarted();
  final TestHttpClient baseHttpClient = new TestHttpClient(objectMapper, 110);
  final DirectDruidClient druidClient = makeDirectDruidClient(
      initHttpClientFromExistingClient(baseHttpClient, false),
      metricsSink
  );

  final QueryPlus timedQuery = getQueryPlus(
      Map.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 50)
  );

  Assert.assertThrows(QueryTimeoutException.class, () -> druidClient.run(timedQuery, responseContext));

  for (String metricName : new String[]{"query/node/time", "query/node/bytes"}) {
    Assert.assertEquals(1, metricsSink.getMetricEventCount(metricName));
  }
}

/**
 * Checks that the node-level metrics are emitted only once when both emission paths are exercised
 * for the same response: run() completes first, then consuming the sequence after the fail time has
 * passed throws a {@link QueryTimeoutException}.
 *
 * NOTE(review): this test depends on wall-clock timing (500 ms fail time, 600 ms sleep). If run()
 * itself took longer than 500 ms on a loaded machine the scenario would differ — confirm this is
 * acceptable for CI.
 *
 * @throws InterruptedException if the sleep between run() and toList() is interrupted
 */
@Test
public void testNodeMetricsEmittedExactlyOnceWhenDoneAndTimeoutBothFire() throws InterruptedException
{
// done() fires synchronously during run(), then results.toList() calls checkQueryTimeout() after the
// timeout has already expired, triggering setupResponseReadFailure(). The compareAndSet guard must
// prevent the second emitNodeMetrics() call from emitting.
StubServiceEmitter stubEmitter = StubServiceEmitter.createStarted();
DirectDruidClient client = makeDirectDruidClient(initHttpClientWithSuccessfulQuery(), stubEmitter);

// Timeout far enough in the future that handleResponse + done() complete during run(), but we sleep
// past it before consuming the sequence so that checkQueryTimeout() fires during toList().
final QueryPlus queryPlus = getQueryPlus(Map.of(
DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 500
));

Sequence results = client.run(queryPlus, responseContext);
// Sleep past the 500 ms fail time set above before consuming the sequence.
Thread.sleep(600);

Assert.assertThrows(QueryTimeoutException.class, results::toList);

// One event per metric: the second emission attempt must have been suppressed.
Assert.assertEquals(1, stubEmitter.getMetricEventCount("query/node/time"));
Assert.assertEquals(1, stubEmitter.getMetricEventCount("query/node/bytes"));
}

@Test
public void testResourceLimitExceededException()
{
Expand All @@ -346,6 +402,11 @@ public void testResourceLimitExceededException()
}

/**
 * Convenience overload that builds a {@link DirectDruidClient} for the given HTTP client using a
 * fresh {@link NoopServiceEmitter}.
 *
 * @param httpClient HTTP client the created client should use
 * @return the client produced by the two-argument overload
 */
private DirectDruidClient makeDirectDruidClient(HttpClient httpClient)
{
  final ServiceEmitter noopEmitter = new NoopServiceEmitter();
  return makeDirectDruidClient(httpClient, noopEmitter);
}

private DirectDruidClient makeDirectDruidClient(HttpClient httpClient, ServiceEmitter emitter)
{
return new DirectDruidClient(
conglomerateRule.getConglomerate(),
Expand All @@ -354,7 +415,7 @@ private DirectDruidClient makeDirectDruidClient(HttpClient httpClient)
httpClient,
"http",
hostName,
new NoopServiceEmitter(),
emitter,
queryCancellationExecutor
);
}
Expand Down
Loading