Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/data-management/automatic-compaction.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,15 @@ The MSQ task engine is available as a compaction engine if you configure auto-co

You can use [MSQ task engine context parameters](../multi-stage-query/reference.md#context-parameters) in `spec.taskContext` when configuring your datasource for automatic compaction, such as setting the maximum number of tasks using the `spec.taskContext.maxNumTasks` parameter. Some of the MSQ task engine context parameters overlap with automatic compaction parameters. When these settings overlap, set one or the other.

#### Scaling task count for minor compactions

[Minor compactions](../api-reference/automatic-compaction-api.md#compaction-policy-mostfragmentedfirst) typically rewrite a small subset of segments and do less work than full compactions. To avoid spawning a full-sized MSQ task topology for that lighter workload, opt in by setting `spec.taskContext.minorCompactionTaskPercent` to an integer between 1 and 100; values below 100 reduce the task count, while 100 (the default) leaves it unchanged. The percent is applied to `maxNumTasks` to derive the task count for minor compaction tasks; the result is floored at 2 (the MSQ minimum of one controller and one worker).

| Parameter | Description | Default value |
|---|---|---|
| `minorCompactionTaskPercent` | Percent (1-100) used to scale `maxNumTasks` for MSQ minor compactions. Has no effect on full compactions or on the native compaction engine. By default, minor compactions use the same task count as full compactions; set this parameter to opt into scaling. A starting value in the range of 40 to 50 is reasonable for most workloads. | 100 (no scaling) |

For example, with `maxNumTasks` set to 5 and `minorCompactionTaskPercent` set to 40, a minor compaction task launches with `maxNumTasks` of 2 (40% of 5, rounded, then floored at 2), while a full compaction task on the same datasource still launches with `maxNumTasks` of 5.

#### MSQ task engine limitations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

package org.apache.druid.testing.embedded.compact;

import com.google.common.collect.ImmutableList;
import org.apache.druid.catalog.guice.CatalogClientModule;
import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientMSQContext;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.data.input.impl.DimensionsSpec;
Expand All @@ -29,6 +33,7 @@
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
Expand All @@ -50,6 +55,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
Expand Down Expand Up @@ -114,8 +120,10 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -297,6 +305,86 @@ public void test_minorCompactionWithMSQ(MostFragmentedIntervalFirstPolicy policy
Assertions.assertEquals(4000, getTotalRowCount());
}

@Test
public void test_minorCompactionWithMSQ_minorCompactionTaskPercentScalesWorkers()
{
final MostFragmentedIntervalFirstPolicy policy =
new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, 80, null, null);
configureCompaction(CompactionEngine.MSQ, policy);

ingest1kRecords();
ingest1kRecords();

overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));

// maxNumTasks=3 (controller + 2 workers) for full; minorCompactionTaskPercent=50
// scales minor compactions to maxNumTasks=2 (controller + 1 worker).
final InlineSchemaDataSourceCompactionConfig dayConfig =
InlineSchemaDataSourceCompactionConfig
.builder()
.forDataSource(dataSource)
.withSkipOffsetFromLatest(Period.seconds(0))
.withGranularitySpec(new UserCompactionTaskGranularityConfig(Granularities.DAY, null, false))
.withDimensionsSpec(new UserCompactionTaskDimensionsConfig(
WikipediaStreamEventStreamGenerator.dimensions()
.stream()
.map(StringDimensionSchema::new)
.collect(Collectors.toUnmodifiableList())))
.withTaskContext(buildScalingTaskContext(3, 50))
.withIoConfig(new UserCompactionTaskIOConfig(true))
.withTuningConfig(
UserCompactionTaskQueryTuningConfig
.builder()
.partitionsSpec(new DimensionRangePartitionsSpec(null, 10_000, List.of("page"), false))
.build()
)
.build();

// First run: full compaction (no compacted segments yet).
runCompactionWithSpec(dayConfig);
waitForAllCompactionTasksToFinish();
pauseCompaction(dayConfig);

overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));

// Capture the full task before the minor task arrives (its ID does not contain "-minor").
final TaskStatusPlus fullTask = findMostRecentCompactionTask(taskId -> !taskId.contains("-minor"));
Assertions.assertNotNull(fullTask, "Expected a full compaction task for datasource[" + dataSource + "]");
assertCompactionTaskMaxNumTasks(fullTask, 3);

// Ingest more so the next run sees a mix of compacted + uncompacted segments.
ingest1kRecords();
ingest1kRecords();

overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time"));
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));

final long totalUsed = overlord.latchableEmitter().getMetricValues(
"segment/metadataCache/used/count",
Map.of(DruidMetrics.DATASOURCE, dataSource)
).stream().reduce((first, second) -> second).orElse(0).longValue();

// Second run: minor compaction (uncompacted bytes ratio is below the 80% threshold).
runCompactionWithSpec(dayConfig);
waitForAllCompactionTasksToFinish();

overlord.latchableEmitter().waitForEvent(
event -> event.hasMetricName("segment/metadataCache/used/count")
.hasDimension(DruidMetrics.DATASOURCE, dataSource)
.hasValueMatching(Matchers.greaterThan(totalUsed)));

Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));

final TaskStatusPlus minorTask = findMostRecentCompactionTask(taskId -> taskId.contains("-minor"));
Assertions.assertNotNull(minorTask, "Expected a minor compaction task for datasource[" + dataSource + "]");
assertCompactionTaskMaxNumTasks(minorTask, 2);
}

@MethodSource("getEngine")
@ParameterizedTest(name = "compactionEngine={0}")
public void test_compaction_withPersistLastCompactionStateFalse_storesOnlyFingerprint(CompactionEngine compactionEngine)
Expand Down Expand Up @@ -1216,6 +1304,50 @@ private int getNumSegmentsWith(Granularity granularity)
.count();
}

private static Map<String, Object> buildScalingTaskContext(int maxNumTasks, int minorCompactionTaskPercent)
{
final Map<String, Object> context = new HashMap<>();
context.put("useConcurrentLocks", true);
context.put(ClientMSQContext.CTX_MAX_NUM_TASKS, maxNumTasks);
context.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, minorCompactionTaskPercent);
return context;
}

/**
* Returns the most recent completed compaction task whose id matches the given filter,
* or null if none is found.
*/
@Nullable
private TaskStatusPlus findMostRecentCompactionTask(Predicate<String> taskIdFilter)
{
final List<TaskStatusPlus> tasks = ImmutableList.copyOf(
(CloseableIterator<TaskStatusPlus>) cluster.callApi().onLeaderOverlord(
o -> o.taskStatuses("complete", dataSource, 100)
)
);
TaskStatusPlus match = null;
for (TaskStatusPlus task : tasks) {
if ("compact".equals(task.getType()) && taskIdFilter.test(task.getId())) {
match = task;
}
}
return match;
}

/**
* Asserts that the submitted compaction task carries the expected {@code maxNumTasks} in its context.
*/
private void assertCompactionTaskMaxNumTasks(TaskStatusPlus task, int expectedMaxNumTasks)
{
final TaskPayloadResponse payload = cluster.callApi().onLeaderOverlord(o -> o.taskPayload(task.getId()));
final ClientCompactionTaskQuery query = (ClientCompactionTaskQuery) payload.getPayload();
Assertions.assertEquals(
expectedMaxNumTasks,
((Number) query.getContext().get(ClientMSQContext.CTX_MAX_NUM_TASKS)).intValue(),
"maxNumTasks in compaction task[" + task.getId() + "] context"
);
}

