diff --git a/processing/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java b/processing/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java index b652c0a27450..3d16c857228b 100644 --- a/processing/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java +++ b/processing/src/main/java/org/apache/druid/java/util/http/client/HttpClientConfig.java @@ -52,13 +52,25 @@ public String getEncodingString() { return "deflate"; } + }, + LZ4 { + @Override + public String getEncodingString() + { + return "x-lz4"; + } + }, + ZSTD { + @Override + public String getEncodingString() + { + return "zstd"; + } }; /** * Get the header-ified name of this encoding, which should go in "Accept-Encoding" and - * "Content-Encoding" headers. This is not just the lowercasing of the enum name, since - * we may one day support x- encodings like LZ4, which would likely be an enum named - * "LZ4" that has an encoding string like "x-lz4". + * "Content-Encoding" headers. * * @return encoding name */ diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index ec4ab6f289ae..3479b3a3f9c4 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -23,10 +23,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.github.luben.zstd.ZstdInputStream; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import net.jpountz.lz4.LZ4BlockInputStream; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; @@ -159,6 +161,7 @@ public Sequence run(final QueryPlus queryPlus, final ResponseContext conte final JavaType queryResultType = isBySegment ? toolChest.getBySegmentResultType() : toolChest.getBaseResultType(); final ListenableFuture future; + final AtomicReference responseContentEncoding = new AtomicReference<>(""); final String url = scheme + "://" + host + "/druid/v2/"; final String cancelUrl = url + query.getId(); @@ -247,6 +250,8 @@ public ClientResponse handleResponse(HttpResponse response, Traffic query.getId(), query.getSubQueryId() ); + final String encoding = response.headers().get(HttpHeaders.Names.CONTENT_ENCODING); + responseContentEncoding.set(encoding != null ? encoding : ""); final String responseContext = response.headers().get(QueryResource.HEADER_RESPONSE_CONTEXT); context.addRemainingResponse(query.getMostSpecificId(), VAL_TO_REDUCE_REMAINING_RESPONSES); // context may be null in case of error or query timeout @@ -516,7 +521,7 @@ public JsonParserIterator make() { return new JsonParserIterator<>( queryResultType, - future, + wrapFutureWithDecompressor(future, responseContentEncoding.get()), url, query, host, @@ -587,6 +592,35 @@ private void cancelQuery(Query query, String cancelUrl) queryCancellationExecutor.submit(cancelRunnable); } + private static ListenableFuture wrapFutureWithDecompressor( + ListenableFuture future, + String contentEncoding + ) + { + if ("x-lz4".equals(contentEncoding)) { + return Futures.transform(future, DirectDruidClient::newLz4InputStream, Execs.directExecutor()); + } + if ("zstd".equals(contentEncoding)) { + return Futures.transform(future, DirectDruidClient::newZstdInputStream, Execs.directExecutor()); + } + return future; + } + + private static InputStream newLz4InputStream(InputStream in) + { + return new LZ4BlockInputStream(in); + } + + private static InputStream newZstdInputStream(InputStream in) + { + try { + return new ZstdInputStream(in); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + @Override public String toString() { diff --git a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java index ee9849a7b119..dd8ae6b6dfe1 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java +++ b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java @@ -21,8 +21,10 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.luben.zstd.ZstdOutputStream; import com.google.common.collect.ImmutableMap; import com.google.common.io.CountingOutputStream; +import net.jpountz.lz4.LZ4BlockOutputStream; import org.apache.druid.client.DirectDruidClient; import org.apache.druid.error.DruidException; import org.apache.druid.error.ErrorResponse; @@ -385,6 +387,36 @@ public interface Writer extends Closeable void writeResponseEnd() throws IOException; } + /** + * Picks the best compression encoding the client accepts. Prefers zstd over x-lz4. + * Returns null if no supported encoding is advertised. + */ + @Nullable + private static String negotiateContentEncoding(@Nullable String acceptEncoding) + { + if (acceptEncoding == null) { + return null; + } + if (acceptEncoding.contains("zstd")) { + return "zstd"; + } + if (acceptEncoding.contains("x-lz4")) { + return "x-lz4"; + } + return null; + } + + private static OutputStream wrapWithCompressor(OutputStream out, @Nullable String contentEncoding) throws IOException + { + if ("zstd".equals(contentEncoding)) { + return new ZstdOutputStream(out); + } + if ("x-lz4".equals(contentEncoding)) { + return new LZ4BlockOutputStream(out); + } + return out; + } + public class StreamingHttpResponseAccumulator implements Accumulator, Closeable { private final ResponseContext responseContext; @@ -442,8 +474,14 @@ public void initialize() response.setTrailerFields(() -> trailerFields); + final String contentEncoding = negotiateContentEncoding(request.getHeader("Accept-Encoding")); + if (contentEncoding != null) { + response.setHeader("Content-Encoding", contentEncoding); + } + try { - out = new CountingOutputStream(response.getOutputStream()); + final OutputStream responseOs = response.getOutputStream(); + out = new CountingOutputStream(wrapWithCompressor(responseOs, contentEncoding)); writer = resultsWriter.makeWriter(out); } catch (IOException e) {