From 2332d3f9c8ce16a0836bd2f4f3a10c95ff94de95 Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Tue, 5 May 2026 15:27:20 -0500 Subject: [PATCH 1/5] minor compaction task parallelism scaling for msq compaction supervisors --- docs/data-management/automatic-compaction.md | 9 + .../client/indexing/ClientMSQContext.java | 7 + .../coordinator/duty/CompactSegments.java | 58 +++++ .../CompactSegmentsMinorTaskScalingTest.java | 219 ++++++++++++++++++ website/.spelling | 1 + 5 files changed, 294 insertions(+) create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsMinorTaskScalingTest.java diff --git a/docs/data-management/automatic-compaction.md b/docs/data-management/automatic-compaction.md index e455319d924a..b41109b12e83 100644 --- a/docs/data-management/automatic-compaction.md +++ b/docs/data-management/automatic-compaction.md @@ -163,6 +163,15 @@ The MSQ task engine is available as a compaction engine if you configure auto-co You can use [MSQ task engine context parameters](../multi-stage-query/reference.md#context-parameters) in `spec.taskContext` when configuring your datasource for automatic compaction, such as setting the maximum number of tasks using the `spec.taskContext.maxNumTasks` parameter. Some of the MSQ task engine context parameters overlap with automatic compaction parameters. When these settings overlap, set one or the other. +#### Scaling task count for minor compactions + +[Minor compactions](../api-reference/automatic-compaction-api.md#compaction-policy-mostfragmentedfirst) typically rewrite a small subset of segments and do less work than full compactions. To avoid spawning a full-sized MSQ task topology for that lighter workload, set `spec.taskContext.minorCompactionTaskPercent` to a value between 1 and 100. The percent is applied to `maxNumTasks` to derive the task count for minor compaction tasks; the result is floored at 2 (the MSQ minimum of one controller and one worker). 
+ +| Parameter | Description | Default value | +|---|---|---| +| `minorCompactionTaskPercent` | Percent (1-100) used to scale `maxNumTasks` for MSQ minor compactions. Has no effect on full compactions or on the native compaction engine. Set to `100` to disable scaling. | 40 | + +For example, with `maxNumTasks` set to 5 and `minorCompactionTaskPercent` set to 40, a minor compaction task launches with `maxNumTasks` of 2 (40% of 5, rounded, then floored at 2), while a full compaction task on the same datasource still launches with `maxNumTasks` of 5. #### MSQ task engine limitations diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientMSQContext.java b/server/src/main/java/org/apache/druid/client/indexing/ClientMSQContext.java index 45279bda3ed3..818e7b12ae8b 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientMSQContext.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientMSQContext.java @@ -32,4 +32,11 @@ public class ClientMSQContext * Limit to ensure that an MSQ compaction task doesn't take up all task slots in a cluster. */ public static final int MAX_TASK_SLOTS_FOR_MSQ_COMPACTION_TASK = 5; + + /** + * Compaction-only context key (1-100) used to scale {@link #CTX_MAX_NUM_TASKS} for MSQ minor compactions. + * The scaled value is floored at {@link #DEFAULT_MAX_NUM_TASKS}. Has no effect on full compactions or on the native + * compaction engine. 
+ */ + public static final String CTX_MINOR_COMPACTION_TASK_PERCENT = "minorCompactionTaskPercent"; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index cbb72762651c..a913223eaf68 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -32,15 +32,18 @@ import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec; import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; +import org.apache.druid.client.indexing.ClientMSQContext; import org.apache.druid.client.indexing.ClientMinorCompactionInputSpec; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.impl.AggregateProjectionSpec; import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.QueryContext; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.rpc.indexing.OverlordClient; @@ -294,6 +297,13 @@ private int submitCompactionTasks( return numSubmittedTasks; } + /** + * Default percent applied to {@code maxNumTasks} for MSQ minor compactions + * when the supervisor's {@code taskContext} does not specify + * {@link ClientMSQContext#CTX_MINOR_COMPACTION_TASK_PERCENT}. 
+ */ + public static final int DEFAULT_MINOR_COMPACTION_TASK_PERCENT = 40; + /** * Creates a {@link ClientCompactionTaskQuery} which can be submitted to an * {@link OverlordClient} to start a compaction task. @@ -448,6 +458,52 @@ public Map getAutoCompactionSnapshot() return autoCompactionSnapshotPerDataSource.get(); } + /** + * Reduces {@code maxNumTasks} on the task context for MSQ minor compactions + * by the percent specified under + * {@link ClientMSQContext#CTX_MINOR_COMPACTION_TASK_PERCENT} in the same + * context, defaulting to {@link #DEFAULT_MINOR_COMPACTION_TASK_PERCENT} when + * absent. The scaled value is floored at {@link ClientMSQContext#DEFAULT_MAX_NUM_TASKS} + * (the MSQ minimum of 1 controller + 1 worker). No-op when the engine is + * native, the mode is full, or the percent is 100. + */ + private static void maybeScaleMaxNumTasksForMinorCompaction( + Map context, + CompactionMode compactionMode, + ClientCompactionRunnerInfo compactionRunner + ) + { + if (compactionMode != CompactionMode.UNCOMPACTED_SEGMENTS_ONLY + || !CompactionEngine.MSQ.equals(compactionRunner.getType())) { + return; + } + + final int percent = QueryContext.of(context).getInt( + ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, + DEFAULT_MINOR_COMPACTION_TASK_PERCENT + ); + if (percent < 1 || percent > 100) { + throw InvalidInput.exception( + "'%s'[%d] must be between 1 and 100", + ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, + percent + ); + } + if (percent == 100) { + return; + } + + final int originalMaxNumTasks = QueryContext.of(context).getInt( + ClientMSQContext.CTX_MAX_NUM_TASKS, + ClientMSQContext.DEFAULT_MAX_NUM_TASKS + ); + final int scaledMaxNumTasks = Math.max( + ClientMSQContext.DEFAULT_MAX_NUM_TASKS, + (int) Math.round(originalMaxNumTasks * percent / 100.0) + ); + context.put(ClientMSQContext.CTX_MAX_NUM_TASKS, scaledMaxNumTasks); + } + private static ClientCompactionTaskQuery compactSegments( CompactionCandidate entry, Eligibility eligibility, @@ -480,6 
+536,8 @@ private static ClientCompactionTaskQuery compactSegments( context.put(COMPACTION_POLICY_RESULT, eligibility.getReason()); } + maybeScaleMaxNumTasksForMinorCompaction(context, compactionMode, compactionRunner); + String taskIdPrefix = compactionMode == CompactionMode.UNCOMPACTED_SEGMENTS_ONLY ? TASK_ID_PREFIX + "-minor" : TASK_ID_PREFIX; diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsMinorTaskScalingTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsMinorTaskScalingTest.java new file mode 100644 index 000000000000..5d1eaad4f238 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsMinorTaskScalingTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator.duty; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.client.indexing.ClientCompactionTaskQuery; +import org.apache.druid.client.indexing.ClientMSQContext; +import org.apache.druid.error.DruidException; +import org.apache.druid.indexer.CompactionEngine; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.server.compaction.CompactionCandidate; +import org.apache.druid.server.compaction.CompactionStatistics; +import org.apache.druid.server.compaction.CompactionStatus; +import org.apache.druid.server.compaction.Eligibility; +import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tests for the {@code minorCompactionTaskPercent} task-context key applied by + * {@link CompactSegments#createCompactionTask}. 
+ */ +public class CompactSegmentsMinorTaskScalingTest +{ + private static final String DATA_SOURCE = "wiki"; + + @Test + public void testMinor_msq_appliesDefaultPercentWhenAbsent() + { + final ClientCompactionTaskQuery task = buildTask( + Eligibility.minor("uncompacted ratio below threshold"), + CompactionEngine.MSQ, + contextWithMaxNumTasks(10) + ); + Assert.assertEquals(4, getMaxNumTasks(task)); + } + + @Test + public void testMinor_msq_explicitPercentInTaskContext() + { + final Map ctx = contextWithMaxNumTasks(10); + ctx.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, 25); + final ClientCompactionTaskQuery task = buildTask( + Eligibility.minor("uncompacted ratio below threshold"), + CompactionEngine.MSQ, + ctx + ); + Assert.assertEquals(3, getMaxNumTasks(task)); + } + + @Test + public void testMinor_msq_floorsAtTwo() + { + final Map ctx = contextWithMaxNumTasks(3); + ctx.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, 40); + final ClientCompactionTaskQuery task = buildTask( + Eligibility.minor("uncompacted ratio below threshold"), + CompactionEngine.MSQ, + ctx + ); + Assert.assertEquals(2, getMaxNumTasks(task)); + } + + @Test + public void testMinor_msq_percentOneHundredIsNoOp() + { + final Map ctx = contextWithMaxNumTasks(5); + ctx.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, 100); + final ClientCompactionTaskQuery task = buildTask( + Eligibility.minor("uncompacted ratio below threshold"), + CompactionEngine.MSQ, + ctx + ); + Assert.assertEquals(5, getMaxNumTasks(task)); + } + + @Test + public void testMinor_msq_noExistingMaxNumTasks_scalesFromDefault() + { + final ClientCompactionTaskQuery task = buildTask( + Eligibility.minor("uncompacted ratio below threshold"), + CompactionEngine.MSQ, + new HashMap<>() + ); + // Default maxNumTasks is 2; default percent 40% scales to floor of 2. 
+ Assert.assertEquals( + ClientMSQContext.DEFAULT_MAX_NUM_TASKS, + getMaxNumTasks(task) + ); + } + + @Test + public void testFull_msq_doesNotScale() + { + final ClientCompactionTaskQuery task = buildTask( + Eligibility.FULL, + CompactionEngine.MSQ, + contextWithMaxNumTasks(5) + ); + Assert.assertEquals(5, getMaxNumTasks(task)); + } + + @Test + public void testMinor_native_doesNotScale() + { + final ClientCompactionTaskQuery task = buildTask( + Eligibility.minor("uncompacted ratio below threshold"), + CompactionEngine.NATIVE, + contextWithMaxNumTasks(5) + ); + Assert.assertEquals(5, getMaxNumTasks(task)); + } + + @Test + public void testMinor_msq_outOfRangePercentThrows() + { + final Map ctx = contextWithMaxNumTasks(10); + ctx.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, 200); + final DruidException thrown = Assert.assertThrows( + DruidException.class, + () -> buildTask( + Eligibility.minor("uncompacted ratio below threshold"), + CompactionEngine.MSQ, + ctx + ) + ); + Assert.assertEquals(DruidException.Category.INVALID_INPUT, thrown.getCategory()); + } + + @Test + public void testMinor_msq_stringPercentIsParsed() + { + final Map ctx = contextWithMaxNumTasks(10); + ctx.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, "50"); + final ClientCompactionTaskQuery task = buildTask( + Eligibility.minor("uncompacted ratio below threshold"), + CompactionEngine.MSQ, + ctx + ); + Assert.assertEquals(5, getMaxNumTasks(task)); + } + + private static ClientCompactionTaskQuery buildTask( + Eligibility eligibility, + CompactionEngine engine, + Map taskContext + ) + { + final DataSegment segment = new DataSegment( + DATA_SOURCE, + Intervals.of("2024-01-01/2024-01-02"), + "v1", + null, + ImmutableList.of(), + ImmutableList.of(), + new NumberedShardSpec(0, 1), + 0, + 100L + ); + final List segments = ImmutableList.of(segment); + final CompactionStatus status = CompactionStatus.pending( + CompactionStatistics.create(0L, 0L, 0L, 0L), + CompactionStatistics.create(100L, 
0L, 1L, 1L), + segments, + "for test" + ); + final CompactionCandidate candidate = CompactionCandidate.from(segments, null, status); + final DataSourceCompactionConfig config = InlineSchemaDataSourceCompactionConfig + .builder() + .forDataSource(DATA_SOURCE) + .withTaskContext(taskContext) + .withEngine(engine) + .build(); + return CompactSegments.createCompactionTask( + candidate, + eligibility, + config, + engine, + null, + true + ); + } + + private static Map contextWithMaxNumTasks(int maxNumTasks) + { + final Map context = new HashMap<>(); + context.put(ClientMSQContext.CTX_MAX_NUM_TASKS, maxNumTasks); + return context; + } + + private static int getMaxNumTasks(ClientCompactionTaskQuery task) + { + return (int) task.getContext().get(ClientMSQContext.CTX_MAX_NUM_TASKS); + } +} diff --git a/website/.spelling b/website/.spelling index 28701817f362..b73b69f873a5 100644 --- a/website/.spelling +++ b/website/.spelling @@ -834,6 +834,7 @@ WorkerRpcFailed TIMED_OUT # MSQ context parameters maxNumTasks +minorCompactionTaskPercent taskAssignment finalizeAggregations indexSpec From e40c45d55b199b8b51ea684f0b93bfdc477a6983 Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Tue, 5 May 2026 15:31:49 -0500 Subject: [PATCH 2/5] Default scaling to off --- .../coordinator/duty/CompactSegments.java | 8 ++++-- .../CompactSegmentsMinorTaskScalingTest.java | 25 ++++++++++--------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index a913223eaf68..d547a843a353 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -300,9 +300,13 @@ private int submitCompactionTasks( /** * Default percent applied to {@code maxNumTasks} for MSQ minor compactions * when the supervisor's 
{@code taskContext} does not specify - * {@link ClientMSQContext#CTX_MINOR_COMPACTION_TASK_PERCENT}. + * {@link ClientMSQContext#CTX_MINOR_COMPACTION_TASK_PERCENT}. The default of + * 100 disables scaling, matching the opt-in shape of the minor compaction + * feature itself (which is gated by non-zero + * {@code minUncompactedBytesPercentForFullCompaction} or + * {@code minUncompactedRowsPercentForFullCompaction} on the policy). */ - public static final int DEFAULT_MINOR_COMPACTION_TASK_PERCENT = 40; + public static final int DEFAULT_MINOR_COMPACTION_TASK_PERCENT = 100; /** * Creates a {@link ClientCompactionTaskQuery} which can be submitted to an diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsMinorTaskScalingTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsMinorTaskScalingTest.java index 5d1eaad4f238..e72950e7cf1f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsMinorTaskScalingTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsMinorTaskScalingTest.java @@ -49,14 +49,14 @@ public class CompactSegmentsMinorTaskScalingTest private static final String DATA_SOURCE = "wiki"; @Test - public void testMinor_msq_appliesDefaultPercentWhenAbsent() + public void testMinor_msq_defaultDoesNotScale() { final ClientCompactionTaskQuery task = buildTask( Eligibility.minor("uncompacted ratio below threshold"), CompactionEngine.MSQ, contextWithMaxNumTasks(10) ); - Assert.assertEquals(4, getMaxNumTasks(task)); + Assert.assertEquals(10, getMaxNumTasks(task)); } @Test @@ -99,18 +99,19 @@ public void testMinor_msq_percentOneHundredIsNoOp() } @Test - public void testMinor_msq_noExistingMaxNumTasks_scalesFromDefault() + public void testMinor_msq_outOfRangePercentBelowOneThrows() { - final ClientCompactionTaskQuery task = buildTask( - Eligibility.minor("uncompacted ratio below threshold"), - CompactionEngine.MSQ, - new 
HashMap<>() - ); - // Default maxNumTasks is 2; default percent 40% scales to floor of 2. - Assert.assertEquals( - ClientMSQContext.DEFAULT_MAX_NUM_TASKS, - getMaxNumTasks(task) + final Map ctx = contextWithMaxNumTasks(10); + ctx.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, 0); + final DruidException thrown = Assert.assertThrows( + DruidException.class, + () -> buildTask( + Eligibility.minor("uncompacted ratio below threshold"), + CompactionEngine.MSQ, + ctx + ) ); + Assert.assertEquals(DruidException.Category.INVALID_INPUT, thrown.getCategory()); } @Test From 745b17f56e568eebc14cd5c76b2cfdf734213462 Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Tue, 5 May 2026 15:32:16 -0500 Subject: [PATCH 3/5] Fix docs --- docs/data-management/automatic-compaction.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/data-management/automatic-compaction.md b/docs/data-management/automatic-compaction.md index b41109b12e83..ed58e433f298 100644 --- a/docs/data-management/automatic-compaction.md +++ b/docs/data-management/automatic-compaction.md @@ -165,11 +165,11 @@ You can use [MSQ task engine context parameters](../multi-stage-query/reference. #### Scaling task count for minor compactions -[Minor compactions](../api-reference/automatic-compaction-api.md#compaction-policy-mostfragmentedfirst) typically rewrite a small subset of segments and do less work than full compactions. To avoid spawning a full-sized MSQ task topology for that lighter workload, set `spec.taskContext.minorCompactionTaskPercent` to a value between 1 and 100. The percent is applied to `maxNumTasks` to derive the task count for minor compaction tasks; the result is floored at 2 (the MSQ minimum of one controller and one worker). +[Minor compactions](../api-reference/automatic-compaction-api.md#compaction-policy-mostfragmentedfirst) typically rewrite a small subset of segments and do less work than full compactions. 
To avoid spawning a full-sized MSQ task topology for that lighter workload, opt in by setting `spec.taskContext.minorCompactionTaskPercent` to a value between 1 and 100. The percent is applied to `maxNumTasks` to derive the task count for minor compaction tasks; the result is floored at 2 (the MSQ minimum of one controller and one worker). | Parameter | Description | Default value | |---|---|---| -| `minorCompactionTaskPercent` | Percent (1-100) used to scale `maxNumTasks` for MSQ minor compactions. Has no effect on full compactions or on the native compaction engine. Set to `100` to disable scaling. | 40 | +| `minorCompactionTaskPercent` | Percent (1-100) used to scale `maxNumTasks` for MSQ minor compactions. Has no effect on full compactions or on the native compaction engine. By default, minor compactions use the same task count as full compactions; set this parameter to opt into scaling. A starting value in the range of 40 to 50 is reasonable for most workloads. | 100 (no scaling) | For example, with `maxNumTasks` set to 5 and `minorCompactionTaskPercent` set to 40, a minor compaction task launches with `maxNumTasks` of 2 (40% of 5, rounded, then floored at 2), while a full compaction task on the same datasource still launches with `maxNumTasks` of 5. 
From e3ba9637ca6b38089545c315ebd06336869d11e5 Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Tue, 5 May 2026 16:52:11 -0500 Subject: [PATCH 4/5] embedded test --- .../compact/CompactionSupervisorTest.java | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java index 18e1a7ccb0fc..106aeca1d785 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java @@ -19,8 +19,12 @@ package org.apache.druid.testing.embedded.compact; +import com.google.common.collect.ImmutableList; import org.apache.druid.catalog.guice.CatalogClientModule; import org.apache.druid.catalog.guice.CatalogCoordinatorModule; +import org.apache.druid.client.indexing.ClientCompactionTaskQuery; +import org.apache.druid.client.indexing.ClientMSQContext; +import org.apache.druid.client.indexing.TaskPayloadResponse; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.StringTuple; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -29,6 +33,7 @@ import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexer.CompactionEngine; +import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; @@ -50,6 +55,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; +import 
org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -113,8 +119,10 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -296,6 +304,86 @@ public void test_minorCompactionWithMSQ(MostFragmentedIntervalFirstPolicy policy Assertions.assertEquals(4000, getTotalRowCount()); } + @Test + public void test_minorCompactionWithMSQ_minorCompactionTaskPercentScalesWorkers() + { + final MostFragmentedIntervalFirstPolicy policy = + new MostFragmentedIntervalFirstPolicy(2, new HumanReadableBytes("1KiB"), null, 80, null, null); + configureCompaction(CompactionEngine.MSQ, policy); + + ingest1kRecords(); + ingest1kRecords(); + + overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time")); + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY)); + + // maxNumTasks=3 (controller + 2 workers) for full; minorCompactionTaskPercent=50 + // scales minor compactions to maxNumTasks=2 (controller + 1 worker). 
+ final InlineSchemaDataSourceCompactionConfig dayConfig = + InlineSchemaDataSourceCompactionConfig + .builder() + .forDataSource(dataSource) + .withSkipOffsetFromLatest(Period.seconds(0)) + .withGranularitySpec(new UserCompactionTaskGranularityConfig(Granularities.DAY, null, false)) + .withDimensionsSpec(new UserCompactionTaskDimensionsConfig( + WikipediaStreamEventStreamGenerator.dimensions() + .stream() + .map(StringDimensionSchema::new) + .collect(Collectors.toUnmodifiableList()))) + .withTaskContext(buildScalingTaskContext(3, 50)) + .withIoConfig(new UserCompactionTaskIOConfig(true)) + .withTuningConfig( + UserCompactionTaskQueryTuningConfig + .builder() + .partitionsSpec(new DimensionRangePartitionsSpec(null, 10_000, List.of("page"), false)) + .build() + ) + .build(); + + // First run: full compaction (no compacted segments yet). + runCompactionWithSpec(dayConfig); + waitForAllCompactionTasksToFinish(); + pauseCompaction(dayConfig); + + overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time")); + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY)); + + // Capture the full task before the minor task arrives (its ID does not contain "-minor"). + final TaskStatusPlus fullTask = findMostRecentCompactionTask(taskId -> !taskId.contains("-minor")); + Assertions.assertNotNull(fullTask, "Expected a full compaction task for datasource[" + dataSource + "]"); + assertCompactionTaskMaxNumTasks(fullTask, 3); + + // Ingest more so the next run sees a mix of compacted + uncompacted segments. 
+ ingest1kRecords(); + ingest1kRecords(); + + overlord.latchableEmitter().waitForNextEvent(event -> event.hasMetricName("segment/metadataCache/sync/time")); + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY)); + + final long totalUsed = overlord.latchableEmitter().getMetricValues( + "segment/metadataCache/used/count", + Map.of(DruidMetrics.DATASOURCE, dataSource) + ).stream().reduce((first, second) -> second).orElse(0).longValue(); + + // Second run: minor compaction (uncompacted bytes ratio is below the 80% threshold). + runCompactionWithSpec(dayConfig); + waitForAllCompactionTasksToFinish(); + + overlord.latchableEmitter().waitForEvent( + event -> event.hasMetricName("segment/metadataCache/used/count") + .hasDimension(DruidMetrics.DATASOURCE, dataSource) + .hasValueMatching(Matchers.greaterThan(totalUsed))); + + Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY)); + + final TaskStatusPlus minorTask = findMostRecentCompactionTask(taskId -> taskId.contains("-minor")); + Assertions.assertNotNull(minorTask, "Expected a minor compaction task for datasource[" + dataSource + "]"); + assertCompactionTaskMaxNumTasks(minorTask, 2); + } + @MethodSource("getEngine") @ParameterizedTest(name = "compactionEngine={0}") public void test_compaction_withPersistLastCompactionStateFalse_storesOnlyFingerprint(CompactionEngine compactionEngine) @@ -1208,6 +1296,50 @@ private int getNumSegmentsWith(Granularity granularity) .count(); } + private static Map buildScalingTaskContext(int maxNumTasks, int minorCompactionTaskPercent) + { + final Map context = new HashMap<>(); + context.put("useConcurrentLocks", true); + context.put(ClientMSQContext.CTX_MAX_NUM_TASKS, maxNumTasks); + context.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, minorCompactionTaskPercent); + return context; + } + + /** + * Returns the most recent completed compaction task whose id matches the 
given filter, + * or null if none is found. + */ + @Nullable + private TaskStatusPlus findMostRecentCompactionTask(Predicate taskIdFilter) + { + final List tasks = ImmutableList.copyOf( + (CloseableIterator) cluster.callApi().onLeaderOverlord( + o -> o.taskStatuses("complete", dataSource, 100) + ) + ); + TaskStatusPlus match = null; + for (TaskStatusPlus task : tasks) { + if ("compact".equals(task.getType()) && taskIdFilter.test(task.getId())) { + match = task; + } + } + return match; + } + + /** + * Asserts that the submitted compaction task carries the expected {@code maxNumTasks} in its context. + */ + private void assertCompactionTaskMaxNumTasks(TaskStatusPlus task, int expectedMaxNumTasks) + { + final TaskPayloadResponse payload = cluster.callApi().onLeaderOverlord(o -> o.taskPayload(task.getId())); + final ClientCompactionTaskQuery query = (ClientCompactionTaskQuery) payload.getPayload(); + Assertions.assertEquals( + expectedMaxNumTasks, + ((Number) query.getContext().get(ClientMSQContext.CTX_MAX_NUM_TASKS)).intValue(), + "maxNumTasks in compaction task[" + task.getId() + "] context" + ); + } + private void runIngestionAtGranularity( String granularity, String inlineDataCsv From 9e91fbd910a98448a8ae0db6da7926384ff99ffd Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Wed, 6 May 2026 09:39:11 -0500 Subject: [PATCH 5/5] Pull validation forward after code review called it out --- .../compact/CascadingReindexingTemplate.java | 3 + .../indexing/ClientCompactionRunnerInfo.java | 32 +++++++++ .../coordinator/duty/CompactSegments.java | 9 ++- .../ClientCompactionRunnerInfoTest.java | 68 +++++++++++++++++++ .../CompactSegmentsMinorTaskScalingTest.java | 37 ++-------- 5 files changed, 115 insertions(+), 34 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java index c50c7ec74025..c7357b53a01a 
100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java @@ -264,6 +264,7 @@ public UserCompactionTaskQueryTuningConfig getTuningConfig() * Range partition dimension type checking passes {@code null} for dimensionSchemas * since those are not known at template level. *
  • maxNumTasks >= 2 in taskContext.
  • + *
  • minorCompactionTaskPercent (if set) is an integer between 1 and 100 in taskContext.
  • * * *

    Standard MSQ checks skipped (not applicable at template level): @@ -293,6 +294,8 @@ public CompactionConfigValidationResult validate(ClusterCompactionConfig cluster results.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(this.getTaskContext())); + results.add(ClientCompactionRunnerInfo.validateMinorCompactionTaskPercentForMSQ(this.getTaskContext())); + return results.stream() .filter(result -> !result.isValid()) .findFirst() diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java index c66def987934..cffffc816ac5 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java @@ -132,6 +132,7 @@ private static CompactionConfigValidationResult compactionConfigSupportedByMSQEn )); } validationResults.add(validateMaxNumTasksForMSQ(newConfig.getTaskContext())); + validationResults.add(validateMinorCompactionTaskPercentForMSQ(newConfig.getTaskContext())); validationResults.add(validateMetricsSpecForMSQ(newConfig.getMetricsSpec())); return validationResults.stream() .filter(result -> !result.isValid()) @@ -241,6 +242,37 @@ public static CompactionConfigValidationResult validateMaxNumTasksForMSQ(Map context) + { + if (context == null || !context.containsKey(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT)) { + return CompactionConfigValidationResult.success(); + } + final int percent; + try { + percent = QueryContext.of(context).getInt(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, 0); + } + catch (Exception e) { + return CompactionConfigValidationResult.failure( + "MSQ: Context '%s'[%s] must be an integer between 1 and 100", + ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, + context.get(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT) + ); + } + if (percent < 1 || percent > 100) { + return 
CompactionConfigValidationResult.failure( + "MSQ: Context '%s'[%d] must be between 1 and 100", + ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, + percent + ); + } + return CompactionConfigValidationResult.success(); + } + /** * Validate each metric defines some aggregatorFactory 'A' s.t. 'A = A.combiningFactory()'. */ diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index d547a843a353..379375bc96a4 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -38,7 +38,6 @@ import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.impl.AggregateProjectionSpec; import org.apache.druid.error.DruidException; -import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; @@ -470,6 +469,10 @@ public Map getAutoCompactionSnapshot() * absent. The scaled value is floored at {@link ClientMSQContext#DEFAULT_MAX_NUM_TASKS} * (the MSQ minimum of 1 controller + 1 worker). No-op when the engine is * native, the mode is full, or the percent is 100. + * + *
<p>
    The percent is validated at config-acceptance time by + * {@link ClientCompactionRunnerInfo#validateMinorCompactionTaskPercentForMSQ}, + * so an out-of-range value here indicates a bug. */ private static void maybeScaleMaxNumTasksForMinorCompaction( Map context, @@ -487,8 +490,8 @@ private static void maybeScaleMaxNumTasksForMinorCompaction( DEFAULT_MINOR_COMPACTION_TASK_PERCENT ); if (percent < 1 || percent > 100) { - throw InvalidInput.exception( - "'%s'[%d] must be between 1 and 100", + throw DruidException.defensive( + "'%s'[%d] must be between 1 and 100; should have been rejected at config-acceptance time", ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, percent ); diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java index 2730c98899e6..f92d33591543 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java @@ -267,6 +267,74 @@ public void testMSQEngineWithRollupNullWithMetricsSpecIsInvalid() ); } + @Test + public void testMSQEngineWithMinorCompactionTaskPercentZeroIsInvalid() + { + final CompactionConfigValidationResult validationResult = validateMinorCompactionTaskPercent(0); + Assert.assertFalse(validationResult.isValid()); + Assert.assertEquals( + "MSQ: Context 'minorCompactionTaskPercent'[0] must be between 1 and 100", + validationResult.getReason() + ); + } + + @Test + public void testMSQEngineWithMinorCompactionTaskPercentAboveOneHundredIsInvalid() + { + final CompactionConfigValidationResult validationResult = validateMinorCompactionTaskPercent(200); + Assert.assertFalse(validationResult.isValid()); + Assert.assertEquals( + "MSQ: Context 'minorCompactionTaskPercent'[200] must be between 1 and 100", + validationResult.getReason() + ); + } + + @Test + public void 
testMSQEngineWithMinorCompactionTaskPercentNonNumericIsInvalid() + { + final CompactionConfigValidationResult validationResult = validateMinorCompactionTaskPercent("not-a-number"); + Assert.assertFalse(validationResult.isValid()); + Assert.assertTrue( + "Should fail due to the minorCompactionTaskPercent key explicitly", + validationResult.getReason().startsWith("MSQ: Context 'minorCompactionTaskPercent'") + ); + } + + @Test + public void testMSQEngineWithMinorCompactionTaskPercentInRangeIsValid() + { + Assert.assertTrue(validateMinorCompactionTaskPercent(1).isValid()); + Assert.assertTrue(validateMinorCompactionTaskPercent(40).isValid()); + Assert.assertTrue(validateMinorCompactionTaskPercent(100).isValid()); + } + + @Test + public void testMSQEngineWithMinorCompactionTaskPercentAbsentIsValid() + { + final DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig( + new DynamicPartitionsSpec(3, null), + Collections.emptyMap(), + null, + null, + null + ); + Assert.assertTrue( + ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.MSQ).isValid() + ); + } + + private static CompactionConfigValidationResult validateMinorCompactionTaskPercent(Object value) + { + final DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig( + new DynamicPartitionsSpec(3, null), + Map.of(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, value), + null, + null, + null + ); + return ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.MSQ); + } + private static DataSourceCompactionConfig createMSQCompactionConfig( PartitionsSpec partitionsSpec, Map context, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsMinorTaskScalingTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsMinorTaskScalingTest.java index e72950e7cf1f..8bb90ddac411 100644 --- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsMinorTaskScalingTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsMinorTaskScalingTest.java @@ -99,8 +99,12 @@ public void testMinor_msq_percentOneHundredIsNoOp() } @Test - public void testMinor_msq_outOfRangePercentBelowOneThrows() + public void testMinor_msq_outOfRangePercentThrowsDefensive() { + // Bad values are normally rejected at config-acceptance time by + // ClientCompactionRunnerInfo#validateMinorCompactionTaskPercentForMSQ; + // this asserts the defensive guard inside compactSegments still fires + // if a bad value somehow reaches task construction. final Map ctx = contextWithMaxNumTasks(10); ctx.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, 0); final DruidException thrown = Assert.assertThrows( @@ -111,7 +115,7 @@ public void testMinor_msq_outOfRangePercentBelowOneThrows() ctx ) ); - Assert.assertEquals(DruidException.Category.INVALID_INPUT, thrown.getCategory()); + Assert.assertEquals(DruidException.Category.DEFENSIVE, thrown.getCategory()); } @Test @@ -136,35 +140,6 @@ public void testMinor_native_doesNotScale() Assert.assertEquals(5, getMaxNumTasks(task)); } - @Test - public void testMinor_msq_outOfRangePercentThrows() - { - final Map ctx = contextWithMaxNumTasks(10); - ctx.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, 200); - final DruidException thrown = Assert.assertThrows( - DruidException.class, - () -> buildTask( - Eligibility.minor("uncompacted ratio below threshold"), - CompactionEngine.MSQ, - ctx - ) - ); - Assert.assertEquals(DruidException.Category.INVALID_INPUT, thrown.getCategory()); - } - - @Test - public void testMinor_msq_stringPercentIsParsed() - { - final Map ctx = contextWithMaxNumTasks(10); - ctx.put(ClientMSQContext.CTX_MINOR_COMPACTION_TASK_PERCENT, "50"); - final ClientCompactionTaskQuery task = buildTask( - Eligibility.minor("uncompacted ratio below threshold"), - CompactionEngine.MSQ, - 
ctx - ); - Assert.assertEquals(5, getMaxNumTasks(task)); - } - private static ClientCompactionTaskQuery buildTask( Eligibility eligibility, CompactionEngine engine,