private void runIngestionAtGranularity(
String granularity,
String inlineDataCsv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ public UserCompactionTaskQueryTuningConfig getTuningConfig()
* Range partition dimension type checking passes {@code null} for dimensionSchemas
* since those are not known at template level.</li>
* <li>maxNumTasks >= 2 in taskContext.</li>
* <li>minorCompactionTaskPercent (if set) is an integer between 1 and 100 in taskContext.</li>
* </ul>
*
* <p>Standard MSQ checks skipped (not applicable at template level):
Expand Down Expand Up @@ -293,6 +294,8 @@ public CompactionConfigValidationResult validate(ClusterCompactionConfig cluster

results.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(this.getTaskContext()));

results.add(ClientCompactionRunnerInfo.validateMinorCompactionTaskPercentForMSQ(this.getTaskContext()));

return results.stream()
.filter(result -> !result.isValid())
.findFirst()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ private static CompactionConfigValidationResult compactionConfigSupportedByMSQEn
));
}
validationResults.add(validateMaxNumTasksForMSQ(newConfig.getTaskContext()));
validationResults.add(validateMinorCompactionTaskPercentForMSQ(newConfig.getTaskContext()));
validationResults.add(validateMetricsSpecForMSQ(newConfig.getMetricsSpec()));
return validationResults.stream()
.filter(result -> !result.isValid())
Expand Down Expand Up @@ -241,6 +242,37 @@ public static CompactionConfigValidationResult validateMaxNumTasksForMSQ(Map<Str
return CompactionConfigValidationResult.success();
}

/**
* Validate that {@link ClientMSQContext#CTX_MINOR_COMPACTION_TASK_PERCENT}, if present in context,
* is an integer between 1 and 100. Rejects malformed and out-of-range values upfront so a bad
* config cannot be persisted and then fail every subsequent minor compaction job.
*/
public static CompactionConfigValidationResult validateMinorCompactionTaskPercentForMSQ(Map<String, Object> context)
{
if (context == null || !context.containsKey(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT)) {
return CompactionConfigValidationResult.success();
}
final int percent;
try {
percent = QueryContext.of(context).getInt(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, 0);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Reject fractional minorCompactionTaskPercent values

This validation uses QueryContext.getInt, which accepts any Number by calling intValue(). JSON values like 50.9 deserialize as Double and are silently truncated to 50, so malformed configs can be persisted despite the new contract requiring an integer between 1 and 100. Please reject non-integral Number values, or parse with exact integer conversion, before accepting the config.

}
catch (Exception e) {
return CompactionConfigValidationResult.failure(
"MSQ: Context '%s'[%s] must be an integer between 1 and 100",
ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT,
context.get(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT)
);
}
if (percent < 1 || percent > 100) {
return CompactionConfigValidationResult.failure(
"MSQ: Context '%s'[%d] must be between 1 and 100",
ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT,
percent
);
}
return CompactionConfigValidationResult.success();
}

/**
* Validate each metric defines some aggregatorFactory 'A' s.t. 'A = A.combiningFactory()'.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,11 @@ public class ClientMSQContext
* Limit to ensure that an MSQ compaction task doesn't take up all task slots in a cluster.
*/
public static final int MAX_TASK_SLOTS_FOR_MSQ_COMPACTION_TASK = 5;

/**
* Compaction-only context key (1-100) used to scale {@link #CTX_MAX_NUM_TASKS} for MSQ minor compactions.
* The scaled value is floored at {@link #DEFAULT_MAX_NUM_TASKS}. Has no effect on full compactions or on the native
* compaction engine.
*/
public static final String CTX_MINOR_COMPACTION_TASK_PERCENT = "minorCompactionTaskPercent";
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientMSQContext;
import org.apache.druid.client.indexing.ClientMinorCompactionInputSpec;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.common.utils.IdUtils;
Expand All @@ -41,6 +42,7 @@
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
Expand Down Expand Up @@ -294,6 +296,17 @@ private int submitCompactionTasks(
return numSubmittedTasks;
}

