Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,25 @@ public String getEncodingString()
{
return "deflate";
}
},
LZ4 {
@Override
public String getEncodingString()
{
return "x-lz4";
}
},
ZSTD {
@Override
public String getEncodingString()
{
return "zstd";
}
};

/**
* Get the header-ified name of this encoding, which should go in "Accept-Encoding" and
* "Content-Encoding" headers. This is not just the lowercasing of the enum name, since
* we may one day support x- encodings like LZ4, which would likely be an enum named
* "LZ4" that has an encoding string like "x-lz4".
* "Content-Encoding" headers.
*
* @return encoding name
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.github.luben.zstd.ZstdInputStream;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import net.jpountz.lz4.LZ4BlockInputStream;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
Expand Down Expand Up @@ -159,6 +161,7 @@
final JavaType queryResultType = isBySegment ? toolChest.getBySegmentResultType() : toolChest.getBaseResultType();

final ListenableFuture<InputStream> future;
final AtomicReference<String> responseContentEncoding = new AtomicReference<>("");
final String url = scheme + "://" + host + "/druid/v2/";
final String cancelUrl = url + query.getId();

Expand Down Expand Up @@ -247,6 +250,8 @@
query.getId(),
query.getSubQueryId()
);
final String encoding = response.headers().get(HttpHeaders.Names.CONTENT_ENCODING);
responseContentEncoding.set(encoding != null ? encoding : "");
final String responseContext = response.headers().get(QueryResource.HEADER_RESPONSE_CONTEXT);
context.addRemainingResponse(query.getMostSpecificId(), VAL_TO_REDUCE_REMAINING_RESPONSES);
// context may be null in case of error or query timeout
Expand Down Expand Up @@ -516,7 +521,7 @@
{
return new JsonParserIterator<>(
queryResultType,
future,
wrapFutureWithDecompressor(future, responseContentEncoding.get()),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Decompressor choice races the response headers

The decompressor is selected from responseContentEncoding.get() when the Sequence iterator is created, which can happen immediately after run() returns and before the async HTTP handler has seen the response headers. In that common path this passes an empty encoding and returns the original future, so a later zstd/x-lz4 response is handed to JsonParserIterator still compressed and parsing fails. Defer the encoding lookup until the future completes, e.g. always transform with a function that reads the AtomicReference after handleResponse has set it.

url,
query,
host,
Expand Down Expand Up @@ -587,6 +592,35 @@
queryCancellationExecutor.submit(cancelRunnable);
}

private static ListenableFuture<InputStream> wrapFutureWithDecompressor(
ListenableFuture<InputStream> future,
String contentEncoding
)
{
if ("x-lz4".equals(contentEncoding)) {
return Futures.transform(future, DirectDruidClient::newLz4InputStream, Execs.directExecutor());
}
if ("zstd".equals(contentEncoding)) {
return Futures.transform(future, DirectDruidClient::newZstdInputStream, Execs.directExecutor());
}
return future;
}

private static InputStream newLz4InputStream(InputStream in)
{
return new LZ4BlockInputStream(in);

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation Note

Invoking
LZ4BlockInputStream.LZ4BlockInputStream
should be avoided because it has been deprecated.
}

private static InputStream newZstdInputStream(InputStream in)
{
try {
return new ZstdInputStream(in);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.luben.zstd.ZstdOutputStream;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CountingOutputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.ErrorResponse;
Expand Down Expand Up @@ -385,6 +387,36 @@ public interface Writer extends Closeable
void writeResponseEnd() throws IOException;
}

/**
* Picks the best compression encoding the client accepts. Prefers zstd over x-lz4.
* Returns null if no supported encoding is advertised.
*/
@Nullable
private static String negotiateContentEncoding(@Nullable String acceptEncoding)
{
if (acceptEncoding == null) {
return null;
}
if (acceptEncoding.contains("zstd")) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Honor q=0 when negotiating encodings

This substring check treats headers like gzip, zstd;q=0 or x-lz4;q=0 as accepting the encoding, then sends a compressed response that the client explicitly marked unacceptable. QueryResultPusher handles user-facing query responses too, so this can break standards-compliant clients. Parse Accept-Encoding as tokens with q-values and only select zstd/x-lz4 when q is absent or greater than zero.

return "zstd";
}
if (acceptEncoding.contains("x-lz4")) {
return "x-lz4";
}
return null;
}

private static OutputStream wrapWithCompressor(OutputStream out, @Nullable String contentEncoding) throws IOException
{
if ("zstd".equals(contentEncoding)) {
return new ZstdOutputStream(out);
}
if ("x-lz4".equals(contentEncoding)) {
return new LZ4BlockOutputStream(out);
}
return out;
}

public class StreamingHttpResponseAccumulator implements Accumulator<Response, Object>, Closeable
{
private final ResponseContext responseContext;
Expand Down Expand Up @@ -442,8 +474,14 @@ public void initialize()

response.setTrailerFields(() -> trailerFields);

final String contentEncoding = negotiateContentEncoding(request.getHeader("Accept-Encoding"));
if (contentEncoding != null) {
response.setHeader("Content-Encoding", contentEncoding);
}

try {
out = new CountingOutputStream(response.getOutputStream());
final OutputStream responseOs = response.getOutputStream();
out = new CountingOutputStream(wrapWithCompressor(responseOs, contentEncoding));
writer = resultsWriter.makeWriter(out);
}
catch (IOException e) {
Expand Down
Loading