@@ -27,11 +27,11 @@
import org.apache.druid.msq.exec.FrameContext;
import org.apache.druid.msq.exec.FrameWriterSpec;
import org.apache.druid.msq.exec.ProcessingBuffers;
import org.apache.druid.msq.exec.ProcessingBuffersSet;
import org.apache.druid.msq.exec.WorkerContext;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.exec.WorkerStorageParameters;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
import org.apache.druid.segment.IndexIO;
@@ -42,6 +42,7 @@
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.server.SegmentManager;

import javax.annotation.Nullable;
import java.io.File;

/**
@@ -52,24 +53,33 @@ public class DartFrameContext implements FrameContext
private final StageId stageId;
private final FrameWriterSpec frameWriterSpec;
private final SegmentWrangler segmentWrangler;
private final GroupingEngine groupingEngine;
private final SegmentManager segmentManager;
private final CoordinatorClient coordinatorClient;
private final WorkerContext workerContext;
private final ResourceHolder<ProcessingBuffers> processingBuffers;

/**
* Null if the stage does not use processing buffers.
*/
@Nullable
private final ProcessingBuffersSet processingBuffersSet;
private final WorkerMemoryParameters memoryParameters;
private final WorkerStorageParameters storageParameters;
private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;

/**
* Acquired by {@link #acquireProcessingBuffers}.
*/
@Nullable
private ResourceHolder<ProcessingBuffers> processingBuffers;

public DartFrameContext(
final StageId stageId,
final WorkerContext workerContext,
final FrameWriterSpec frameWriterSpec,
final SegmentWrangler segmentWrangler,
final GroupingEngine groupingEngine,
final SegmentManager segmentManager,
final CoordinatorClient coordinatorClient,
final ResourceHolder<ProcessingBuffers> processingBuffers,
@Nullable final ProcessingBuffersSet processingBuffersSet,
final WorkerMemoryParameters memoryParameters,
final WorkerStorageParameters storageParameters,
final DataServerQueryHandlerFactory dataServerQueryHandlerFactory
@@ -78,11 +88,10 @@ public DartFrameContext(
this.stageId = stageId;
this.segmentWrangler = segmentWrangler;
this.frameWriterSpec = frameWriterSpec;
this.groupingEngine = groupingEngine;
this.segmentManager = segmentManager;
this.coordinatorClient = coordinatorClient;
this.workerContext = workerContext;
this.processingBuffers = processingBuffers;
this.processingBuffersSet = processingBuffersSet;
this.memoryParameters = memoryParameters;
this.storageParameters = storageParameters;
this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
@@ -160,9 +169,24 @@ public IndexMerger indexMerger()
throw DruidException.defensive("Ingestion not implemented");
}

@Override
public void acquireProcessingBuffers(final int requestedSlices)
{
if (processingBuffersSet == null) {
throw DruidException.defensive("Stage[%s] does not use processing buffers", stageId);
}
if (processingBuffers != null) {
throw DruidException.defensive("Processing buffers already acquired");
}
processingBuffers = processingBuffersSet.acquire(requestedSlices);
}

@Override
public ProcessingBuffers processingBuffers()
{
if (processingBuffers == null) {
throw DruidException.defensive("Processing buffers not yet acquired");
}
return processingBuffers.get();
}

@@ -193,6 +217,8 @@ public FrameWriterSpec frameWriterSpec()
@Override
public void close()
{
processingBuffers.close();
if (processingBuffers != null) {
processingBuffers.close();
}
}
}
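
For orientation, here is a minimal sketch of the call sequence implied by the new methods above. The driver method, its parameters, and the class name are hypothetical and not part of this PR; it only exercises the `FrameContext` methods shown in this diff.

```java
import org.apache.druid.msq.exec.FrameContext;
import org.apache.druid.msq.exec.ProcessingBuffers;

import java.io.IOException;

// Hypothetical driver showing the acquire-then-use lifecycle added in this PR.
public class ProcessingBuffersLifecycleSketch
{
  public void runStage(final FrameContext frameContext, final int sliceCount, final boolean usesBuffers)
      throws IOException
  {
    if (usesBuffers) {
      // Must be called exactly once per stage that uses processing buffers; calling it twice,
      // or calling it for a stage that has no ProcessingBuffersSet, throws a defensive exception.
      frameContext.acquireProcessingBuffers(sliceCount);

      // Returns the buffers acquired above; throws if acquireProcessingBuffers was never called.
      final ProcessingBuffers buffers = frameContext.processingBuffers();
      // ... hand `buffers` to the stage's processors ...
    }

    // Releases any acquired buffers back to the pool; a no-op when none were acquired.
    frameContext.close();
  }
}
```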
@@ -23,6 +23,7 @@
import org.apache.druid.collections.QueueNonBlockingPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.processor.Bouncer;
import org.apache.druid.msq.exec.ProcessingBuffers;
import org.apache.druid.msq.exec.ProcessingBuffersProvider;
@@ -39,7 +40,7 @@

/**
* Production implementation of {@link ProcessingBuffersProvider} that uses the merge buffer pool. Each call
* to {@link #acquire(int)} acquires one merge buffer and slices it up.
* to {@link #acquire(int, long)} acquires one merge buffer and slices it up.
*/
public class DartProcessingBuffersProvider implements ProcessingBuffersProvider
{
@@ -67,27 +68,59 @@ public ResourceHolder<ProcessingBuffersSet> acquire(final int poolSize, final lo
final ReferenceCountingResourceHolder<ByteBuffer> bufferHolder = batch.get(0);
try {
final ByteBuffer buffer = bufferHolder.get().duplicate();
final int sliceSize = buffer.capacity() / poolSize / processingThreads;
final List<ProcessingBuffers> pool = new ArrayList<>(poolSize);
final int chunkSize = buffer.capacity() / poolSize;
final List<ProcessingBuffersSet.Slot> slots = new ArrayList<>(poolSize);

for (int i = 0; i < poolSize; i++) {
final BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<>(processingThreads);
for (int j = 0; j < processingThreads; j++) {
final int sliceNum = i * processingThreads + j;
buffer.position(sliceSize * sliceNum).limit(sliceSize * (sliceNum + 1));
queue.add(buffer.slice());
}
final ProcessingBuffers buffers = new ProcessingBuffers(
new QueueNonBlockingPool<>(queue),
new Bouncer(processingThreads)
);
pool.add(buffers);
buffer.position(chunkSize * i).limit(chunkSize * (i + 1));
slots.add(new LazySlot(buffer.slice(), processingThreads));
}

return new ReferenceCountingResourceHolder<>(new ProcessingBuffersSet(pool), bufferHolder);
return new ReferenceCountingResourceHolder<>(new ProcessingBuffersSet(slots), bufferHolder);
}
catch (Throwable e) {
throw CloseableUtils.closeAndWrapInCatch(e, bufferHolder);
}
}

/**
* Lazy slot that holds one chunk of the shared merge buffer and slices it on demand to match the stage's
* actual concurrent-processor count.
*/
static final class LazySlot implements ProcessingBuffersSet.Slot
{
private final ByteBuffer chunk;
private final int maxSlices;

LazySlot(final ByteBuffer chunk, final int maxSlices)
{
this.chunk = chunk;
this.maxSlices = maxSlices;
}

@Override
public ProcessingBuffers acquire(final int requestedSlices)
{
if (requestedSlices > maxSlices) {
throw DruidException.defensive(
"requestedSlices[%d] too large for maxSlices[%d]",
requestedSlices,
maxSlices
);
}

if (requestedSlices < 1) {
throw DruidException.defensive("requestedSlices[%d] must be positive", requestedSlices);
}

final int sliceSize = chunk.capacity() / requestedSlices;
Contributor: defensive check that requested slices > 0 or do we trust the upstream callers always request at least one slice?

Contributor (Author): Added a defensive check.

final BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<>(requestedSlices);
final ByteBuffer working = chunk.duplicate();
for (int j = 0; j < requestedSlices; j++) {
working.position(sliceSize * j).limit(sliceSize * (j + 1));
queue.add(working.slice());
}
return new ProcessingBuffers(new QueueNonBlockingPool<>(queue), new Bouncer(requestedSlices));
}
}
}
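
To make the arithmetic concrete, the following standalone sketch shows the two-level partitioning described above using only java.nio. The class name and all sizes are made-up for illustration; it mirrors the position/limit/slice pattern used by acquire(int, long) and LazySlot.acquire(int).

```java
import java.nio.ByteBuffer;

// Standalone illustration: the shared merge buffer is split into one chunk per slot up front,
// and each chunk is sliced per stage based on how many concurrent processors the stage runs.
public class BufferSlicingSketch
{
  public static void main(String[] args)
  {
    final ByteBuffer mergeBuffer = ByteBuffer.allocate(1_024); // stand-in for one pooled merge buffer
    final int poolSize = 4;                                    // concurrent stages per worker
    final int chunkSize = mergeBuffer.capacity() / poolSize;   // 256 bytes per chunk

    // Level 1, as in acquire(int, long): carve out the chunk for slot 0.
    final ByteBuffer chunked = mergeBuffer.duplicate();
    chunked.position(0).limit(chunkSize);
    final ByteBuffer chunk = chunked.slice();

    // Level 2, as in LazySlot.acquire(int): a stage requesting 2 slices gets two 128-byte slices,
    // so a stage with fewer concurrent processors gets larger slices of the same chunk.
    final int requestedSlices = 2;
    final int sliceSize = chunk.capacity() / requestedSlices;
    final ByteBuffer working = chunk.duplicate();
    for (int j = 0; j < requestedSlices; j++) {
      working.position(sliceSize * j).limit(sliceSize * (j + 1));
      System.out.println("slice " + j + " has capacity " + working.slice().capacity());
    }
  }
}
```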
@@ -250,10 +250,9 @@ public FrameContext frameContext(WorkOrder workOrder)
this,
FrameWriterSpec.fromContext(workOrder.getWorkerContext()),
segmentWrangler,
groupingEngine,
segmentManager,
coordinatorClient,
processingBuffersSet.get().acquireForStage(workOrder.getStageDefinition()),
workOrder.getStageDefinition().getProcessor().usesProcessingBuffers() ? processingBuffersSet.get() : null,
memoryParameters,
storageParameters,
dataServerQueryHandlerFactory
@@ -84,6 +84,17 @@ public interface FrameContext extends Closeable

IndexMerger indexMerger();

/**
* Acquire processing buffers sized for {@code requestedSlices} concurrent processors. Must be called exactly
* once for stages that use processing buffers, before any call to {@link #processingBuffers()}. Stages that
* don't use processing buffers must not call this method.
*/
void acquireProcessingBuffers(int requestedSlices);

/**
* Returns the {@link ProcessingBuffers} previously acquired via {@link #acquireProcessingBuffers}. Throws if
* not yet acquired.
*/
ProcessingBuffers processingBuffers();

WorkerMemoryParameters memoryParameters();
@@ -21,7 +21,6 @@

import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.error.DruidException;
import org.apache.druid.msq.kernel.StageDefinition;

import java.nio.ByteBuffer;
import java.util.Collection;
@@ -31,60 +30,73 @@
import java.util.stream.Collectors;

/**
* Holds a set of {@link ProcessingBuffers} for a {@link Worker}. Acquired from {@link ProcessingBuffersProvider}.
* Holds a set of {@link Slot}, each of which can produce {@link ProcessingBuffers} for one concurrent stage.
* Acquired from {@link ProcessingBuffersProvider}.
*
* Slots come in two flavors:
* <ul>
* <li>{@link EagerSlot}: holds an already-built {@link ProcessingBuffers}; ignores the requested slice count.
* Used by buffer providers that pre-allocate (Peon, Indexer).</li>
* <li>Lazy slots (provider-defined): hold a buffer chunk and slice it per stage based on the actual concurrent
* processor count, so a stage that runs fewer processors gets larger slices. Used by Dart.</li>
* </ul>
*/
public class ProcessingBuffersSet
{
public static final ProcessingBuffersSet EMPTY = new ProcessingBuffersSet(Collections.emptyList());

private final BlockingQueue<ProcessingBuffers> pool;
private final BlockingQueue<Slot> pool;

public ProcessingBuffersSet(final Collection<? extends Slot> slots)
{
this.pool = new ArrayBlockingQueue<>(slots.isEmpty() ? 1 : slots.size());
this.pool.addAll(slots);
}

public ProcessingBuffersSet(Collection<ProcessingBuffers> buffers)
/**
* Wrap a collection of pre-built {@link ProcessingBuffers}.
*/
public static ProcessingBuffersSet wrap(final Collection<ProcessingBuffers> buffers)
{
this.pool = new ArrayBlockingQueue<>(buffers.isEmpty() ? 1 : buffers.size());
this.pool.addAll(buffers);
return new ProcessingBuffersSet(buffers.stream().map(EagerSlot::new).collect(Collectors.toList()));
}

/**
* Equivalent to calling {@link ProcessingBuffers#fromCollection} on each collection in the overall collection,
* then creating an instance.
* then wrapping in eager slots.
*/
public static <T extends Collection<ByteBuffer>> ProcessingBuffersSet fromCollection(final Collection<T> processingBuffers)
{
return new ProcessingBuffersSet(
return wrap(
processingBuffers.stream()
.map(ProcessingBuffers::fromCollection)
.collect(Collectors.toList())
);
}

/**
* Acquire buffers if a particular stage needs them; otherwise, returns a holder that throws an exception on
* {@link ResourceHolder#get()}.
*/
public ResourceHolder<ProcessingBuffers> acquireForStage(final StageDefinition stageDef)
{
if (!stageDef.getProcessor().usesProcessingBuffers()) {
return new NilResourceHolder<>();
} else {
return acquire();
}
}

/**
* Acquire buffers unconditionally. In production, it is expected that callers will use
* {@link #acquireForStage(StageDefinition)}.
* Acquire buffers with a specific requested slice count. The actual number of slices may be higher but will
* not be lower.
*/
public ResourceHolder<ProcessingBuffers> acquire()
public ResourceHolder<ProcessingBuffers> acquire(final int requestedSlices)
{
final ProcessingBuffers buffers = pool.poll();
final Slot slot = pool.poll();

if (buffers == null) {
if (slot == null) {
// Never happens, because the pool acquired from ProcessingBuffersProvider must be big enough for all
// concurrent processing buffer needs. (In other words: if this does happen, it's a bug.)
throw DruidException.defensive("Processing buffers not available");
}

final ProcessingBuffers buffers;
try {
buffers = slot.acquire(requestedSlices);
}
catch (Throwable t) {
pool.add(slot);
throw t;
}

return new ResourceHolder<>()
{
@Override
@@ -96,26 +108,49 @@ public ProcessingBuffers get()
@Override
public void close()
{
pool.add(buffers);
pool.add(slot);
}
};
}

/**
* Resource holder that throws an exception on {@link #get()}.
* A producer of {@link ProcessingBuffers} from a single concurrent-stage slot in the pool. Implementations
* decide whether the slice count argument to {@link #acquire} is honored (lazy slots) or ignored (eager slots).
*/
static class NilResourceHolder<T> implements ResourceHolder<T>
public interface Slot
{
@Override
public T get()
/**
* Produce a {@link ProcessingBuffers} suitable for a stage that will run up to {@code requestedSlices}
* concurrent processors. Implementations may choose to ignore the argument when the slot's buffers are
* already laid out (e.g., {@link EagerSlot}).
*/
ProcessingBuffers acquire(int requestedSlices);
}

/**
* Slot that wraps an already-built {@link ProcessingBuffers}.
*/
public static final class EagerSlot implements Slot
{
private final ProcessingBuffers buffers;

public EagerSlot(final ProcessingBuffers buffers)
{
throw DruidException.defensive("Unexpected call to get()");
this.buffers = buffers;
}

@Override
public void close()
public ProcessingBuffers acquire(final int requestedSlices)
{
// Do nothing.
if (requestedSlices > buffers.getBouncer().getMaxCount()) {
throw DruidException.defensive(
"requestedSlices[%d] too large, only have[%d] buffers",
requestedSlices,
buffers.getBouncer().getMaxCount()
);
}

return buffers;
}
}
}
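
For comparison with the lazy Dart path, here is a small sketch of the eager path under the new Slot model. The class name, buffer sizes, and slice counts are made-up; it uses only methods defined in this diff plus standard library calls.

```java
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.msq.exec.ProcessingBuffers;
import org.apache.druid.msq.exec.ProcessingBuffersSet;

import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical usage of the eager path: pre-sliced buffers wrapped in EagerSlots, so the
// requested slice count is validated but the pre-built layout is returned unchanged.
public class EagerSlotSketch
{
  public static void main(String[] args) throws Exception
  {
    // One slot's worth of pre-built buffers: two slices for up to two concurrent processors.
    final ProcessingBuffersSet buffersSet = ProcessingBuffersSet.fromCollection(
        List.of(
            List.of(ByteBuffer.allocate(1_024), ByteBuffer.allocate(1_024))
        )
    );

    // The eager slot hands back its pre-built ProcessingBuffers; asking for more slices than
    // the slot can support throws defensively.
    final ResourceHolder<ProcessingBuffers> holder = buffersSet.acquire(1);
    try {
      final ProcessingBuffers buffers = holder.get();
      // ... run the stage's processors against `buffers` ...
    }
    finally {
      // Returns the slot to the pool for the next concurrent stage.
      holder.close();
    }
  }
}
```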