/**
* Default percent applied to {@code maxNumTasks} for MSQ minor compactions
* when the supervisor's {@code taskContext} does not specify
* {@link ClientMSQContext#CTX_MINOR_COMPACTION_TASK_PERCENT}. The default of
* 100 disables scaling, matching the opt-in shape of the minor compaction
* feature itself (which is gated by non-zero
* {@code minUncompactedBytesPercentForFullCompaction} or
* {@code minUncompactedRowsPercentForFullCompaction} on the policy).
*/
public static final int DEFAULT_MINOR_COMPACTION_TASK_PERCENT = 100;

/**
* Creates a {@link ClientCompactionTaskQuery} which can be submitted to an
* {@link OverlordClient} to start a compaction task.
Expand Down Expand Up @@ -448,6 +461,56 @@ public Map<String, AutoCompactionSnapshot> getAutoCompactionSnapshot()
return autoCompactionSnapshotPerDataSource.get();
}

/**
* Reduces {@code maxNumTasks} on the task context for MSQ minor compactions
* by the percent specified under
* {@link ClientMSQContext#CTX_MINOR_COMPACTION_TASK_PERCENT} in the same
* context, defaulting to {@link #DEFAULT_MINOR_COMPACTION_TASK_PERCENT} when
* absent. The scaled value is floored at {@link ClientMSQContext#DEFAULT_MAX_NUM_TASKS}
* (the MSQ minimum of 1 controller + 1 worker). No-op when the engine is
* native, the mode is full, or the percent is 100.
*
* <p>The percent is validated at config-acceptance time by
* {@link ClientCompactionRunnerInfo#validateMinorCompactionTaskPercentForMSQ},
* so an out-of-range value here indicates a bug.
*/
private static void maybeScaleMaxNumTasksForMinorCompaction(
Map<String, Object> context,
CompactionMode compactionMode,
ClientCompactionRunnerInfo compactionRunner
)
{
if (compactionMode != CompactionMode.UNCOMPACTED_SEGMENTS_ONLY
|| !CompactionEngine.MSQ.equals(compactionRunner.getType())) {
return;
}

final int percent = QueryContext.of(context).getInt(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Validate minorCompactionTaskPercent when accepting the compaction config

The new context key is only parsed and range-checked while creating a minor MSQ compaction task. Supervisor config validation still accepts values like 0, 200, or a non-numeric string, so the API can persist an invalid supervisor and it will later fail job creation repeatedly once a minor compaction candidate appears. Please add validation alongside the existing MSQ maxNumTasks validation paths, including CascadingReindexingTemplate if applicable, so bad configs are rejected before scheduling.

ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT,
DEFAULT_MINOR_COMPACTION_TASK_PERCENT
);
if (percent < 1 || percent > 100) {
throw DruidException.defensive(
"'%s'[%d] must be between 1 and 100; should have been rejected at config-acceptance time",
ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT,
percent
);
}
if (percent == 100) {
return;
}

final int originalMaxNumTasks = QueryContext.of(context).getInt(
ClientMSQContext.CTX_MAX_NUM_TASKS,
ClientMSQContext.DEFAULT_MAX_NUM_TASKS
);
final int scaledMaxNumTasks = Math.max(
ClientMSQContext.DEFAULT_MAX_NUM_TASKS,
(int) Math.round(originalMaxNumTasks * percent / 100.0)
);
context.put(ClientMSQContext.CTX_MAX_NUM_TASKS, scaledMaxNumTasks);
}

private static ClientCompactionTaskQuery compactSegments(
CompactionCandidate entry,
Eligibility eligibility,
Expand Down Expand Up @@ -480,6 +543,8 @@ private static ClientCompactionTaskQuery compactSegments(
context.put(COMPACTION_POLICY_RESULT, eligibility.getReason());
}

maybeScaleMaxNumTasksForMinorCompaction(context, compactionMode, compactionRunner);

String taskIdPrefix = compactionMode == CompactionMode.UNCOMPACTED_SEGMENTS_ONLY
? TASK_ID_PREFIX + "-minor"
: TASK_ID_PREFIX;
Expand Down
Loading
Loading