diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md index 196652420814..4579532e09d7 100644 --- a/docs/ingestion/supervisor.md +++ b/docs/ingestion/supervisor.md @@ -65,6 +65,7 @@ For configuration properties specific to Kafka and Kinesis, see [Kafka I/O confi |`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a streaming and a nightly batch ingestion pipeline. You can specify only one of the late message rejection properties.|No|| |`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover.|No|| |`stopTaskCount`|Integer|Limits the number of ingestion tasks Druid can cycle at any given time. If not set, Druid can cycle all tasks at the same time. If set to a value less than `taskCount`, your cluster needs fewer available slots to run the supervisor. You can save costs by scaling down your ingestion tier, but this can lead to slower cycle times and lag. See [`stopTaskCount`](#stoptaskcount) for more information.|No|`taskCount` value| +|`boundedStreamConfig`|Object|Configures the supervisor for bounded (one-time) ingestion with explicit start and end offsets. When set, the supervisor creates tasks that read from `startSequenceNumbers` to `endSequenceNumbers`, then automatically terminates when all data is ingested. The bounded configuration is stored with datasource metadata; if a supervisor is restarted or a new supervisor is created with different offsets for the same datasource, it will fail. To retry with different offsets, use the supervisor reset API to clear metadata or use a different supervisor ID. Useful for backfills and historical reprocessing. See [Bounded stream configuration](#bounded-stream-configuration) for details.|No|null|
|`serverPriorityToReplicas`|Object (`Map`)|Map of server priorities to the number of replicas per priority. When set, each task replica is assigned a server priority that corresponds to `druid.server.priority` on the Peon process to enable query isolation for mixed workloads using [query routing strategies](../configuration/index.md#query-routing). If not configured, the `replicas` setting applies and all task replicas are assigned a default priority of 0.

For example, setting `serverPriorityToReplicas` to `{"1": 2, "0": 1}` creates 2 task replicas with `druid.server.priority=1` and 1 task replica with `druid.server.priority=0` per task group. This configuration scales proportionally with `taskCount`. For example, if `taskCount` is set to 5, this results in 15 total tasks - 10 tasks with priority 1 and 5 tasks with priority 0. If both `replicas` and `serverPriorityToReplicas` are set, the sum of replicas in `serverPriorityToReplicas` must equal `replicas`.|No|null| #### Task autoscaler @@ -251,6 +252,57 @@ Before you set `stopTaskCount`, note the following: - The [task autoscaler](#task-autoscaler) ignores `stopTaskCount` when shutting down tasks in response to a task count change. The task autoscaler needs to redistribute partitions across tasks, which requires all tasks to be shut down. - If you set `stopTaskCount` to a value less than `taskCount`, Druid cycles the longest running tasks first, then other tasks up to the value set. +#### Bounded stream configuration + +Use `boundedStreamConfig` to configure one-time ingestion from a specific range of offsets. This is useful for backfilling historical data or reprocessing data with different configurations. + +The `boundedStreamConfig` object contains the following properties: + +|Property|Type|Description|Required| +|--------|----|-----------|--------| +|`startSequenceNumbers`|Object|Map of partition IDs to start offsets (inclusive for both Kafka and Kinesis).|Yes| +|`endSequenceNumbers`|Object|Map of partition IDs to end offsets (exclusive for Kafka, inclusive for Kinesis).|Yes| + +When configured, the supervisor: +1. Creates tasks that start reading from `startSequenceNumbers` +2. Tasks automatically stop when they reach `endSequenceNumbers` +3. Supervisor does not create replacement tasks after tasks complete +4. 
Supervisor transitions to `COMPLETED` state and terminates when all tasks finish + +**Metadata consistency:** The bounded configuration is stored in datasource metadata along with checkpointed offsets. If you restart the supervisor or create a new supervisor with a different `boundedStreamConfig` for the same datasource, the supervisor will fail with an error. To start a new bounded ingestion with different offsets, either: +- Use the [supervisor reset API](../api-reference/supervisor-api.md#reset-a-supervisor) to clear existing metadata +- Use a different supervisor ID + +**Example (Kafka):** + +```json +{ + "type": "kafka", + "spec": { + "ioConfig": { + "topic": "my-topic", + "inputFormat": { + "type": "json" + }, + "boundedStreamConfig": { + "startSequenceNumbers": { + "0": 1000, + "1": 2000, + "2": 1500 + }, + "endSequenceNumbers": { + "0": 5000, + "1": 6000, + "2": 5500 + } + } + } + } +} +``` + +This configuration ingests data from partition 0 offsets 1000-4999, partition 1 offsets 2000-5999, and partition 2 offsets 1500-5499. + ### Tuning configuration The `tuningConfig` object is optional. If you don't specify the `tuningConfig` object, Druid uses the default configuration settings. diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java new file mode 100644 index 000000000000..7e22d85d9cab --- /dev/null +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.indexing; + +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.indexing.kafka.simulate.KafkaResource; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.StreamIngestResource; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Tests for bounded Kafka supervisors (one-time ingestion with explicit start/end offsets). 
+ */ +public class KafkaBoundedSupervisorTest extends StreamIndexTestBase +{ + private final KafkaResource kafkaServer = new KafkaResource(); + + @Override + protected StreamIngestResource getStreamIngestResource() + { + return kafkaServer; + } + + @Override + protected EmbeddedDruidCluster createCluster() + { + overlord.addProperty( + "druid.monitoring.monitors", + "[\"org.apache.druid.server.metrics.SupervisorStatsMonitor\"]" + ); + overlord.addProperty("druid.monitoring.emissionPeriod", "PT1s"); + return super.createCluster(); + } + + @Test + public void test_boundedSupervisor_ingestsDataAndCompletes() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 2); + + // Publish records before creating supervisor + final int totalRecords = publish1kRecords(topic, false); + + // Get the current end offsets for all partitions + Map<String, Long> endOffsets = kafkaServer.getPartitionOffsets(topic); + Assertions.assertEquals(2, endOffsets.size(), "Should have 2 partitions"); + + // Create bounded config with start offset 0 and current end offsets + Map<String, Long> startOffsets = new HashMap<>(); + startOffsets.put("0", 0L); + startOffsets.put("1", 0L); + + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, endOffsets); + + // Create bounded supervisor + final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig + ); + + cluster.callApi().postSupervisor(supervisor); + + // Wait for records to be ingested + waitUntilPublishedRecordsAreIngested(totalRecords); + + // Wait for supervisor to transition to COMPLETED state + waitForSupervisorToComplete(supervisor.getId()); + + // Verify row count + verifyRowCount(totalRecords); + + // Verify supervisor is in COMPLETED state + final SupervisorStatus status = cluster.callApi().getSupervisorStatus(supervisor.getId()); + Assertions.assertEquals("COMPLETED", status.getState()); + Assertions.assertTrue(status.isHealthy()); + } + + @Test + public 
void test_boundedSupervisor_withEmptyRange_completesImmediately() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 1); + + // Publish some records + publish1kRecords(topic, false); + + // Get current offset + Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic); + Long currentOffset = currentOffsets.get("0"); + + // Create bounded config with start == end (empty range) + Map<String, Long> offsets = new HashMap<>(); + offsets.put("0", currentOffset); + + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(offsets, offsets); + + // Create bounded supervisor + final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig + ); + + cluster.callApi().postSupervisor(supervisor); + + // Wait for supervisor to transition to COMPLETED state + waitForSupervisorToComplete(supervisor.getId()); + + // Verify supervisor is in COMPLETED state + final SupervisorStatus status = cluster.callApi().getSupervisorStatus(supervisor.getId()); + Assertions.assertEquals("COMPLETED", status.getState()); + } + + @Test + public void test_boundedSupervisor_withReversedRange_isUnhealthy() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 1); + + // start > end: invalid range, KafkaIndexTaskIOConfig rejects it when a task is created. 
BoundedStreamConfig boundedConfig = new BoundedStreamConfig(Map.of("0", 500L), Map.of("0", 100L)); + final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor(kafkaServer, topic, boundedConfig); + + cluster.callApi().postSupervisor(supervisor); + waitForSupervisorToBeUnhealthy(supervisor.getId()); + + final SupervisorStatus status = cluster.callApi().getSupervisorStatus(supervisor.getId()); + Assertions.assertFalse(status.isHealthy()); + Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status.getState()); + } + + private KafkaSupervisorSpec createBoundedKafkaSupervisor( + KafkaResource kafkaServer, + String topic, + BoundedStreamConfig boundedConfig + ) + { + return createKafkaSupervisor(kafkaServer) + .withIoConfig(io -> io + .withKafkaInputFormat(new JsonInputFormat(null, null, null, null, null)) + .withBoundedStreamConfig(boundedConfig) + ) + .build(dataSource, topic); + } + + @Test + public void test_boundedSupervisor_withMismatchedMetadata_isUnhealthy() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 2); + publish1kRecords(topic, false); + + // Get the current end offsets for all partitions + Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic); + Assertions.assertEquals(2, currentOffsets.size(), "Should have 2 partitions"); + + // Create first bounded config: ingest only the first 100 records from each partition + Map<String, Long> startOffsets1 = new HashMap<>(); + startOffsets1.put("0", 0L); + startOffsets1.put("1", 0L); + + Map<String, Long> endOffsets1 = new HashMap<>(); + endOffsets1.put("0", 100L); + endOffsets1.put("1", 100L); + + BoundedStreamConfig boundedConfig1 = new BoundedStreamConfig(startOffsets1, endOffsets1); + + // Create first bounded supervisor and run it to completion + final KafkaSupervisorSpec supervisor1 = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig1 + ); + + cluster.callApi().postSupervisor(supervisor1); + + // Wait for records to be ingested (approximately 200 records 
total from both partitions) + waitUntilPublishedRecordsAreIngested(200); + + // Wait for supervisor to transition to COMPLETED state + waitForSupervisorToComplete(supervisor1.getId()); + + // Verify supervisor is in COMPLETED state + final SupervisorStatus status1 = cluster.callApi().getSupervisorStatus(supervisor1.getId()); + Assertions.assertEquals("COMPLETED", status1.getState()); + + // Now try to create a second bounded supervisor with different bounded config on the same datasource + Map<String, Long> startOffsets2 = new HashMap<>(); + startOffsets2.put("0", 50L); // Different start offset + startOffsets2.put("1", 50L); + + Map<String, Long> endOffsets2 = new HashMap<>(); + endOffsets2.put("0", 200L); // Different end offset + endOffsets2.put("1", 200L); + + BoundedStreamConfig boundedConfig2 = new BoundedStreamConfig(startOffsets2, endOffsets2); + + final KafkaSupervisorSpec supervisor2 = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig2 + ); + + // Post the second supervisor (it should use the same supervisor ID/datasource) + cluster.callApi().postSupervisor(supervisor2); + + // Wait for the supervisor to process and detect the metadata mismatch + waitForSupervisorToBeUnhealthy(supervisor2.getId()); + + // Verify the supervisor is unhealthy + final SupervisorStatus status2 = cluster.callApi().getSupervisorStatus(supervisor2.getId()); + Assertions.assertFalse(status2.isHealthy(), "Supervisor should be unhealthy after detecting metadata mismatch"); + Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), "Supervisor state should be UNHEALTHY_SUPERVISOR"); + } + + /** + * A new bounded run whose endOffset is less than the offset committed by a prior + * run must not silently reach COMPLETED. 
+ */ + @Test + public void test_boundedSupervisor_doesNotSilentlyCompleteWhenStaleOffsetExceedsNewEnd() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 2); + publish1kRecords(topic, false); + + // Run 1: ingest up to offsets 100 and 150 on the two partitions and complete. + Map<String, Long> startOffsets1 = new HashMap<>(); + startOffsets1.put("0", 0L); + startOffsets1.put("1", 0L); + + Map<String, Long> endOffsets1 = new HashMap<>(); + endOffsets1.put("0", 100L); + endOffsets1.put("1", 150L); + + BoundedStreamConfig boundedConfig1 = new BoundedStreamConfig(startOffsets1, endOffsets1); + final KafkaSupervisorSpec supervisor1 = createBoundedKafkaSupervisor(kafkaServer, topic, boundedConfig1); + + cluster.callApi().postSupervisor(supervisor1); + waitUntilPublishedRecordsAreIngested(250); + waitForSupervisorToComplete(supervisor1.getId()); + + final SupervisorStatus status1 = cluster.callApi().getSupervisorStatus(supervisor1.getId()); + Assertions.assertEquals("COMPLETED", status1.getState()); + + // Run 2: same datasource, endOffset (50) < stale committed offset (100). + // Without the fix the supervisor reaches COMPLETED immediately without running tasks. + // With the fix it detects the config mismatch and becomes UNHEALTHY_SUPERVISOR. 
Map<String, Long> startOffsets2 = new HashMap<>(); + startOffsets2.put("0", 0L); + startOffsets2.put("1", 0L); + + Map<String, Long> endOffsets2 = new HashMap<>(); + endOffsets2.put("0", 50L); + endOffsets2.put("1", 50L); + + BoundedStreamConfig boundedConfig2 = new BoundedStreamConfig(startOffsets2, endOffsets2); + final KafkaSupervisorSpec supervisor2 = createBoundedKafkaSupervisor(kafkaServer, topic, boundedConfig2); + + cluster.callApi().postSupervisor(supervisor2); + waitForSupervisorToBeUnhealthy(supervisor2.getId()); + + final SupervisorStatus status2 = cluster.callApi().getSupervisorStatus(supervisor2.getId()); + Assertions.assertFalse(status2.isHealthy(), "Supervisor should be unhealthy after detecting metadata mismatch"); + Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), "Supervisor state should be UNHEALTHY_SUPERVISOR"); + } + + private void waitForSupervisorToComplete(String supervisorId) + { + overlord.latchableEmitter().waitForEvent( + event -> event.hasMetricName("supervisor/count") + .hasDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) + .hasDimension("state", "COMPLETED") + ); + } + + private void waitForSupervisorToBeUnhealthy(String supervisorId) + { + overlord.latchableEmitter().waitForEvent( + event -> event.hasMetricName("supervisor/count") + .hasDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) + .hasDimension("state", "UNHEALTHY_SUPERVISOR") + ); + } +} diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java index 821d776947dc..ee826936b2b4 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java @@ -120,6 +120,7 @@ protected KinesisSupervisorSpec createKinesisSupervisor(KinesisResource kinesis, Period.seconds(60), null, null, null, null, 
null, null, null, null, false, + null, null ), Map.of(), diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java index 2c5cfb67d9c3..9a37c73573a6 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java @@ -87,6 +87,7 @@ private KinesisSupervisorSpec createKinesisSupervisorSpec(String dataSource, Str Period.seconds(5), null, null, null, null, null, null, null, null, false, + null, null ), Map.of(), diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamDataSourceMetadata.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamDataSourceMetadata.java index 5cafa1d7925d..ba8c79747c12 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamDataSourceMetadata.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamDataSourceMetadata.java @@ -25,15 +25,26 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; + +import javax.annotation.Nullable; public class RabbitStreamDataSourceMetadata extends SeekableStreamDataSourceMetadata { @JsonCreator public RabbitStreamDataSourceMetadata( - @JsonProperty("partitions") SeekableStreamSequenceNumbers partitions) + @JsonProperty("partitions") SeekableStreamSequenceNumbers 
partitions, + @JsonProperty("boundedStreamConfig") @Nullable BoundedStreamConfig boundedStreamConfig) + { + super(partitions, boundedStreamConfig); + } + + // Backward compatibility constructor + public RabbitStreamDataSourceMetadata( + SeekableStreamSequenceNumbers partitions) { - super(partitions); + this(partitions, null); } @Override @@ -52,6 +63,6 @@ public DataSourceMetadata asStartMetadata() protected SeekableStreamDataSourceMetadata createConcreteDataSourceMetaData( SeekableStreamSequenceNumbers seekableStreamSequenceNumbers) { - return new RabbitStreamDataSourceMetadata(seekableStreamSequenceNumbers); + return new RabbitStreamDataSourceMetadata(seekableStreamSequenceNumbers, getBoundedStreamConfig()); } } diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java index 721e66f6f3af..580578007a34 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.joda.time.DateTime; import javax.annotation.Nullable; @@ -54,7 +55,8 @@ public RabbitStreamIndexTaskIOConfig( @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("inputFormat") @Nullable InputFormat inputFormat, @JsonProperty("uri") String uri, - 
@JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes + @JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes, + @JsonProperty("boundedStreamConfig") @Nullable BoundedStreamConfig boundedStreamConfig ) { super( @@ -66,7 +68,8 @@ public RabbitStreamIndexTaskIOConfig( minimumMessageTime, maximumMessageTime, inputFormat, - refreshRejectionPeriodsInMinutes + refreshRejectionPeriodsInMinutes, + boundedStreamConfig ); this.pollTimeout = pollTimeout != null ? pollTimeout : RabbitStreamSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS; diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskRunner.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskRunner.java index e34f0d9e6d15..101afd4570dd 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskRunner.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskRunner.java @@ -33,6 +33,7 @@ import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamPartition; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.java.util.emitter.EmittingLogger; import javax.annotation.Nonnull; @@ -93,7 +94,9 @@ protected SeekableStreamEndSequenceNumbers deserializePartitionsFr protected SeekableStreamDataSourceMetadata createDataSourceMetadata( SeekableStreamSequenceNumbers partitions) { - return new RabbitStreamDataSourceMetadata(partitions); + // Include bounded config if this is a bounded task + BoundedStreamConfig boundedConfig = 
task.getIOConfig().getBoundedStreamConfig(); + return new RabbitStreamDataSourceMetadata(partitions, boundedConfig); } @Override diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java index 9be4a77a8d4e..6099105b3374 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java @@ -203,7 +203,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( maximumMessageTime, ioConfig.getInputFormat(), rabbitConfig.getUri(), - ioConfig.getTaskDuration().getStandardMinutes() + ioConfig.getTaskDuration().getStandardMinutes(), + rabbitConfig.getBoundedStreamConfig() // Pass through bounded config ); } @@ -362,6 +363,39 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() return false; } + @Override + protected boolean isEndOffsetExclusive() + { + return true; + } + + @Override + protected boolean isOffsetAtOrBeyond(Long current, Long target) + { + // RabbitMQ uses Long sequence numbers (delivery tags) + return current >= target; + } + + @Override + protected String createPartitionIdFromString(String partitionIdString) + { + // RabbitMQ uses String as partition ID, so just return the string as-is + return partitionIdString; + } + + @Override + protected Long createSequenceOffsetFromObject(Object offsetObj) + { + // RabbitMQ uses Long as sequence offset + if (offsetObj instanceof Number) { + return ((Number) offsetObj).longValue(); + } + if (offsetObj instanceof String) { + return Long.parseLong((String) offsetObj); + } + throw new IllegalArgumentException("Cannot convert 
" + offsetObj.getClass() + " to Long offset"); + } + @Override public LagStats computeLagStats() { diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java index 8aad5b762219..20c4e92988b6 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.data.input.InputFormat; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; @@ -66,7 +67,8 @@ public RabbitStreamSupervisorIOConfig( @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod, @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime, @JsonProperty("stopTaskCount") Integer stopTaskCount, - @Nullable @JsonProperty("serverPriorityToReplicas") Map serverPriorityToReplicas + @Nullable @JsonProperty("serverPriorityToReplicas") Map serverPriorityToReplicas, + @Nullable @JsonProperty("boundedStreamConfig") BoundedStreamConfig boundedStreamConfig ) { super( @@ -86,7 +88,8 @@ public RabbitStreamSupervisorIOConfig( lateMessageRejectionStartDateTime, new IdleConfig(null, null), stopTaskCount, - serverPriorityToReplicas + serverPriorityToReplicas, + 
boundedStreamConfig ); this.consumerProperties = consumerProperties; diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfigTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfigTest.java index 7783e1487776..b7e72e0c3178 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfigTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfigTest.java @@ -21,12 +21,15 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.indexing.IOConfig; import org.junit.Assert; import org.junit.Test; import java.util.Collections; +import java.util.Map; public class RabbitStreamIndexTaskIOConfigTest { @@ -77,4 +80,29 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(Collections.emptySet(), config.getStartSequenceNumbers().getExclusivePartitions()); } + @Test + public void testRabbitStreamDataSourceMetadataWithBoundedConfig() + { + Map startOffsets = ImmutableMap.of("q0", 0L); + Map endOffsets = ImmutableMap.of("q0", 100L); + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, endOffsets); + + SeekableStreamStartSequenceNumbers partitions = + new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of("q0", 10L), Collections.emptySet()); + + RabbitStreamDataSourceMetadata metadata = new RabbitStreamDataSourceMetadata(partitions, boundedConfig); + 
Assert.assertNotNull(metadata.getBoundedStreamConfig()); + Assert.assertEquals(boundedConfig, metadata.getBoundedStreamConfig()); + } + + @Test + public void testRabbitStreamDataSourceMetadataWithoutBoundedConfig() + { + SeekableStreamStartSequenceNumbers partitions = + new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of("q0", 10L), Collections.emptySet()); + + RabbitStreamDataSourceMetadata metadata = new RabbitStreamDataSourceMetadata(partitions); + Assert.assertNull(metadata.getBoundedStreamConfig()); + } + } diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfigTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfigTest.java index 347152a4bb28..0dca7f084f0a 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfigTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfigTest.java @@ -136,4 +136,80 @@ public void testURIRequired() throws Exception mapper.readValue(jsonStr, RabbitStreamSupervisorIOConfig.class); } + @Test + public void testBoundedModeSerdeWithIntegerOffsets() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"rabbit\",\n" + + " \"stream\": \"my-stream\",\n" + + " \"uri\": \"rabbitmq-stream://localhost:5552\",\n" + + " \"boundedStreamConfig\": {\n" + + " \"startSequenceNumbers\": {\"queue-0\": 100, \"queue-1\": 200},\n" + + " \"endSequenceNumbers\": {\"queue-0\": 500, \"queue-1\": 600}\n" + + " }\n" + + "}"; + + RabbitStreamSupervisorIOConfig config = mapper.readValue(jsonStr, RabbitStreamSupervisorIOConfig.class); + + Assert.assertTrue(config.isBounded()); + Assert.assertNotNull(config.getBoundedStreamConfig()); + 
Assert.assertEquals(2, config.getBoundedStreamConfig().getStartSequenceNumbers().size()); + Assert.assertEquals(2, config.getBoundedStreamConfig().getEndSequenceNumbers().size()); + } + + @Test + public void testBoundedModeSerdeWithStringOffsets() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"rabbit\",\n" + + " \"stream\": \"my-stream\",\n" + + " \"uri\": \"rabbitmq-stream://localhost:5552\",\n" + + " \"boundedStreamConfig\": {\n" + + " \"startSequenceNumbers\": {\"queue-0\": \"100\", \"queue-1\": \"200\"},\n" + + " \"endSequenceNumbers\": {\"queue-0\": \"500\", \"queue-1\": \"600\"}\n" + + " }\n" + + "}"; + + RabbitStreamSupervisorIOConfig config = mapper.readValue(jsonStr, RabbitStreamSupervisorIOConfig.class); + + Assert.assertTrue(config.isBounded()); + Assert.assertNotNull(config.getBoundedStreamConfig()); + Assert.assertEquals(2, config.getBoundedStreamConfig().getStartSequenceNumbers().size()); + Assert.assertEquals(2, config.getBoundedStreamConfig().getEndSequenceNumbers().size()); + } + + @Test + public void testBoundedModeSerdeWithMixedOffsets() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"rabbit\",\n" + + " \"stream\": \"my-stream\",\n" + + " \"uri\": \"rabbitmq-stream://localhost:5552\",\n" + + " \"boundedStreamConfig\": {\n" + + " \"startSequenceNumbers\": {\"queue-0\": 100, \"queue-1\": \"200\"},\n" + + " \"endSequenceNumbers\": {\"queue-0\": 500, \"queue-1\": \"600\"}\n" + + " }\n" + + "}"; + + RabbitStreamSupervisorIOConfig config = mapper.readValue(jsonStr, RabbitStreamSupervisorIOConfig.class); + + Assert.assertTrue(config.isBounded()); + Assert.assertNotNull(config.getBoundedStreamConfig()); + } + + @Test + public void testUnboundedModeByDefault() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"rabbit\",\n" + + " \"stream\": \"my-stream\",\n" + + " \"uri\": \"rabbitmq-stream://localhost:5552\"\n" + + "}"; + + RabbitStreamSupervisorIOConfig config = mapper.readValue(jsonStr, 
RabbitStreamSupervisorIOConfig.class); + + Assert.assertFalse(config.isBounded()); + Assert.assertNull(config.getBoundedStreamConfig()); + } + } diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java index 82e0b164471a..82ee7afd0e01 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java @@ -42,6 +42,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; @@ -66,6 +67,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; public class RabbitStreamSupervisorTest extends EasyMockSupport { @@ -212,6 +214,7 @@ private RabbitStreamSupervisor getSupervisor( earlyMessageRejectionPeriod, // early message rejection null, // latemessagerejectionstartdatetime 1, + null, null ); RabbitStreamIndexTaskClientFactory clientFactory = new RabbitStreamIndexTaskClientFactory(null, @@ -278,6 +281,7 @@ public void testRecordSupplier() null, // early message rejection null, // latemessagerejectionstartdatetime 1, + null, null ); 
RabbitStreamIndexTaskClientFactory clientFactory = new RabbitStreamIndexTaskClientFactory(null, @@ -421,6 +425,7 @@ public void testCreateTaskIOConfig() null, // early message rejection null, // latemessagerejectionstartdatetime 1, + null, null ) ); @@ -461,4 +466,98 @@ public void test_doesTaskMatchSupervisor() Assert.assertFalse(supervisor.doesTaskMatchSupervisor(differentTaskType)); } + + @Test + public void testBoundedModeCreateTasksWithCorrectOffsets() + { + Map startOffsets = ImmutableMap.of( + "queue-0", 100, + "queue-1", 200 + ); + Map endOffsets = ImmutableMap.of( + "queue-0", 500, + "queue-1", 600 + ); + + final RabbitStreamSupervisorIOConfig rabbitSupervisorIOConfig = new RabbitStreamSupervisorIOConfig( + STREAM, + URI, + INPUT_FORMAT, + 1, + 1, + new Period("PT30S"), + null, + null, + null, + new Period("PT30M"), + null, + null, + null, + null, + null, + null, + 1000, + null, + new BoundedStreamConfig(startOffsets, endOffsets) + ); + + Assert.assertTrue(rabbitSupervisorIOConfig.isBounded()); + + final RabbitStreamIndexTaskClientFactory taskClientFactory = new RabbitStreamIndexTaskClientFactory(null, OBJECT_MAPPER); + final RabbitStreamSupervisorSpec spec = new RabbitStreamSupervisorSpec( + null, + null, + dataSchema, + tuningConfig, + rabbitSupervisorIOConfig, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + OBJECT_MAPPER, + new NoopServiceEmitter(), + new DruidMonitorSchedulerConfig(), + rowIngestionMetersFactory, + new SupervisorStateManagerConfig() + ); + + supervisor = new RabbitStreamSupervisor( + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + OBJECT_MAPPER, + spec, + rowIngestionMetersFactory + ); + + // Test type conversion methods + String queueName = supervisor.createPartitionIdFromString("queue-0"); + Assert.assertEquals("queue-0", queueName); + + Long offset = supervisor.createSequenceOffsetFromObject(100); + 
Assert.assertEquals(Long.valueOf(100L), offset); + + offset = supervisor.createSequenceOffsetFromObject("200"); + Assert.assertEquals(Long.valueOf(200L), offset); + + // Test isOffsetAtOrBeyond + Assert.assertTrue(supervisor.isOffsetAtOrBeyond(500L, 100L)); + Assert.assertTrue(supervisor.isOffsetAtOrBeyond(100L, 100L)); + Assert.assertFalse(supervisor.isOffsetAtOrBeyond(50L, 100L)); + } + + @Test + public void testCreateSequenceOffsetFromObject_invalidType() + { + supervisor = getDefaultSupervisor(); + + Exception e = Assert.assertThrows( + IllegalArgumentException.class, + () -> supervisor.createSequenceOffsetFromObject(new Object()) + ); + Assert.assertTrue(e.getMessage().contains("Cannot convert")); + } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java index cfa308099489..04a6c2ba9285 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java @@ -27,11 +27,13 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.utils.CollectionUtils; import org.apache.kafka.common.TopicPartition; +import javax.annotation.Nullable; import java.util.Comparator; import java.util.Map; @@ -41,7 +43,8 @@ public class KafkaDataSourceMetadata extends SeekableStreamDataSourceMetadata kafkaPartitions + 
@JsonProperty("partitions") SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> kafkaPartitions,
+      @JsonProperty("boundedStreamConfig") @Nullable BoundedStreamConfig boundedStreamConfig
   )
   {
     super(kafkaPartitions == null
@@ -60,7 +63,16 @@ public KafkaDataSourceMetadata(
               ((SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getTopic(),
               kafkaPartitions.getPartitionSequenceNumberMap(),
               ((SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>) kafkaPartitions).getPartitionOffsetMap()
-          ));
+          ),
+          boundedStreamConfig);
+  }
+
+  // Backward compatibility constructor
+  public KafkaDataSourceMetadata(
+      SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> kafkaPartitions
+  )
+  {
+    this(kafkaPartitions, null);
   }
 
   @Override
@@ -81,7 +93,7 @@ protected SeekableStreamDataSourceMetadata<KafkaTopicPartition, Long> createConc
       SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> seekableStreamSequenceNumbers
   )
   {
-    return new KafkaDataSourceMetadata(seekableStreamSequenceNumbers);
+    return new KafkaDataSourceMetadata(seekableStreamSequenceNumbers, getBoundedStreamConfig());
   }
 
   @Override
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
index 07c0f80fbe83..1d36d2996cce 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
@@ -29,6 +29,7 @@
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -64,7 +65,8 @@ public KafkaIndexTaskIOConfig(
       @JsonProperty("inputFormat") @Nullable InputFormat
inputFormat, @JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides, @JsonProperty("multiTopic") @Nullable Boolean multiTopic, - @JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes + @JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes, + @JsonProperty("boundedStreamConfig") @Nullable BoundedStreamConfig boundedStreamConfig ) { super( @@ -78,7 +80,8 @@ public KafkaIndexTaskIOConfig( minimumMessageTime, maximumMessageTime, inputFormat, - refreshRejectionPeriodsInMinutes + refreshRejectionPeriodsInMinutes, + boundedStreamConfig ); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); @@ -128,7 +131,8 @@ public KafkaIndexTaskIOConfig( inputFormat, configOverrides, KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC, - refreshRejectionPeriodsInMinutes + refreshRejectionPeriodsInMinutes, + null ); } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskRunner.java index 3da19165771b..ac4ab0e7a1c5 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskRunner.java @@ -34,6 +34,7 @@ import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamPartition; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.utils.CollectionUtils; @@ -174,7 +175,9 @@ protected SeekableStreamDataSourceMetadata createData 
SeekableStreamSequenceNumbers partitions ) { - return new KafkaDataSourceMetadata(partitions); + // Include bounded config if this is a bounded task + BoundedStreamConfig boundedConfig = task.getIOConfig().getBoundedStreamConfig(); + return new KafkaDataSourceMetadata(partitions, boundedConfig); } @Override diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index e2f62ed8d750..727eb52db272 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -227,7 +227,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( ioConfig.getInputFormat(), kafkaIoConfig.getConfigOverrides(), kafkaIoConfig.isMultiTopic(), - ioConfig.getTaskDuration().getStandardMinutes() + ioConfig.getTaskDuration().getStandardMinutes(), + kafkaIoConfig.getBoundedStreamConfig() // Pass through bounded config ); } @@ -390,12 +391,43 @@ protected boolean isShardExpirationMarker(Long seqNum) return false; } + @Override + protected boolean isOffsetAtOrBeyond(Long current, Long target) + { + return current >= target; + } + + @Override + protected KafkaTopicPartition createPartitionIdFromString(String partitionIdString) + { + return KafkaTopicPartition.fromString(partitionIdString); + } + + @Override + protected Long createSequenceOffsetFromObject(Object offsetObj) + { + // Jackson may deserialize numbers as Integer if they fit, but Kafka needs Long + if (offsetObj instanceof Number) { + return ((Number) offsetObj).longValue(); + } + if (offsetObj instanceof String) { + return Long.parseLong((String) offsetObj); + } + throw new IllegalArgumentException("Cannot convert " + offsetObj.getClass() + " to Long 
offset"); + } + @Override protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() { return false; } + @Override + protected boolean isEndOffsetExclusive() + { + return true; + } + @Override public LagStats computeLagStats() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index 992ff292694a..7e9fffb7e79c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -26,6 +26,7 @@ import org.apache.druid.data.input.InputFormat; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; @@ -78,7 +79,8 @@ public KafkaSupervisorIOConfig( @JsonProperty("idleConfig") IdleConfig idleConfig, @JsonProperty("stopTaskCount") Integer stopTaskCount, @Nullable @JsonProperty("emitTimeLagMetrics") Boolean emitTimeLagMetrics, - @Nullable @JsonProperty("serverPriorityToReplicas") Map serverPriorityToReplicas + @Nullable @JsonProperty("serverPriorityToReplicas") Map serverPriorityToReplicas, + @Nullable @JsonProperty("boundedStreamConfig") BoundedStreamConfig boundedStreamConfig ) { super( @@ -98,7 +100,8 @@ public KafkaSupervisorIOConfig( lateMessageRejectionStartDateTime, idleConfig, stopTaskCount, - serverPriorityToReplicas + serverPriorityToReplicas, + 
boundedStreamConfig ); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java index 0b013b69a715..66fe74c9545f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java @@ -31,6 +31,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.initialization.CoreInjectorBuilder; import org.apache.druid.initialization.DruidModule; import org.apache.druid.utils.CollectionUtils; @@ -916,6 +917,76 @@ private static KafkaDataSourceMetadata endMetadataMultiTopic( return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topicPattern, newOffsets)); } + @Test + public void testEquals_differentSequenceNumbers() + { + // Test equals with different sequence numbers (line 129 false branch) + KafkaDataSourceMetadata metadata1 = startMetadata("foo", ImmutableMap.of(0, 100L)); + KafkaDataSourceMetadata metadata2 = startMetadata("foo", ImmutableMap.of(0, 200L)); + + Assert.assertNotEquals(metadata1, metadata2); + } + + @Test + public void testEquals_sameSequenceNumbers_differentBoundedConfig() + { + // Test equals with same sequence numbers but different boundedStreamConfig (line 130 false branch) + Map boundedStart1 = ImmutableMap.of("0", 0L); + Map boundedEnd1 = ImmutableMap.of("0", 100L); + BoundedStreamConfig 
boundedConfig1 = new BoundedStreamConfig(boundedStart1, boundedEnd1); + + Map boundedStart2 = ImmutableMap.of("0", 0L); + Map boundedEnd2 = ImmutableMap.of("0", 200L); + BoundedStreamConfig boundedConfig2 = new BoundedStreamConfig(boundedStart2, boundedEnd2); + + SeekableStreamStartSequenceNumbers partitions = + new SeekableStreamStartSequenceNumbers<>("foo", ImmutableMap.of(new KafkaTopicPartition(false, "foo", 0), 100L), ImmutableSet.of()); + + KafkaDataSourceMetadata metadata1 = new KafkaDataSourceMetadata(partitions, boundedConfig1); + KafkaDataSourceMetadata metadata2 = new KafkaDataSourceMetadata(partitions, boundedConfig2); + + Assert.assertNotEquals(metadata1, metadata2); + } + + @Test + public void testEquals_sameSequenceNumbers_sameBoundedConfig() + { + Map boundedStart = ImmutableMap.of("0", 0L); + Map boundedEnd = ImmutableMap.of("0", 100L); + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(boundedStart, boundedEnd); + + SeekableStreamStartSequenceNumbers partitions = + new SeekableStreamStartSequenceNumbers<>("foo", ImmutableMap.of(new KafkaTopicPartition(false, "foo", 0), 100L), ImmutableSet.of()); + + KafkaDataSourceMetadata metadata1 = new KafkaDataSourceMetadata(partitions, boundedConfig); + KafkaDataSourceMetadata metadata2 = new KafkaDataSourceMetadata(partitions, boundedConfig); + + Assert.assertEquals(metadata1, metadata2); + } + + @Test + public void testEquals_differentSequenceNumbers_differentBoundedConfig() + { + Map boundedStart1 = ImmutableMap.of("0", 0L); + Map boundedEnd1 = ImmutableMap.of("0", 100L); + BoundedStreamConfig boundedConfig1 = new BoundedStreamConfig(boundedStart1, boundedEnd1); + + Map boundedStart2 = ImmutableMap.of("0", 0L); + Map boundedEnd2 = ImmutableMap.of("0", 200L); + BoundedStreamConfig boundedConfig2 = new BoundedStreamConfig(boundedStart2, boundedEnd2); + + KafkaDataSourceMetadata metadata1 = new KafkaDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>("foo", ImmutableMap.of(new 
KafkaTopicPartition(false, "foo", 0), 100L), ImmutableSet.of()), + boundedConfig1 + ); + KafkaDataSourceMetadata metadata2 = new KafkaDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>("foo", ImmutableMap.of(new KafkaTopicPartition(false, "foo", 0), 200L), ImmutableSet.of()), + boundedConfig2 + ); + + Assert.assertNotEquals(metadata1, metadata2); + } + private static ObjectMapper createObjectMapper() { DruidModule module = new KafkaIndexTaskModule(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java index 2df7c6a7be10..245454d4b2bc 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java @@ -27,9 +27,12 @@ import org.apache.kafka.clients.admin.CreatePartitionsResult; import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.testcontainers.kafka.KafkaContainer; @@ -242,6 +245,41 @@ public Admin newAdminClient() return Admin.create(commonClientProperties()); } + /** + * Returns the current end offsets for all partitions in the specified topic. + * The returned map has partition IDs as String keys and end offsets as Long values. 
+ */ + public Map getPartitionOffsets(String topicName) + { + Map offsets = new HashMap<>(); + + // Add required deserializer config + Map props = new HashMap<>(consumerProperties()); + props.put("key.deserializer", ByteArrayDeserializer.class.getName()); + props.put("value.deserializer", ByteArrayDeserializer.class.getName()); + + try (KafkaConsumer consumer = new KafkaConsumer<>(props)) { + // Get all partitions for the topic + List partitions = new ArrayList<>(); + consumer.partitionsFor(topicName).forEach( + partitionInfo -> partitions.add(new TopicPartition(topicName, partitionInfo.partition())) + ); + + // Get end offsets for all partitions + Map endOffsets = consumer.endOffsets(partitions); + + // Convert to String keys + for (Map.Entry entry : endOffsets.entrySet()) { + offsets.put(String.valueOf(entry.getKey().partition()), entry.getValue()); + } + } + catch (Exception e) { + throw new RuntimeException(e); + } + + return offsets; + } + @Override public String toString() { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java index 24c1656fc7e6..ac8a9e1d2f7b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java @@ -21,6 +21,7 @@ import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.kafkainput.KafkaInputFormat; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.SupervisorIOConfigBuilder; import java.util.Map; @@ -33,6 +34,7 @@ public class KafkaIOConfigBuilder extends SupervisorIOConfigBuilder consumerProperties; + private 
BoundedStreamConfig boundedStreamConfig; public KafkaIOConfigBuilder withTopic(String topic) { @@ -68,6 +70,12 @@ public KafkaIOConfigBuilder withKafkaInputFormat(InputFormat valueFormat) return this; } + public KafkaIOConfigBuilder withBoundedStreamConfig(BoundedStreamConfig boundedStreamConfig) + { + this.boundedStreamConfig = boundedStreamConfig; + return this; + } + @Override public KafkaSupervisorIOConfig build() { @@ -93,7 +101,8 @@ public KafkaSupervisorIOConfig build() idleConfig, stopTaskCount, null, - null + null, + boundedStreamConfig ); } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index 264c37f9913c..a6a01d8640d1 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.kafka.KafkaConsumerConfigs; import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; import org.apache.druid.indexing.kafka.KafkaRecordSupplier; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig; @@ -343,6 +344,7 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException null, null, false, + null, null ); String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig); @@ -379,6 +381,7 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException null, null, false, + null, null ); 
Assert.assertEquals(1, kafkaSupervisorIOConfig.getTaskCount()); @@ -441,6 +444,7 @@ private KafkaSupervisorIOConfig makeIOConfig(Integer taskCount, Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties(); + consumerProperties.put("bootstrap.servers", "localhost:8082"); + + Map startOffsets = new HashMap<>(); + startOffsets.put("0", 100); + startOffsets.put("1", 200); + + Map endOffsets = new HashMap<>(); + endOffsets.put("0", 500); + endOffsets.put("1", 600); + + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, endOffsets); + + KafkaSupervisorIOConfig original = new KafkaSupervisorIOConfig( + "test-topic", + null, + null, + 1, + 1, + new Period("PT1H"), + consumerProperties, + null, + LagAggregator.DEFAULT, + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + null, + null, + null, + false, + null, + boundedConfig + ); + + String json = mapper.writeValueAsString(original); + KafkaSupervisorIOConfig deserialized = mapper.readValue(json, KafkaSupervisorIOConfig.class); + + Assert.assertTrue(deserialized.isBounded()); + Assert.assertNotNull(deserialized.getBoundedStreamConfig()); + Assert.assertEquals(2, deserialized.getBoundedStreamConfig().getStartSequenceNumbers().size()); + Assert.assertEquals(2, deserialized.getBoundedStreamConfig().getEndSequenceNumbers().size()); + } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index c629aa46192a..4b247aaf1f76 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -70,6 
+70,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; @@ -3748,7 +3749,7 @@ public void testGetOffsetFromStorageForPartitionWithResetOffsetAutomatically() t new KafkaDataSourceMetadata( new SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 1, -100L, 2, 200L)) ) - ).times(3); + ).times(4); // All unavailable partitions are collected in a single pass and reset together in one resetInternal() call. // Only partition 1 (-100L) is unavailable (below earliest=0); partition 2 (200L) is valid since // Kafka only checks offset >= earliest. 
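For reference alongside these supervisor tests: the offset coercion this patch adds in `KafkaSupervisor.createSequenceOffsetFromObject` (Jackson may deserialize small JSON numbers as `Integer`, and offsets may also arrive as `String`) can be mirrored as a standalone sketch. `BoundedOffsetUtil` is a hypothetical name for illustration, not a class in this PR:

```java
// Standalone mirror (hypothetical class name) of the coercion logic that
// KafkaSupervisor.createSequenceOffsetFromObject introduces in this patch.
class BoundedOffsetUtil {
    static Long toLongOffset(Object offsetObj) {
        // Jackson may deserialize numbers as Integer if they fit, but Kafka needs Long
        if (offsetObj instanceof Number) {
            return ((Number) offsetObj).longValue();
        }
        if (offsetObj instanceof String) {
            return Long.parseLong((String) offsetObj);
        }
        throw new IllegalArgumentException("Cannot convert " + offsetObj.getClass() + " to Long offset");
    }
}
```

This is why the serde tests above accept integer, string, and mixed offset maps interchangeably.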
@@ -5478,7 +5479,8 @@ public void testComputeUnassignedServerPriorities_whenMultipleReplicasPerPriorit Map.of( 10, 2, 20, 3 - ) + ), + null ); Assert.assertEquals(5, (int) kafkaSupervisorIOConfig.getReplicas()); @@ -5553,6 +5555,112 @@ public void testComputeUnassignedServerPriorities_whenMultipleReplicasPerPriorit Assert.assertEquals(List.of(20, 20), supervisor.computeUnassignedServerPriorities(taskGroup3, 2)); } + @Test + public void testBoundedModeCreateTasksWithCorrectOffsets() throws JsonProcessingException + { + Map startOffsets = ImmutableMap.of( + topic + ":0", 100, + topic + ":1", 200, + topic + ":2", 300 + ); + Map endOffsets = ImmutableMap.of( + topic + ":0", 500, + topic + ":1", 600, + topic + ":2", 700 + ); + + final Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties(); + consumerProperties.put("bootstrap.servers", kafkaHost); + + final KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( + topic, + null, + INPUT_FORMAT, + 1, + 1, + new Period("PT1H"), + consumerProperties, + null, + null, + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + null, + null, + null, + true, + null, + new BoundedStreamConfig(startOffsets, endOffsets) + ); + + Assert.assertTrue(kafkaSupervisorIOConfig.isBounded()); + + final KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(null, null); + final KafkaSupervisorSpec spec = new KafkaSupervisorSpec( + null, + null, + dataSchema, + KafkaSupervisorTuningConfig.defaultConfig(), + kafkaSupervisorIOConfig, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + OBJECT_MAPPER, + new NoopServiceEmitter(), + new DruidMonitorSchedulerConfig(), + rowIngestionMetersFactory, + new SupervisorStateManagerConfig() + ); + + supervisor = new TestableKafkaSupervisor( + taskStorage, + taskMaster, + 
indexerMetadataStorageCoordinator, + taskClientFactory, + OBJECT_MAPPER, + spec, + rowIngestionMetersFactory + ); + + // Test type conversion methods + KafkaTopicPartition partition0 = supervisor.createPartitionIdFromString(topic + ":0"); + Assert.assertEquals(topic, partition0.topic().get()); + Assert.assertEquals(0, partition0.partition()); + + Long offset = supervisor.createSequenceOffsetFromObject(100); + Assert.assertEquals(Long.valueOf(100L), offset); + + offset = supervisor.createSequenceOffsetFromObject("200"); + Assert.assertEquals(Long.valueOf(200L), offset); + + // Test offset comparison + Assert.assertTrue(supervisor.isOffsetAtOrBeyond(500L, 100L)); + Assert.assertTrue(supervisor.isOffsetAtOrBeyond(100L, 100L)); + Assert.assertFalse(supervisor.isOffsetAtOrBeyond(50L, 100L)); + } + + @Test + public void testCreateSequenceOffsetFromObject_invalidType() + { + Map startOffsets = ImmutableMap.of("0", 0, "1", 0); + Map endOffsets = ImmutableMap.of("0", 100, "1", 100); + supervisor = getTestableSupervisorWithBoundedConfig(1, 1, "PT1H", new BoundedStreamConfig(startOffsets, endOffsets)); + + Exception e = Assert.assertThrows( + IllegalArgumentException.class, + () -> supervisor.createSequenceOffsetFromObject(new Object()) + ); + Assert.assertTrue(e.getMessage().contains("Cannot convert")); + } + private void addSomeEvents(int numEventsPerPartition) throws Exception { // create topic manually @@ -5763,7 +5871,8 @@ private TestableKafkaSupervisor getTestableSupervisor( idleConfig, null, true, - serverPriorityToReplicas + serverPriorityToReplicas, + null ); KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory( @@ -5858,6 +5967,7 @@ private TestableKafkaSupervisor getTestableSupervisorCustomIsTaskCurrent( null, null, false, + null, null ); @@ -5953,6 +6063,7 @@ private KafkaSupervisor createSupervisor( null, null, false, + null, null ); @@ -6313,4 +6424,304 @@ public boolean isTaskCurrent(int taskGroupId, String taskId, Map t return 
isTaskCurrentReturn;
     }
   }

+  @Test
+  public void testBoundedStreamConfig_tasksIncludeBoundedConfig() throws Exception
+  {
+    Map<String, Object> startOffsets = ImmutableMap.of("0", 10L, "1", 20L, "2", 30L);
+    Map<String, Object> endOffsets = ImmutableMap.of("0", 100L, "1", 100L, "2", 100L);
+    BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, endOffsets);
+
+    supervisor = getTestableSupervisorWithBoundedConfig(1, 1, "PT1H", boundedConfig);
+    addSomeEvents(100);
+
+    Capture<KafkaIndexTask> captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(Map.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(null)
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    // Task should be created with bounded config
+    KafkaIndexTask task = captured.getValue();
+    Assert.assertNotNull(task);
+    Assert.assertEquals(boundedConfig, task.getIOConfig().getBoundedStreamConfig());
+
+    // Start offsets should come from bounded config
+    KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
+    Assert.assertEquals(10L, (long) taskConfig.getStartSequenceNumbers()
+                                              .getPartitionSequenceNumberMap()
+                                              .get(new KafkaTopicPartition(false, topic, 0)));
+    Assert.assertEquals(20L, (long) taskConfig.getStartSequenceNumbers()
+                                              .getPartitionSequenceNumberMap()
+                                              .get(new KafkaTopicPartition(false, topic, 1)));
+    Assert.assertEquals(30L, (long) taskConfig.getStartSequenceNumbers()
+                                              .getPartitionSequenceNumberMap()
+                                              .get(new KafkaTopicPartition(false, topic, 2)));
+
+    // End offsets should match bounded config
+    Assert.assertEquals(100L, (long) taskConfig.getEndSequenceNumbers()
+                                               .getPartitionSequenceNumberMap()
+                                               .get(new KafkaTopicPartition(false, topic, 0)));
+  }
+
+  @Test
+  public void testBoundedStreamConfig_withCheckpoint_resumesFromCheckpoint() throws Exception
+  {
+    Map<String, Object> startOffsets = ImmutableMap.of("0", 0L, "1", 0L, "2", 0L);
+    Map<String, Object> endOffsets = ImmutableMap.of("0", 100L, "1", 100L, "2", 100L);
+    BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, endOffsets);
+
+    supervisor = getTestableSupervisorWithBoundedConfig(1, 1, "PT1H", boundedConfig);
+    addSomeEvents(100);
+
+    // Simulate existing checkpoint from previous run with same bounded config
+    SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long> checkpointSequences =
+        new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            ImmutableMap.of(
+                new KafkaTopicPartition(false, topic, 0), 50L,
+                new KafkaTopicPartition(false, topic, 1), 60L,
+                new KafkaTopicPartition(false, topic, 2), 70L
+            ),
+            ImmutableSet.of()
+        );
+
+    Capture<KafkaIndexTask> captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(Map.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(checkpointSequences, boundedConfig)
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    // Task should be created with bounded config
+    KafkaIndexTask task = captured.getValue();
+    Assert.assertNotNull(task);
+    Assert.assertEquals(boundedConfig, task.getIOConfig().getBoundedStreamConfig());
+
+    // Start offsets should resume from checkpoint
+    KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
+    Assert.assertEquals(50L, (long) taskConfig.getStartSequenceNumbers()
+                                              .getPartitionSequenceNumberMap()
+                                              .get(new KafkaTopicPartition(false, topic, 0)));
+    Assert.assertEquals(60L, (long) taskConfig.getStartSequenceNumbers()
+                                              .getPartitionSequenceNumberMap()
+                                              .get(new KafkaTopicPartition(false, topic, 1)));
+    Assert.assertEquals(70L, (long) taskConfig.getStartSequenceNumbers()
+                                              .getPartitionSequenceNumberMap()
+                                              .get(new KafkaTopicPartition(false, topic, 2)));
+
+    // End offsets should still match bounded config
+    Assert.assertEquals(100L, (long) taskConfig.getEndSequenceNumbers()
+                                               .getPartitionSequenceNumberMap()
+                                               .get(new KafkaTopicPartition(false, topic, 0)));
+  }
+
+  @Test
+  public void testBoundedStreamConfig_endOffsetsSetCorrectly() throws Exception
+  {
+    Map<String, Object> startOffsets = ImmutableMap.of("0", 0L, "1", 0L, "2", 0L);
+    Map<String, Object> endOffsets = ImmutableMap.of("0", 150L, "1", 250L, "2", 350L);
+    BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, endOffsets);
+
+    supervisor = getTestableSupervisorWithBoundedConfig(1, 1, "PT1H", boundedConfig);
+    addSomeEvents(350);
+
+    Capture<KafkaIndexTask> captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(Map.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(null)
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    // Task end offsets should match bounded config (not Long.MAX_VALUE)
+    KafkaIndexTask task = captured.getValue();
+    KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
+
+    Assert.assertEquals(150L, (long) taskConfig.getEndSequenceNumbers()
+                                               .getPartitionSequenceNumberMap()
+                                               .get(new KafkaTopicPartition(false, topic, 0)));
+    Assert.assertEquals(250L, (long) taskConfig.getEndSequenceNumbers()
+                                               .getPartitionSequenceNumberMap()
+                                               .get(new KafkaTopicPartition(false, topic, 1)));
+    Assert.assertEquals(350L, (long) taskConfig.getEndSequenceNumbers()
+                                               .getPartitionSequenceNumberMap()
+                                               .get(new KafkaTopicPartition(false, topic, 2)));
+  }
+
+  @Test
+  public void testBoundedStreamConfig_allPartitionsEmptyRange_completesImmediately() throws Exception
+  {
+    // All partitions have start == end (nothing to process)
+    Map<String, Object> startOffsets = ImmutableMap.of("0", 100L, "1", 100L, "2", 100L);
+    Map<String, Object> endOffsets = ImmutableMap.of("0", 100L, "1", 100L, "2", 100L);
+    BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, endOffsets);
+
+    supervisor = getTestableSupervisorWithBoundedConfig(1, 1, "PT1H", boundedConfig);
+    addSomeEvents(100);
+
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(Map.of()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(null)
+    ).anyTimes();
+    // registerListener may or may not be called depending on when completion is detected
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+    EasyMock.expectLastCall().anyTimes();
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+
+    // With all partitions having empty ranges, supervisor should detect completion
+    // State should transition to COMPLETED
+    SupervisorReport<KafkaSupervisorReportPayload> report = supervisor.getStatus();
+    Assert.assertEquals(
+        SupervisorStateManager.BasicState.COMPLETED,
+        report.getPayload().getDetailedState()
+    );
+  }
+
+  private TestableKafkaSupervisor getTestableSupervisorWithBoundedConfig(
+      int replicas,
+      int taskCount,
+      String duration,
+      BoundedStreamConfig boundedConfig
+  )
+  {
+    final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
+    consumerProperties.put("myCustomKey", "myCustomValue");
+    consumerProperties.put("bootstrap.servers", kafkaHost);
+    KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
+        topic,
+        null,
+        INPUT_FORMAT,
+        replicas,
+        taskCount,
+        new Period(duration),
+        consumerProperties,
+        null,
+        null,
+        KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+        new Period("P1D"),
+        new Period("PT30S"),
+        true,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        true,
+        null,
+        boundedConfig
+    );
+
+    KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(
+        null,
+        null
+    )
+    {
+      @Override
+      public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
+          String dataSource,
+          TaskInfoProvider taskInfoProvider,
+          SeekableStreamSupervisorTuningConfig tuningConfig,
+          ScheduledExecutorService connectExec
+      )
+      {
+        Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), tuningConfig.getHttpTimeout());
+        Assert.assertEquals(TEST_CHAT_RETRIES, (long) tuningConfig.getChatRetries());
+        return taskClient;
+      }
+    };
+
+    final KafkaSupervisorTuningConfig tuningConfig = tuningConfigBuilder()
+        .withMaxSavedParseExceptions(10)
+        .build();
+
+    return new TestableKafkaSupervisor(
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        OBJECT_MAPPER,
+        new KafkaSupervisorSpec(
+            null,
+            null,
+            dataSchema,
            tuningConfig,
            kafkaSupervisorIOConfig,
            null,
            false,
            taskStorage,
            taskMaster,
            indexerMetadataStorageCoordinator,
            taskClientFactory,
            OBJECT_MAPPER,
            new NoopServiceEmitter(),
            new DruidMonitorSchedulerConfig(),
            rowIngestionMetersFactory,
            supervisorConfig
        ),
        rowIngestionMetersFactory
    );
  }

+  @Test
+  public void testBoundedMode_equalOffsetsIsEmpty()
+  {
+    // Kafka end offsets are exclusive, so start == end represents an EMPTY range
+    Map<String, Object> startOffsets = ImmutableMap.of("0", 100, "1", 100);
+    Map<String, Object> endOffsets = ImmutableMap.of("0", 100, "1", 100);
+    supervisor = getTestableSupervisorWithBoundedConfig(1, 1, "PT1H", new BoundedStreamConfig(startOffsets, endOffsets));
+
+    Assert.assertTrue("Kafka should have exclusive end offsets", supervisor.isEndOffsetExclusive());
+
+    // start == end, so the start offset is at or beyond the end offset
+    Long start = 100L;
+    Long end = 100L;
+    Assert.assertTrue(supervisor.isOffsetAtOrBeyond(start, end));
+
+    // With exclusive end offsets, the empty-range check is simply isOffsetAtOrBeyond(start, end)
+    boolean shouldBeEmpty = supervisor.isOffsetAtOrBeyond(start, end);
+    Assert.assertTrue(
+        "For Kafka with exclusive end offsets, start == end should be considered an empty range",
+        shouldBeEmpty
+    );
+  }
 }
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadata.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadata.java
index d3c1630cf909..5cd9b2943fbe 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadata.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadata.java
@@ -25,15 +25,27 @@
 import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
+
+import javax.annotation.Nullable;

 public class KinesisDataSourceMetadata extends SeekableStreamDataSourceMetadata<String, String>
 {
   @JsonCreator
   public KinesisDataSourceMetadata(
-      @JsonProperty("partitions") SeekableStreamSequenceNumbers<String, String> kinesisPartitions
+      @JsonProperty("partitions") SeekableStreamSequenceNumbers<String, String> kinesisPartitions,
+      @JsonProperty("boundedStreamConfig") @Nullable BoundedStreamConfig boundedStreamConfig
+  )
+  {
+    super(kinesisPartitions, boundedStreamConfig);
+  }
+
+  // Backward compatibility constructor
+  public KinesisDataSourceMetadata(
+      SeekableStreamSequenceNumbers<String, String> kinesisPartitions
   )
   {
-    super(kinesisPartitions);
+    this(kinesisPartitions, null);
   }

@@ -54,6 +66,6 @@ protected KinesisDataSourceMetadata createConcreteDataSourceMetaData(
       SeekableStreamSequenceNumbers<String, String> seekableStreamSequenceNumbers
   )
   {
-    return new KinesisDataSourceMetadata(seekableStreamSequenceNumbers);
+    return new KinesisDataSourceMetadata(seekableStreamSequenceNumbers, getBoundedStreamConfig());
   }
 }
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
index 2df59bbca35f..d5b29f2e41ed 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
@@ -27,6 +27,7 @@
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
 import org.joda.time.DateTime;

 import javax.annotation.Nullable;
@@ -79,7 +80,8 @@ public KinesisIndexTaskIOConfig(
       @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
       @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
       @JsonProperty("awsExternalId") String awsExternalId,
-      @JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes
+      @JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes,
+      @JsonProperty("boundedStreamConfig") @Nullable BoundedStreamConfig boundedStreamConfig
   )
   {
     super(
@@ -91,7 +93,8 @@ public KinesisIndexTaskIOConfig(
         minimumMessageTime,
         maximumMessageTime,
         inputFormat,
-        refreshRejectionPeriodsInMinutes
+        refreshRejectionPeriodsInMinutes,
+        boundedStreamConfig
     );
     Preconditions.checkArgument(
         getEndSequenceNumbers().getPartitionSequenceNumberMap()
@@ -139,7 +142,8 @@ public KinesisIndexTaskIOConfig(
         fetchDelayMillis,
         awsAssumedRoleArn,
         awsExternalId,
-        refreshRejectionPeriodsInMinutes
+        refreshRejectionPeriodsInMinutes,
+        null // boundedStreamConfig
     );
   }

diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
index 3e2f656329f9..8c3bf5c57fa9 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
@@ -33,6 +33,7 @@
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -97,7 +98,9 @@ protected SeekableStreamDataSourceMetadata createDataSourceMetad
       SeekableStreamSequenceNumbers<String, String> partitions
   )
   {
-    return new KinesisDataSourceMetadata(partitions);
+    // Include bounded config if this is a bounded task
+    BoundedStreamConfig boundedConfig = task.getIOConfig().getBoundedStreamConfig();
+    return new KinesisDataSourceMetadata(partitions, boundedConfig);
   }

   @Override
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 08491caa8ff5..0f91fc0965db 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -133,6 +133,9 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
     return new KinesisIndexTaskIOConfig(
         groupId,
         baseSequenceName,
+        null,
+        null,
+        null,
         new SeekableStreamStartSequenceNumbers<>(
             ioConfig.getStream(),
             startPartitions,
@@ -147,7 +150,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
         ioConfig.getFetchDelayMillis(),
         ioConfig.getAwsAssumedRoleArn(),
         ioConfig.getAwsExternalId(),
-        ioConfig.getTaskDuration().getStandardMinutes()
+        ioConfig.getTaskDuration().getStandardMinutes(),
+        ioConfig.getBoundedStreamConfig() // Pass through bounded config
     );
   }

@@ -381,12 +385,38 @@ protected boolean isShardExpirationMarker(String seqNum)
     return KinesisSequenceNumber.EXPIRED_MARKER.equals(seqNum);
   }

+  @Override
+  protected boolean isOffsetAtOrBeyond(String current, String target)
+  {
+    return KinesisSequenceNumber.of(current).compareTo(KinesisSequenceNumber.of(target)) >= 0;
+  }
+
+  @Override
+  protected String createPartitionIdFromString(String partitionIdString)
+  {
+    // Kinesis uses String as partition ID, so just return the string as-is
+    return partitionIdString;
+  }
+
+  @Override
+  protected String createSequenceOffsetFromObject(Object offsetObj)
+  {
+    // Kinesis uses String as sequence offset
+    return offsetObj.toString();
+  }
+
   @Override
   protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
   {
     return true;
   }

+  @Override
+  protected boolean isEndOffsetExclusive()
+  {
+    return false;
+  }
+
   // Unlike the Kafka Indexing Service,
   // Kinesis reports lag metrics measured in time difference in milliseconds between the current sequence number and latest sequence number,
   // rather than message count.
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index 6c325bd0744d..e20d3a261cfb 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -26,6 +26,7 @@
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
 import org.apache.druid.indexing.kinesis.KinesisRegion;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
@@ -79,7 +80,8 @@ public KinesisSupervisorIOConfig(
       @JsonProperty("awsExternalId") String awsExternalId,
       @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
       @JsonProperty("deaggregate") @Deprecated boolean deaggregate,
-      @Nullable @JsonProperty("serverPriorityToReplicas") Map<Integer, Integer> serverPriorityToReplicas
+      @Nullable @JsonProperty("serverPriorityToReplicas") Map<Integer, Integer> serverPriorityToReplicas,
+      @Nullable @JsonProperty("boundedStreamConfig") BoundedStreamConfig boundedStreamConfig
   )
   {
     super(
@@ -99,7 +101,8 @@ public KinesisSupervisorIOConfig(
         lateMessageRejectionStartDateTime,
         new IdleConfig(null, null),
         null,
-        serverPriorityToReplicas
+        serverPriorityToReplicas,
+        boundedStreamConfig
     );

     this.endpoint = endpoint != null
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
index 4c9f4a1c8e2c..6f4410f3a8be 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
@@ -140,6 +140,7 @@ public void testSample() throws InterruptedException
             null,
             null,
             false,
+            null,
             null
         ),
         null,
@@ -195,6 +196,7 @@ public void testGetInputSourceResources()
             null,
             null,
             false,
+            null,
             null
         ),
         null,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
index aa922b008a55..a93bdf6871bc 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
@@ -132,4 +132,76 @@ public void testTopicRequired() throws Exception
     mapper.readValue(jsonStr, KinesisSupervisorIOConfig.class);
   }

+  @Test
+  public void testBoundedModeSerdeWithStringOffsets() throws Exception
+  {
+    String jsonStr = "{\n" +
+                     "  \"type\": \"kinesis\",\n" +
+                     "  \"stream\": \"my-stream\",\n" +
+                     "  \"boundedStreamConfig\": {\n" +
+                     "    \"startSequenceNumbers\": {\"shardId-000000000000\": \"49590338271490256608559692538361571095921575989136588898\", \"shardId-000000000001\": \"49590338271512257353759162668991891722121171891717232706\"},\n" +
+                     "    \"endSequenceNumbers\": {\"shardId-000000000000\": \"49590338271534258098958632799622211348320767794297876514\", \"shardId-000000000001\": \"49590338271556258844158102930252531974520363696878520322\"}\n" +
+                     "  }\n" +
+                     "}";
+
+    KinesisSupervisorIOConfig config = mapper.readValue(jsonStr, KinesisSupervisorIOConfig.class);
+
+    Assert.assertTrue(config.isBounded());
+    Assert.assertNotNull(config.getBoundedStreamConfig());
+    Assert.assertEquals(2, config.getBoundedStreamConfig().getStartSequenceNumbers().size());
+    Assert.assertEquals(2, config.getBoundedStreamConfig().getEndSequenceNumbers().size());
+  }
+
+  @Test
+  public void testBoundedModeSerdeWithNumericOffsets() throws Exception
+  {
+    String jsonStr = "{\n" +
+                     "  \"type\": \"kinesis\",\n" +
+                     "  \"stream\": \"my-stream\",\n" +
+                     "  \"boundedStreamConfig\": {\n" +
+                     "    \"startSequenceNumbers\": {\"shardId-000000000000\": 100, \"shardId-000000000001\": 200},\n" +
+                     "    \"endSequenceNumbers\": {\"shardId-000000000000\": 500, \"shardId-000000000001\": 600}\n" +
+                     "  }\n" +
+                     "}";
+
+    KinesisSupervisorIOConfig config = mapper.readValue(jsonStr, KinesisSupervisorIOConfig.class);
+
+    Assert.assertTrue(config.isBounded());
+    Assert.assertNotNull(config.getBoundedStreamConfig());
+    Assert.assertEquals(2, config.getBoundedStreamConfig().getStartSequenceNumbers().size());
+    Assert.assertEquals(2, config.getBoundedStreamConfig().getEndSequenceNumbers().size());
+  }
+
+  @Test
+  public void testBoundedModeSerdeWithMixedOffsets() throws Exception
+  {
+    String jsonStr = "{\n" +
+                     "  \"type\": \"kinesis\",\n" +
+                     "  \"stream\": \"my-stream\",\n" +
+                     "  \"boundedStreamConfig\": {\n" +
+                     "    \"startSequenceNumbers\": {\"shardId-000000000000\": \"49590338271490256608559692538361571095921575989136588898\", \"shardId-000000000001\": 200},\n" +
+                     "    \"endSequenceNumbers\": {\"shardId-000000000000\": 500, \"shardId-000000000001\": \"49590338271556258844158102930252531974520363696878520322\"}\n" +
+                     "  }\n" +
+                     "}";
+
+    KinesisSupervisorIOConfig config = mapper.readValue(jsonStr, KinesisSupervisorIOConfig.class);
+
+    Assert.assertTrue(config.isBounded());
+    Assert.assertNotNull(config.getBoundedStreamConfig());
+  }
+
+  @Test
+  public void testUnboundedModeByDefault() throws Exception
+  {
+    String jsonStr = "{\n" +
+                     "  \"type\": \"kinesis\",\n" +
+                     "  \"stream\": \"my-stream\"\n" +
+                     "}";
+
+    KinesisSupervisorIOConfig config = mapper.readValue(jsonStr, KinesisSupervisorIOConfig.class);
+
+    Assert.assertFalse(config.isBounded());
+    Assert.assertNull(config.getBoundedStreamConfig());
+  }
 }
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index cf91c37e3751..3f6da8aa0532 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -65,6 +65,7 @@
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
@@ -468,6 +469,7 @@ public void testRecordSupplier()
         null,
         null,
         false,
+        null,
         null
     );
     KinesisIndexTaskClientFactory clientFactory = new KinesisIndexTaskClientFactory(null, OBJECT_MAPPER);
@@ -534,6 +536,7 @@ public void testKinesisIOConfigInitAndAutoscalerConfigCreation()
         null,
         null,
         false,
+        null,
         null
     );

@@ -562,6 +565,7 @@ public void testKinesisIOConfigInitAndAutoscalerConfigCreation()
         null,
         OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class),
        false,
+        null,
         null
     );

@@ -2828,7 +2832,7 @@ public void testGetOffsetFromStorageForPartitionWithResetOffsetAutomatically() t
             new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of(SHARD_ID1, "100", SHARD_ID2, "200"))
         )
     )
-        .times(2);
+        .times(3);

     // Since shard 2 was in metadata before but is not in the list of shards returned by the record supplier,
     // it gets deleted from metadata (it is an expired shard)
@@ -4216,6 +4220,7 @@ public void testCorrectInputSources()
         null,
         null,
         false,
+        null,
         null
     ),
     null,
@@ -4725,6 +4730,198 @@ public void test_doesTaskMatchSupervisor()
     Assert.assertFalse(supervisor.doesTaskMatchSupervisor(differentTaskType));
   }

+  @Test
+  public void testBoundedModeCreateTasksWithCorrectOffsets()
+  {
+    Map<String, Object> startOffsets = ImmutableMap.of(
+        "shardId-000000000000", "49590338271490256608559692538361571095921575989136588898",
+        "shardId-000000000001", "49590338271512257353759162668991891722121171891717232706"
+    );
+    Map<String, Object> endOffsets = ImmutableMap.of(
+        "shardId-000000000000", "49590338271534258098958632799622211348320767794297876514",
+        "shardId-000000000001", "49590338271556258844158102930252531974520363696878520322"
+    );
+    final KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
+        STREAM,
+        INPUT_FORMAT,
+        "awsEndpoint",
+        null,
+        1,
+        1,
+        new Period("PT30S"),
+        null,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        0,
+        null,
+        null,
+        null,
+        true,
+        null,
+        new BoundedStreamConfig(startOffsets, endOffsets)
+    );
+
+    Assert.assertTrue(kinesisSupervisorIOConfig.isBounded());
+
+    final KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory(null, null);
+    final KinesisSupervisorSpec spec = new KinesisSupervisorSpec(
+        null,
+        null,
+        dataSchema,
+        KinesisSupervisorTuningConfig.defaultConfig(),
+        kinesisSupervisorIOConfig,
+        null,
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        OBJECT_MAPPER,
+        new NoopServiceEmitter(),
+        new DruidMonitorSchedulerConfig(),
+        rowIngestionMetersFactory,
+        null,
+        new SupervisorStateManagerConfig()
+    );
+
+    supervisor = new TestableKinesisSupervisor(
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        OBJECT_MAPPER,
+        spec,
+        rowIngestionMetersFactory
+    );
+
+    // Test type conversion methods
+    String shardId = supervisor.createPartitionIdFromString("shardId-000000000000");
+    Assert.assertEquals("shardId-000000000000", shardId);
+
+    String offset = supervisor.createSequenceOffsetFromObject("49590338271490256608559692538361571095921575989136588898");
+    Assert.assertEquals("49590338271490256608559692538361571095921575989136588898", offset);
+
+    offset = supervisor.createSequenceOffsetFromObject(100);
+    Assert.assertEquals("100", offset);
+
+    Assert.assertTrue(supervisor.isOffsetAtOrBeyond(
+        "49590338271512257353759162668991891722121171891717232706",
+        "49590338271490256608559692538361571095921575989136588898"
+    ));
+    Assert.assertTrue(supervisor.isOffsetAtOrBeyond(
+        "49590338271490256608559692538361571095921575989136588898",
+        "49590338271490256608559692538361571095921575989136588898"
+    ));
+    Assert.assertFalse(supervisor.isOffsetAtOrBeyond(
+        "49590338271490256608559692538361571095921575989136588898",
+        "49590338271512257353759162668991891722121171891717232706"
+    ));
+  }
+
+  @Test
+  public void testIsOffsetAtOrBeyond_specialMarkerIsAtOrBeyondNumeric()
+  {
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
+
+    // Special markers like EOS are treated as max sequence numbers by KinesisSequenceNumber,
+    // so they are considered at or beyond any numeric offset.
+    Assert.assertTrue(supervisor.isOffsetAtOrBeyond(KinesisSequenceNumber.END_OF_SHARD_MARKER, "12345"));
+  }
+
+  @Test
+  public void testBoundedMode_singleRecordRange_notEmpty()
+  {
+    // Kinesis has inclusive end offsets, so start == end represents ONE record, not an empty range
+    String singleOffset = "49590338271490256608559692538361571095921575989136588898";
+    Map<String, Object> startOffsets = ImmutableMap.of(SHARD_ID0, singleOffset);
+    Map<String, Object> endOffsets = ImmutableMap.of(SHARD_ID0, singleOffset);
+
+    final KinesisSupervisorIOConfig ioConfig = new KinesisSupervisorIOConfig(
+        STREAM,
+        INPUT_FORMAT,
+        "awsEndpoint",
+        null,
+        1,
+        1,
+        new Period("PT1H"),
+        null,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        0,
+        null,
+        null,
+        null,
+        true,
+        null,
+        new BoundedStreamConfig(startOffsets, endOffsets)
+    );
+
+    final KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory(null, null);
+    final KinesisSupervisorSpec spec = new KinesisSupervisorSpec(
+        null,
+        null,
+        dataSchema,
+        KinesisSupervisorTuningConfig.defaultConfig(),
+        ioConfig,
+        null,
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        OBJECT_MAPPER,
+        new NoopServiceEmitter(),
+        new DruidMonitorSchedulerConfig(),
+        rowIngestionMetersFactory,
+        null,
+        new SupervisorStateManagerConfig()
+    );
+
+    supervisor = new TestableKinesisSupervisor(
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        OBJECT_MAPPER,
+        spec,
+        rowIngestionMetersFactory
+    );
+
+    Assert.assertFalse("Kinesis should have inclusive end offsets", supervisor.isEndOffsetExclusive());
+
+    // start == end, so the start offset is at or beyond the end offset
+    String start = singleOffset;
+    String end = singleOffset;
+    Assert.assertTrue(supervisor.isOffsetAtOrBeyond(start, end));
+
+    // With inclusive end offsets, start == end is a single-record range, so the empty-range
+    // check must also require inequality: isOffsetAtOrBeyond(start, end) && !start.equals(end)
+    boolean isAtOrBeyond = supervisor.isOffsetAtOrBeyond(start, end);
+    boolean isEqual = start.equals(end);
+    boolean shouldBeEmpty = isAtOrBeyond && !isEqual;
+
+    Assert.assertFalse(
+        "For Kinesis with inclusive end offsets, start == end should NOT be considered an empty range",
+        shouldBeEmpty
+    );
+  }
+
   private List<Task> testShardMergePhaseOne() throws Exception
   {
     supervisorRecordSupplier.assign(EasyMock.anyObject());
@@ -5181,6 +5378,7 @@ private TestableKinesisSupervisor getTestableSupervisor(
         null,
         null,
         false,
+        null,
         null
     );

@@ -5324,6 +5522,7 @@ private TestableKinesisSupervisor getTestableSupervisor(
         null,
         autoScalerConfig,
         false,
+        null,
         null
     );

@@ -5411,6 +5610,7 @@ private TestableKinesisSupervisor getTestableSupervisorCustomIsTaskCurrent(
         null,
         null,
         false,
+        null,
         null
     );

@@ -5500,6 +5700,7 @@ private KinesisSupervisor createSupervisor(
         null,
         null,
         false,
+        null,
         null
     );

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java
index 4ea1e992da0d..0b42da98c9ec 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamDataSourceMetadata.java
@@ -21,8 +21,10 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
 import org.apache.druid.java.util.common.IAE;

+import javax.annotation.Nullable;
 import 
java.util.Objects; public abstract class SeekableStreamDataSourceMetadata @@ -30,11 +32,23 @@ public abstract class SeekableStreamDataSourceMetadata seekableStreamSequenceNumbers; + @Nullable + private final BoundedStreamConfig boundedStreamConfig; + public SeekableStreamDataSourceMetadata( SeekableStreamSequenceNumbers seekableStreamSequenceNumbers ) + { + this(seekableStreamSequenceNumbers, null); + } + + public SeekableStreamDataSourceMetadata( + SeekableStreamSequenceNumbers seekableStreamSequenceNumbers, + @Nullable BoundedStreamConfig boundedStreamConfig + ) { this.seekableStreamSequenceNumbers = seekableStreamSequenceNumbers; + this.boundedStreamConfig = boundedStreamConfig; } @JsonProperty("partitions") @@ -43,6 +57,13 @@ public SeekableStreamSequenceNumbers getSee return seekableStreamSequenceNumbers; } + @Nullable + @JsonProperty("boundedStreamConfig") + public BoundedStreamConfig getBoundedStreamConfig() + { + return boundedStreamConfig; + } + @Override public boolean isValidStart() { @@ -105,13 +126,14 @@ public boolean equals(Object o) return false; } SeekableStreamDataSourceMetadata that = (SeekableStreamDataSourceMetadata) o; - return Objects.equals(getSeekableStreamSequenceNumbers(), that.getSeekableStreamSequenceNumbers()); + return Objects.equals(getSeekableStreamSequenceNumbers(), that.getSeekableStreamSequenceNumbers()) && + Objects.equals(boundedStreamConfig, that.boundedStreamConfig); } @Override public int hashCode() { - return seekableStreamSequenceNumbers.hashCode(); + return Objects.hash(seekableStreamSequenceNumbers, boundedStreamConfig); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java index 8f92f76e8360..cf79c7ab4818 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import org.apache.druid.common.config.Configs; import org.apache.druid.data.input.InputFormat; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.segment.indexing.IOConfig; import org.joda.time.DateTime; @@ -42,6 +43,7 @@ public abstract class SeekableStreamIndexTaskIOConfig startSequenceNumbers; // Partition -> Start Offset + private final Map endSequenceNumbers; // Partition -> End Offset + + @JsonCreator + public BoundedStreamConfig( + @JsonProperty("startSequenceNumbers") Map startSequenceNumbers, + @JsonProperty("endSequenceNumbers") Map endSequenceNumbers + ) + { + if (startSequenceNumbers == null || startSequenceNumbers.isEmpty() || + endSequenceNumbers == null || endSequenceNumbers.isEmpty()) { + throw DruidException.forPersona(DruidException.Persona.ADMIN) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build("startSequenceNumbers and endSequenceNumbers cannot be null or empty"); + } + + if (!startSequenceNumbers.keySet().equals(endSequenceNumbers.keySet())) { + throw DruidException.forPersona(DruidException.Persona.ADMIN) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + "startSequenceNumbers and endSequenceNumbers must have matching partition sets. 
Start: %s, End: %s", + startSequenceNumbers.keySet(), + endSequenceNumbers.keySet() + ); + } + + this.startSequenceNumbers = startSequenceNumbers; + this.endSequenceNumbers = endSequenceNumbers; + } + + @JsonProperty + public Map getStartSequenceNumbers() + { + return startSequenceNumbers; + } + + @JsonProperty + public Map getEndSequenceNumbers() + { + return endSequenceNumbers; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BoundedStreamConfig that = (BoundedStreamConfig) o; + return startSequenceNumbers.equals(that.startSequenceNumbers) && + endSequenceNumbers.equals(that.endSequenceNumbers); + } + + @Override + public int hashCode() + { + int result = startSequenceNumbers.hashCode(); + result = 31 * result + endSequenceNumbers.hashCode(); + return result; + } + + @Override + public String toString() + { + return "BoundedStreamConfig{" + + "startSequenceNumbers=" + startSequenceNumbers + + ", endSequenceNumbers=" + endSequenceNumbers + + '}'; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 6a3207e6baa0..91b4244c0bf3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -207,6 +207,10 @@ public class TaskGroup // task groups have nothing but closed partitions in their assignments. 
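The nullable `endSequences` field added to `TaskGroup` in this hunk is what later decides, in `createTasksForGroup`, whether tasks get explicit end offsets (bounded mode) or an end-of-partition marker (streaming mode). A standalone sketch of that selection rule, with simplified types and a hypothetical marker value — not the actual Druid API:

```java
import java.util.HashMap;
import java.util.Map;

public class EndOffsetSelectionSketch
{
  // Hypothetical stand-in for getEndOfPartitionMarker(); the real marker type
  // depends on the stream implementation.
  static final long END_OF_PARTITION_MARKER = Long.MAX_VALUE;

  // Bounded groups carry explicit end sequences; streaming groups (null or
  // empty endSequences) read "forever", i.e. to the end-of-partition marker.
  static Map<Integer, Long> selectEndOffsets(
      Map<Integer, Long> startingSequences,
      Map<Integer, Long> endSequences
  )
  {
    Map<Integer, Long> endPartitions = new HashMap<>();
    if (endSequences != null && !endSequences.isEmpty()) {
      endPartitions.putAll(endSequences); // bounded mode
    } else {
      for (Integer partition : startingSequences.keySet()) {
        endPartitions.put(partition, END_OF_PARTITION_MARKER); // streaming mode
      }
    }
    return endPartitions;
  }

  public static void main(String[] args)
  {
    Map<Integer, Long> starts = Map.of(0, 100L, 1, 200L);
    System.out.println(selectEndOffsets(starts, Map.of(0, 500L, 1, 600L)).get(0)); // 500
    System.out.println(selectEndOffsets(starts, null).get(1)); // Long.MAX_VALUE
  }
}
```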
final ImmutableMap unfilteredStartingSequencesForSequenceName; + // End sequences for bounded mode - null for streaming mode + @Nullable + final ImmutableMap endSequences; + final ConcurrentHashMap tasks = new ConcurrentHashMap<>(); final ConcurrentHashMap taskIdToServerPriority = new ConcurrentHashMap<>(); final DateTime minimumMessageTime; @@ -231,6 +235,37 @@ public class TaskGroup groupId, startingSequences, unfilteredStartingSequencesForSequenceName, + null, // endSequences - null for streaming mode + minimumMessageTime, + maximumMessageTime, + exclusiveStartSequenceNumberPartitions, + generateSequenceName( + unfilteredStartingSequencesForSequenceName == null + ? startingSequences + : unfilteredStartingSequencesForSequenceName, + minimumMessageTime, + maximumMessageTime, + spec.getDataSchema(), + taskTuningConfig + ) + ); + } + + TaskGroup( + int groupId, + ImmutableMap startingSequences, + @Nullable ImmutableMap unfilteredStartingSequencesForSequenceName, + @Nullable ImmutableMap endSequences, + @Nullable DateTime minimumMessageTime, + @Nullable DateTime maximumMessageTime, + @Nullable Set exclusiveStartSequenceNumberPartitions + ) + { + this( + groupId, + startingSequences, + unfilteredStartingSequencesForSequenceName, + endSequences, minimumMessageTime, maximumMessageTime, exclusiveStartSequenceNumberPartitions, @@ -250,6 +285,7 @@ public class TaskGroup int groupId, ImmutableMap startingSequences, @Nullable ImmutableMap unfilteredStartingSequencesForSequenceName, + @Nullable ImmutableMap endSequences, DateTime minimumMessageTime, DateTime maximumMessageTime, Set exclusiveStartSequenceNumberPartitions, @@ -261,6 +297,7 @@ public class TaskGroup this.unfilteredStartingSequencesForSequenceName = unfilteredStartingSequencesForSequenceName == null ? 
startingSequences : unfilteredStartingSequencesForSequenceName; + this.endSequences = endSequences; this.minimumMessageTime = minimumMessageTime; this.maximumMessageTime = maximumMessageTime; this.checkpointSequences.put(0, startingSequences); @@ -1386,6 +1423,11 @@ public void tryInit() try { recordSupplier = setupRecordSupplier(); + // Initialize bounded partitions BEFORE first run + if (ioConfig.isBounded()) { + initializeBoundedPartitionGroups(); + } + exec.submit( () -> { try { @@ -1856,6 +1898,10 @@ public TaskGroup addTaskGroupToPendingCompletionTaskGroup( public void runInternal() { try { + if (isBoundedWorkComplete()) { + handleBoundedCompletion(); + return; + } possiblyRegisterListener(); stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM); @@ -1990,6 +2036,10 @@ public void resetInternal(DataSourceMetadata dataSourceMetadata) activelyReadingTaskGroups.clear(); partitionGroups.clear(); partitionOffsets.clear(); + if (ioConfig.isBounded()) { + initializeBoundedPartitionGroups(); + stateManager.maybeSetState(SupervisorStateManager.BasicState.RUNNING); + } } else { if (!checkSourceMetadataMatch(dataSourceMetadata)) { throw new IAE( @@ -2093,9 +2143,9 @@ public void resetInternal(DataSourceMetadata dataSourceMetadata) /** * Reset offsets with the data source metadata. If checkpoints exist, the resulting stored offsets will be a union of * existing checkpointed offsets and provided offsets; any checkpointed offsets not specified in the metadata will be - * preserved as-is. If checkpoints don't exist, the provided reset datasource metdadata will be inserted into + * preserved as-is. If checkpoints don't exist, the provided reset datasource metadata will be inserted into * the metadata storage. Once the offsets are reset, any active tasks serving the partition offsets will be restarted. - * @param dataSourceMetadata Required reset data source metdata. Assumed that the metadata is validated. 
+ * @param dataSourceMetadata Required reset data source metadata. Assumed that the metadata is validated. */ public void resetOffsetsInternal(@Nonnull final DataSourceMetadata dataSourceMetadata) { @@ -2387,6 +2437,17 @@ public Boolean apply(Pair endSequences = null; + if (seekableStreamIndexTask.getIOConfig().getEndSequenceNumbers() != null) { + endSequences = ImmutableMap.copyOf( + seekableStreamIndexTask.getIOConfig() + .getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + ); + } + return new TaskGroup( taskGroupId, ImmutableMap.copyOf( @@ -2395,6 +2456,7 @@ public Boolean apply(Pair previousPartitionIds = new ArrayList<>(partitionIds); Set partitionIdsFromSupplier; recordSupplierLock.lock(); @@ -3210,6 +3279,59 @@ protected Map getLatestSequencesFromStream( return new HashMap<>(); } + /** + * Converts a map with string keys from BoundedStreamConfig to a properly typed map. + * The BoundedStreamConfig uses Map which Jackson deserializes as Map. + * This method converts string keys to partition IDs and offset values to the appropriate types. + * + * @param rawMap the raw map from BoundedStreamConfig + * @return a map with properly typed partition IDs and sequence offsets + */ + private Map convertBoundedConfigMap(Map rawMap) + { + Map result = new HashMap<>(); + for (Map.Entry entry : rawMap.entrySet()) { + PartitionIdType partition = createPartitionIdFromString(entry.getKey().toString()); + SequenceOffsetType offset = createSequenceOffsetFromObject(entry.getValue()); + result.put(partition, offset); + } + return result; + } + + /** + * Initialize partitionGroups from bounded config instead of from stream discovery. + * This prevents the supervisor from trying to recreate tasks as they complete. + * Only called when in bounded mode during supervisor startup. 
+ */ + private void initializeBoundedPartitionGroups() + { + if (!ioConfig.isBounded()) { + return; + } + + BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig(); + Map configuredPartitions = + convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers()); + + for (PartitionIdType partition : configuredPartitions.keySet()) { + int taskGroupId = getTaskGroupIdForPartition(partition); + + partitionGroups.computeIfAbsent(taskGroupId, k -> new HashSet<>()).add(partition); + partitionIds.add(partition); + partitionOffsets.put(partition, getNotSetMarker()); + + log.info("Bounded mode: initialized partition[%s] in taskGroup[%d]", partition, taskGroupId); + } + + assignRecordSupplierToPartitionIds(); + + log.info( + "Bounded mode: initialized [%d] partitions in [%d] task groups", + configuredPartitions.size(), + partitionGroups.size() + ); + } + private void assignRecordSupplierToPartitionIds() { recordSupplierLock.lock(); @@ -4107,6 +4229,16 @@ private void createNewTasks() throws JsonProcessingException final Map newTaskGroups = new HashMap<>(); for (Integer groupId : partitionGroups.keySet()) { if (!activelyReadingTaskGroups.containsKey(groupId)) { + + // In bounded mode, check if task group has completed before recreating + if (ioConfig.isBounded() && hasTaskGroupReachedBoundedEnd(groupId)) { + log.debug( + "Bounded taskGroup[%d] has reached end offsets, skipping recreation", + groupId + ); + continue; // Skip creating new task group + } + log.info("Creating new taskGroup[%d] for partitions[%s].", groupId, partitionGroups.get(groupId)); final DateTime minimumMessageTime; if (ioConfig.getLateMessageRejectionStartDateTime().isPresent()) { @@ -4173,12 +4305,25 @@ private void createNewTasks() throws JsonProcessingException .collect(Collectors.toSet()); } + // NEW: Extract end offsets for bounded mode + ImmutableMap endOffsets = null; + if (ioConfig.isBounded()) { + endOffsets = ImmutableMap.copyOf(getEndOffsetsForGroup(groupId)); + } + + log.info( + 
"Initializing taskGroup[%d] with startingOffsets[%s] and endOffsets[%s]", + groupId, + simpleStartingOffsets, + endOffsets + ); newTaskGroups.put( groupId, new TaskGroup( groupId, simpleStartingOffsets, simpleUnfilteredStartingOffsets, + endOffsets, minimumMessageTime, maximumMessageTime, exclusiveStartSequenceNumberPartitions @@ -4270,7 +4415,11 @@ private Map> generate Map partitionsToReset ) { + // Existing logic for both streaming and bounded mode ImmutableMap.Builder> builder = ImmutableMap.builder(); + final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata(); + final BoundedStreamConfig metadataBoundedConfig = getBoundedConfigFromMetadata(dataSourceMetadata); + for (PartitionIdType partitionId : partitionGroups.get(groupId)) { SequenceOffsetType sequence = partitionOffsets.get(partitionId); @@ -4285,6 +4434,7 @@ private Map> generate OrderedSequenceNumber offsetFromStorage = getOffsetFromStorageForPartition( partitionId, metadataOffsets, + metadataBoundedConfig, partitionsToReset ); @@ -4296,6 +4446,18 @@ private Map> generate return builder.build(); } + /** + * Extracts BoundedStreamConfig from DataSourceMetadata if available. + */ + @Nullable + private BoundedStreamConfig getBoundedConfigFromMetadata(@Nullable DataSourceMetadata metadata) + { + if (metadata instanceof SeekableStreamDataSourceMetadata) { + return ((SeekableStreamDataSourceMetadata) metadata).getBoundedStreamConfig(); + } + return null; + } + /** * Queries the dataSource metadata table to see if there is a previous ending sequence for this partition. 
If it * doesn't find any data, it will retrieve the latest or earliest Kafka/Kinesis sequence depending on the @@ -4309,11 +4471,32 @@ private Map> generate private OrderedSequenceNumber getOffsetFromStorageForPartition( PartitionIdType partition, final Map metadataOffsets, + @Nullable final BoundedStreamConfig metadataBoundedConfig, final Map partitionsToReset ) { SequenceOffsetType sequence = metadataOffsets.get(partition); if (sequence != null) { + // In bounded mode, check if the metadata's bounded config matches the current supervisor's config + if (ioConfig.isBounded()) { + BoundedStreamConfig currentBoundedConfig = ioConfig.getBoundedStreamConfig(); + + // If configs don't match (or metadata has no config), throw exception + if (!currentBoundedConfig.equals(metadataBoundedConfig)) { + throw DruidException.forPersona(DruidException.Persona.ADMIN) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + "Bounded supervisor detected existing metadata from a different run. " + + "Metadata bounded config [%s] does not match current config [%s]. 
" + + "To start a new bounded ingestion, either: " + + "(1) use the supervisor reset API to clear existing metadata, or " + + "(2) use a different supervisor ID.", + metadataBoundedConfig, + currentBoundedConfig + ); + } + } + log.debug("Getting sequence [%s] from metadata storage for partition [%s]", sequence, partition); if (!taskTuningConfig.isSkipSequenceNumberAvailabilityCheck()) { if (!checkOffsetAvailability(partition, sequence)) { @@ -4334,6 +4517,23 @@ private OrderedSequenceNumber getOffsetFromStorageForPartiti } return makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence()); } else { + // NEW: In bounded mode, if no checkpoint exists (task failed before first checkpoint), + // fall back to bounded start offset + if (ioConfig.isBounded()) { + BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig(); + Map startOffsets = + convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers()); + SequenceOffsetType startOffset = startOffsets.get(partition); + if (startOffset != null) { + log.info( + "Bounded mode: no checkpoint found for partition[%s], using configured start offset[%s]", + partition, + startOffset + ); + return makeSequenceNumber(startOffset, false); + } + } + boolean useEarliestSequenceNumber = ioConfig.isUseEarliestSequenceNumber(); if (subsequentlyDiscoveredPartitions.contains(partition)) { log.info( @@ -4377,6 +4577,177 @@ && checkSourceMetadataMatch(dataSourceMetadata)) { return Collections.emptyMap(); } + /** + * Check if all partitions in a task group have reached their bounded end offsets. + * Used to determine if the task group completed successfully vs failed midway. 
+ * + * @param groupId The task group ID to check + * @return true if all partitions in the group have reached their end offsets, false otherwise + */ + private boolean hasTaskGroupReachedBoundedEnd(int groupId) + { + Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId); + if (partitionsInGroup == null || partitionsInGroup.isEmpty()) { + return false; + } + BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig(); + Map<PartitionIdType, SequenceOffsetType> startOffsets = + convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers()); + Map<PartitionIdType, SequenceOffsetType> endOffsets = + convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers()); + + // Check if all partitions have empty ranges. + // Reversed ranges (start > end) return false so createNewTasks() runs and the task IO config + // rejects the invalid range with a clear error, making the supervisor UNHEALTHY. + // For exclusive end offsets (Kafka): start == end is a legitimate empty range. + // For inclusive end offsets (Kinesis): start == end is one record, not empty. + boolean allPartitionsEmptyRange = true; + for (PartitionIdType partition : partitionsInGroup) { + SequenceOffsetType start = startOffsets.get(partition); + SequenceOffsetType end = endOffsets.get(partition); + + boolean startBeyondEnd = isOffsetAtOrBeyond(start, end) && !start.equals(end); + if (startBeyondEnd) { + return false; + } + + boolean isEmpty = isEndOffsetExclusive() && start.equals(end); + if (!isEmpty) { + allPartitionsEmptyRange = false; + break; + } + } + + if (allPartitionsEmptyRange) { + log.warn( + "TaskGroup[%d] has empty range for all partitions (start == end). " + + "No work to do, marking as complete. Start: %s, End: %s", + groupId, + startOffsets, + endOffsets + ); + return true; + } + + // Offsets from a prior bounded run with a different config must not count as completion evidence. 
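The two primitives this completion check relies on — numeric offset comparison and the inclusive/exclusive empty-range rule — can be illustrated standalone. These are hypothetical helpers, not the Druid abstract methods; Kinesis-style sequence numbers are unbounded decimal strings, so they must be compared as `BigInteger`, never lexicographically:

```java
import java.math.BigInteger;

public class BoundedRangeSketch
{
  // Sketch of isOffsetAtOrBeyond for string sequence numbers: compare
  // numerically via BigInteger, since the values overflow long and a plain
  // String.compareTo would order "9" after "100".
  static boolean isOffsetAtOrBeyond(String current, String target)
  {
    return new BigInteger(current).compareTo(new BigInteger(target)) >= 0;
  }

  // start == end is empty only with exclusive end offsets (Kafka-style);
  // with inclusive end offsets (Kinesis-style) it spans exactly one record.
  static boolean isEmptyRange(String start, String end, boolean endOffsetExclusive)
  {
    return endOffsetExclusive && start.equals(end);
  }

  public static void main(String[] args)
  {
    System.out.println(isOffsetAtOrBeyond(
        "49590338271512257353759162668991891722121171891717232706",
        "49590338271490256608559692538361571095921575989136588898")); // true
    System.out.println(isEmptyRange("100", "100", true));  // true: Kafka-style empty range
    System.out.println(isEmptyRange("100", "100", false)); // false: Kinesis-style, one record
  }
}
```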
+ BoundedStreamConfig metadataBoundedConfig = getBoundedConfigFromMetadata(retrieveDataSourceMetadata()); + if (!boundedConfig.equals(metadataBoundedConfig)) { + return false; + } + Map currentOffsets = getOffsetsFromMetadataStorage(); + + log.info( + "Bounded mode: checking completion for taskGroup[%d]. Current offsets from metadata: %s, End offsets: %s", + groupId, + currentOffsets, + endOffsets + ); + + if (currentOffsets == null || currentOffsets.isEmpty()) { + log.debug("No checkpointed offsets found, taskGroup[%d] has not completed", groupId); + return false; // No progress yet, task hasn't completed + } + + // Check if ALL partitions in this group have reached their end offsets + for (PartitionIdType partition : partitionsInGroup) { + SequenceOffsetType endOffset = endOffsets.get(partition); + SequenceOffsetType currentOffset = currentOffsets.get(partition); + + if (currentOffset == null) { + log.debug( + "Partition[%s] in taskGroup[%d] has no checkpointed offset, not complete", + partition, + groupId + ); + return false; // Partition hasn't started processing + } + + if (!isOffsetAtOrBeyond(currentOffset, endOffset)) { + log.debug( + "Partition[%s] in taskGroup[%d] at offset[%s], has not reached end[%s]", + partition, + groupId, + currentOffset, + endOffset + ); + return false; // This partition hasn't reached its end + } + } + + log.info( + "All partitions in taskGroup[%d] have reached their end offsets", + groupId + ); + return true; // All partitions have reached their end offsets + } + + /** + * Get end offsets for all partitions in a task group from bounded config. 
+ */ + private Map<PartitionIdType, SequenceOffsetType> getEndOffsetsForGroup(int groupId) + { + BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig(); + Map<PartitionIdType, SequenceOffsetType> endOffsets = + convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers()); + Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId); + + if (partitionsInGroup == null) { + return Collections.emptyMap(); + } + + return partitionsInGroup.stream() + .filter(endOffsets::containsKey) + .collect(Collectors.toMap( + p -> p, + endOffsets::get + )); + } + + /** + * For bounded supervisors, we determine completion by checking if new tasks would be created. + * In {@link #createNewTasks()}, bounded mode checks {@link #hasTaskGroupReachedBoundedEnd(int)} before creating tasks. + * If that returns true (offsets reached), no new tasks are created. + * So completion is: no active tasks, no pending tasks, and {@link #createNewTasks()} chose not to create any. + * + * @return true if all bounded work is complete, false otherwise + */ + private boolean isBoundedWorkComplete() + { + if (!ioConfig.isBounded()) { + return false; + } + + // Check if task groups are empty (no tasks active or pending) + boolean noActiveTasks = activelyReadingTaskGroups.isEmpty(); + boolean noPendingTasks = pendingCompletionTaskGroups.values().stream().allMatch(List::isEmpty); + + if (!noActiveTasks || !noPendingTasks) { + return false; + } + // If hasTaskGroupReachedBoundedEnd() returns false for any group, createNewTasks() + // will create tasks next iteration, so we're not complete. + for (Integer groupId : partitionGroups.keySet()) { + if (!hasTaskGroupReachedBoundedEnd(groupId)) { + log.debug("TaskGroup[%d] has not reached bounded end, tasks will be recreated", groupId); + return false; + } + } + + log.info("All bounded tasks completed for supervisor[%s]", supervisorId); + return true; + } + + /** + * Handle bounded processing completion by shutting down the supervisor. 
+ * At this point, all task groups are already empty (verified by isBoundedWorkComplete), + * so we just need to mark the supervisor as completed. + */ + private void handleBoundedCompletion() + { + log.info("Bounded processing complete for supervisor[%s]. Marking as COMPLETED.", supervisorId); + stateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED); + } + protected DataSourceMetadata retrieveDataSourceMetadata() { return indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(supervisorId); @@ -4412,9 +4783,18 @@ private void createTasksForGroup(int groupId, int replicas) TaskGroup group = activelyReadingTaskGroups.get(groupId); Map startPartitions = group.startingSequences; Map endPartitions = new HashMap<>(); - for (PartitionIdType partition : startPartitions.keySet()) { - endPartitions.put(partition, getEndOfPartitionMarker()); + + if (group.endSequences != null && !group.endSequences.isEmpty()) { + // Bounded mode: use explicit end offsets from task group + endPartitions.putAll(group.endSequences); + log.info("Creating bounded tasks for taskGroup[%d] with endOffsets: %s", groupId, group.endSequences); + } else { + // Streaming mode: use exclusive end (effectively no end) + for (PartitionIdType partition : startPartitions.keySet()) { + endPartitions.put(partition, getEndOfPartitionMarker()); + } } + Set exclusiveStartSequenceNumberPartitions = activelyReadingTaskGroups .get(groupId) .exclusiveStartSequenceNumberPartitions; @@ -4763,6 +5143,25 @@ protected abstract List serverPrioritiesToAssign ) throws JsonProcessingException; + /** + * Converts a string representation of a partition ID to the typed partition ID. + * Used for deserializing bounded stream config where partition keys come as strings. 
+ * + * @param partitionIdString string representation of partition ID + * @return typed partition ID + */ + protected abstract PartitionIdType createPartitionIdFromString(String partitionIdString); + + /** + * Converts an object (typically Number) to the typed sequence offset. + * Used for deserializing bounded stream config where offset values may come as Integer, Long, String, etc. + * Jackson may deserialize numeric values as Integer if they fit, but implementations like Kafka need Long. + * + * @param offsetObj the offset object from deserialization + * @return typed sequence offset + */ + protected abstract SequenceOffsetType createSequenceOffsetFromObject(Object offsetObj); + /** * calculates the taskgroup id that the given partition belongs to. * different between Kafka/Kinesis since Kinesis uses String as partition id @@ -5120,10 +5519,26 @@ protected LagStats aggregatePartitionLags(Map partitionLa */ protected abstract boolean isShardExpirationMarker(SequenceOffsetType seqNum); + /** + * Compares if current offset has reached or exceeded the target offset. + * Used to determine if a bounded task group has completed successfully. + * + * @param current Current offset from metadata storage + * @param target Target end offset from bounded config + * @return true if current >= target + */ + protected abstract boolean isOffsetAtOrBeyond(SequenceOffsetType current, SequenceOffsetType target); + /** * Returns true if the start sequence number should be exclusive for the non-first sequences for the whole partition. * For example, in Kinesis, the start offsets are inclusive for the first sequence, but exclusive for following * sequences. In Kafka, start offsets are always inclusive. */ protected abstract boolean useExclusiveStartSequenceNumberForNonFirstSequence(); + + /** + * Returns true if end offsets are exclusive (like Kafka), false if inclusive (like Kinesis). 
+ * Used for bounded mode to correctly detect empty ranges: start == end is empty only if exclusive. + */ + protected abstract boolean isEndOffsetExclusive(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java index cf7b27a6f0c4..3c7460d6d597 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java @@ -63,6 +63,7 @@ public abstract class SeekableStreamSupervisorIOConfig @Nullable private final IdleConfig idleConfig; @Nullable private final Integer stopTaskCount; @Nullable private final Map serverPriorityToReplicas; + @Nullable private final BoundedStreamConfig boundedStreamConfig; private final LagAggregator lagAggregator; private final boolean autoScalerEnabled; @@ -84,7 +85,8 @@ public SeekableStreamSupervisorIOConfig( DateTime lateMessageRejectionStartDateTime, @Nullable IdleConfig idleConfig, @Nullable Integer stopTaskCount, - @Nullable Map serverPriorityToReplicas + @Nullable Map serverPriorityToReplicas, + @Nullable BoundedStreamConfig boundedStreamConfig ) { this.stream = Preconditions.checkNotNull(stream, "stream cannot be null"); @@ -140,6 +142,7 @@ public SeekableStreamSupervisorIOConfig( this.idleConfig = idleConfig; this.serverPriorityToReplicas = serverPriorityToReplicas; + this.boundedStreamConfig = boundedStreamConfig; if (this.serverPriorityToReplicas != null) { int serverPriorityReplicas = 0; @@ -301,4 +304,16 @@ public int getMaxAllowedStops() } return stopTaskCount == null ? 
taskCount : stopTaskCount; } + + @Nullable + @JsonProperty + public BoundedStreamConfig getBoundedStreamConfig() + { + return boundedStreamConfig; + } + + public boolean isBounded() + { + return boundedStreamConfig != null; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index fefa24a5f3fb..842f0de4774e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -48,6 +48,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec { + protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE = "Update of the input source stream from [%s] to [%s] is not supported for a running supervisor." 
+ "%nTo perform the update safely, follow these steps:" diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java index 5af21ca7b6aa..6fd6307e50c7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java @@ -333,6 +333,7 @@ private TestableSeekableStreamSupervisorIOConfig( lateMessageRejectionStartDateTime, idleConfig, null, + null, null ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTaskIOConfig.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTaskIOConfig.java index 1fa7993b69a6..4f4fe8afc01e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTaskIOConfig.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTaskIOConfig.java @@ -38,7 +38,8 @@ public TestSeekableStreamIndexTaskIOConfig() DateTimes.nowUtc().minusDays(2), DateTimes.nowUtc(), new CsvInputFormat(null, null, true, null, 0, null), - Duration.standardHours(2).getStandardMinutes() + Duration.standardHours(2).getStandardMinutes(), + null ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfigTest.java new file mode 100644 index 000000000000..a29d63add00b --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfigTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor; + +import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.error.DruidException; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class BoundedStreamConfigTest +{ + private final ObjectMapper mapper = new ObjectMapper(); + + @Test + public void testEqualsAndHashCode() + { + EqualsVerifier.forClass(BoundedStreamConfig.class) + .withNonnullFields("startSequenceNumbers", "endSequenceNumbers") + .usingGetClass() + .verify(); + } + + @Test + public void testConstructorWithNullStartSequenceNumbers() + { + DruidException ex = Assert.assertThrows( + DruidException.class, + () -> new BoundedStreamConfig(null, Map.of("0", 500L)) + ); + Assert.assertTrue(ex.getMessage().contains("cannot be null or empty")); + } + + @Test + public void testConstructorWithNullEndSequenceNumbers() + { + DruidException ex = Assert.assertThrows( + DruidException.class, + () -> new BoundedStreamConfig(Map.of("0", 100L), null) + ); + Assert.assertTrue(ex.getMessage().contains("cannot be null or empty")); + } + + @Test + public void testConstructorWithEmptyStartSequenceNumbers() + { + 
DruidException ex = Assert.assertThrows( + DruidException.class, + () -> new BoundedStreamConfig(Map.of(), Map.of("0", 500L)) + ); + Assert.assertTrue(ex.getMessage().contains("cannot be null or empty")); + } + + @Test + public void testConstructorWithEmptyEndSequenceNumbers() + { + DruidException ex = Assert.assertThrows( + DruidException.class, + () -> new BoundedStreamConfig(Map.of("0", 100L), Map.of()) + ); + Assert.assertTrue(ex.getMessage().contains("cannot be null or empty")); + } + + @Test + public void testConstructorWithMismatchedPartitions() + { + DruidException ex = Assert.assertThrows( + DruidException.class, + () -> new BoundedStreamConfig(Map.of("0", 100L), Map.of("1", 500L)) + ); + Assert.assertTrue(ex.getMessage().contains("must have matching partition sets")); + } + + @Test + public void testSerializationDeserialization() throws Exception + { + BoundedStreamConfig config = new BoundedStreamConfig( + Map.of("0", 100, "1", 200), + Map.of("0", 500, "1", 600) + ); + + BoundedStreamConfig deserialized = mapper.readValue( + mapper.writeValueAsString(config), + BoundedStreamConfig.class + ); + + Assert.assertEquals(2, deserialized.getStartSequenceNumbers().size()); + Assert.assertEquals(2, deserialized.getEndSequenceNumbers().size()); + Assert.assertEquals(100, deserialized.getStartSequenceNumbers().get("0")); + Assert.assertEquals(200, deserialized.getStartSequenceNumbers().get("1")); + Assert.assertEquals(500, deserialized.getEndSequenceNumbers().get("0")); + Assert.assertEquals(600, deserialized.getEndSequenceNumbers().get("1")); + } + + @Test + public void testDeserializationWithStringValues() throws Exception + { + String json = "{" + + "\"startSequenceNumbers\": {\"0\": \"100\", \"1\": \"200\"}," + + "\"endSequenceNumbers\": {\"0\": \"500\", \"1\": \"600\"}" + + "}"; + + BoundedStreamConfig config = mapper.readValue(json, BoundedStreamConfig.class); + + Assert.assertEquals(2, config.getStartSequenceNumbers().size()); + Assert.assertEquals(2, 
config.getEndSequenceNumbers().size()); + } + + @Test + public void testDeserializationWithMixedTypes() throws Exception + { + String json = "{" + + "\"startSequenceNumbers\": {\"0\": 100, \"1\": \"200\"}," + + "\"endSequenceNumbers\": {\"0\": 500, \"1\": \"600\"}" + + "}"; + + BoundedStreamConfig config = mapper.readValue(json, BoundedStreamConfig.class); + + Assert.assertEquals(2, config.getStartSequenceNumbers().size()); + Assert.assertEquals(2, config.getEndSequenceNumbers().size()); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java index 09f8bfde4841..ec60d5753f74 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java @@ -69,6 +69,7 @@ public void testAllDefaults() null, null, null, + null, null ) { @@ -145,6 +146,7 @@ private static void assertTaskCount( null, null, null, + null, null ) { @@ -177,6 +179,7 @@ public void testBothLateMessageRejectionPeriodAndStartDateTime() DateTimes.nowUtc(), null, null, + null, null ) { @@ -212,6 +215,7 @@ public void testNullAggregatorThrows() null, null, null, + null, null ) { @@ -245,6 +249,7 @@ public void testGetMaxAllowedStopsScalingDisabled() null, null, null, + null, null ) { @@ -269,6 +274,7 @@ public void testGetMaxAllowedStopsScalingDisabled() null, null, 3, + null, null ) { @@ -305,6 +311,7 @@ public void testGetMaxAllowedStopsScalingEnabled() null, null, 1, + null, null ) { @@ -338,6 +345,7 @@ public void testGetMaxAllowedStopsScalingEnabled() null, null, 1, + null, null ) { @@ -368,6 +376,7 @@ public void testGetMaxAllowedStopsScalingEnabled() null, null, null, + null, null ) { @@ 
-455,9 +464,111 @@ private SeekableStreamSupervisorIOConfig makeSeekableStreamSupervisorIOConfig(@N null, null, null, - serverPriorityToReplicas + serverPriorityToReplicas, + null ) { }; } + + @Test + public void testBoundedModeWithValidConfig() + { + Map startOffsets = Map.of("0", 100, "1", 200); + Map endOffsets = Map.of("0", 500, "1", 600); + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, endOffsets); + + LagAggregator lagAggregator = mock(LagAggregator.class); + + SeekableStreamSupervisorIOConfig config = new SeekableStreamSupervisorIOConfig( + "stream", + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + lagAggregator, + null, + null, + null, + null, + boundedConfig + ) + { + }; + + Assert.assertTrue(config.isBounded()); + Assert.assertNotNull(config.getBoundedStreamConfig()); + Assert.assertEquals(boundedConfig, config.getBoundedStreamConfig()); + } + + @Test + public void testUnboundedModeByDefault() + { + LagAggregator lagAggregator = mock(LagAggregator.class); + + SeekableStreamSupervisorIOConfig config = new SeekableStreamSupervisorIOConfig( + "stream", + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + lagAggregator, + null, + null, + null, + null, + null + ) + { + }; + + Assert.assertFalse(config.isBounded()); + Assert.assertNull(config.getBoundedStreamConfig()); + } + + @Test + public void testBoundedModeWithNullConfig() + { + LagAggregator lagAggregator = mock(LagAggregator.class); + + SeekableStreamSupervisorIOConfig config = new SeekableStreamSupervisorIOConfig( + "stream", + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + lagAggregator, + null, + null, + null, + null, + null + ) + { + }; + + Assert.assertFalse(config.isBounded()); + Assert.assertNull(config.getBoundedStreamConfig()); + } } diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java index 8d1f755cec2d..3d0c6426feb2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java @@ -39,6 +39,7 @@ import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.MetadataSupervisorManager; @@ -787,6 +788,7 @@ public void testSeekableStreamSupervisorSpecWithScaleDisable() throws Interrupte null, null, null, + null, null ) { @@ -845,6 +847,7 @@ public void testEnablingIdleBeviourPerSupervisorWithOverlordConfigEnabled() null, new IdleConfig(true, null), null, + null, null ) { @@ -1469,6 +1472,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal null, null, null, + null, null ) { @@ -1491,6 +1495,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal null, null, null, + null, null ) { @@ -1543,4 +1548,74 @@ private static Map getScaleInProperties() return autoScalerConfig; } + @Test + public void testBoundedStreamSupervisorSpec_runsWithBoundedConfig() + { + EmittingLogger.registerEmitter(emitter); + + Map startOffsets = ImmutableMap.of("0", 0L); + Map endOffsets = ImmutableMap.of("0", 100L); + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, endOffsets); + + SeekableStreamSupervisorIOConfig boundedIoConfig 
= new SeekableStreamSupervisorIOConfig( + "stream", + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), + 1, + 1, + new Period("PT1H"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + LagAggregator.DEFAULT, + null, + null, + null, + null, + boundedConfig + ) + { + }; + + EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes(); + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(boundedIoConfig).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.replay(spec); + + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(boundedIoConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionSchema); + + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(taskMaster); + + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(EasyMock.anyString())).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.replay(taskStorage); + + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(EasyMock.anyString())).andReturn(null).anyTimes(); + EasyMock.replay(indexerMetadataStorageCoordinator); + 
+ TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(1); + + supervisor.start(); + supervisor.runInternal(); + + // Verify bounded config is properly set + Assert.assertTrue(supervisor.getIoConfig().isBounded()); + Assert.assertNotNull(supervisor.getIoConfig().getBoundedStreamConfig()); + Assert.assertEquals(startOffsets, supervisor.getIoConfig().getBoundedStreamConfig().getStartSequenceNumbers()); + Assert.assertEquals(endOffsets, supervisor.getIoConfig().getBoundedStreamConfig().getEndSequenceNumbers()); + } + } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index b0b7954bb125..eff5d1acd980 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -702,6 +702,7 @@ public void testIdleStateTransition() throws Exception null, new IdleConfig(true, 200L), null, + null, null ) { @@ -810,6 +811,7 @@ public void testIdleOnStartUpAndTurnsToRunningAfterLagUpdates() null, new IdleConfig(true, 200L), null, + null, null ) { @@ -1110,6 +1112,7 @@ public void testCheckpointForActiveTaskGroup() throws InterruptedException, Json null, new IdleConfig(true, 200L), null, + null, null ) {}; @@ -1329,6 +1332,7 @@ public void testEarlyStoppingOfTaskGroupBasedOnStopTaskCount() throws Interrupte null, new IdleConfig(true, 200L), stopTaskCount, + null, null ) { @@ -1565,6 +1569,7 @@ public void testSupervisorStopTaskGroupEarly() throws JsonProcessingException, I null, new IdleConfig(true, 200L), null, + null, null ) { @@ -2614,6 +2619,30 @@ public LagStats computeLagStats() { return new LagStats(0, 0, 0); } + + @Override + protected boolean 
isEndOffsetExclusive() + { + return true; + } + + @Override + protected boolean isOffsetAtOrBeyond(String current, String target) + { + return Long.parseLong(current) >= Long.parseLong(target); + } + + @Override + protected String createPartitionIdFromString(String partitionIdString) + { + return partitionIdString; + } + + @Override + protected String createSequenceOffsetFromObject(Object offsetObj) + { + return offsetObj.toString(); + } }; supervisor.scheduleReporting(executorService); EasyMock.verify(executorService, spec); @@ -2722,6 +2751,7 @@ private void expectEmitterSupervisor(boolean suspended) null, null, null, + null, null ) { @@ -2788,6 +2818,7 @@ public void testMaxAllowedStopsWithStopTaskCountRatio() null, null, 1, // ensure this is overridden + null, null ) { @@ -3106,7 +3137,8 @@ private static SeekableStreamSupervisorIOConfig createSupervisorIOConfig( null, null, null, - serverPriorityToReplicas + serverPriorityToReplicas, + null ) { }; @@ -3292,7 +3324,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( minimumMessageTime, maximumMessageTime, ioConfig.getInputFormat(), - ioConfig.getTaskDuration().getStandardMinutes() + ioConfig.getTaskDuration().getStandardMinutes(), + null ) { }; @@ -3445,6 +3478,30 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() { return false; } + + @Override + protected boolean isEndOffsetExclusive() + { + return true; + } + + @Override + protected boolean isOffsetAtOrBeyond(String current, String target) + { + return Long.parseLong(current) >= Long.parseLong(target); + } + + @Override + protected String createPartitionIdFromString(String partitionIdString) + { + return partitionIdString; + } + + @Override + protected String createSequenceOffsetFromObject(Object offsetObj) + { + return offsetObj.toString(); + } } private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor @@ -3585,6 +3642,12 @@ protected void scheduleReporting(ScheduledExecutorService 
reportingExec) this::emitNoticesQueueSize ); } + + @Override + protected boolean isEndOffsetExclusive() + { + return true; + } } private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem @@ -3673,7 +3736,8 @@ private static SeekableStreamIndexTaskIOConfig createTaskIoConfigExt( minimumMessageTime, maximumMessageTime, ioConfig.getInputFormat(), - ioConfig.getTaskDuration().getStandardMinutes() + ioConfig.getTaskDuration().getStandardMinutes(), + null ) { }; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java index 82abda059a17..4eefaed9bd99 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java @@ -164,7 +164,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( minimumMessageTime, maximumMessageTime, ioConfig.getInputFormat(), - ioConfig.getTaskDuration().getStandardMinutes() + ioConfig.getTaskDuration().getStandardMinutes(), + null ) { }; @@ -301,6 +302,12 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() { return false; } + + @Override + protected boolean isEndOffsetExclusive() + { + return true; + } } class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor @@ -329,6 +336,24 @@ public int getPartitionCount() { return partitionNumbers; } + + @Override + protected boolean isOffsetAtOrBeyond(String current, String target) + { + return Long.parseLong(current) >= Long.parseLong(target); + } + + @Override + protected String createPartitionIdFromString(String partitionIdString) + { + return partitionIdString; + } + + @Override + protected String createSequenceOffsetFromObject(Object offsetObj) + { 
+ return offsetObj.toString(); + } } class StateOverrideTestSeekableStreamSupervisor extends TestSeekableStreamSupervisor @@ -541,6 +566,7 @@ public static SeekableStreamSupervisorIOConfig createIOConfig( null, null, null, + null, null ) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java index f64f390fc566..16b02e05f1fb 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java @@ -64,7 +64,8 @@ public enum BasicState implements State RUNNING(true, false), IDLE(true, false), SUSPENDED(true, false), - STOPPING(true, false); + STOPPING(true, false), + COMPLETED(true, false); private final boolean healthy; private final boolean firstRunOnly; @@ -199,11 +200,11 @@ public void markRunFinished() consecutiveSuccessfulRuns = currentRunSuccessful ? consecutiveSuccessfulRuns + 1 : 0; consecutiveFailedRuns = currentRunSuccessful ? 0 : consecutiveFailedRuns + 1; - // If the supervisor is not IDLE, try to set the state to RUNNING or SUSPENDED. + // If the supervisor is not IDLE or COMPLETED, try to set the state to RUNNING or SUSPENDED. // This will be rejected if we haven't had atLeastOneSuccessfulRun // (in favor of the more specific states for the initial run) and will instead trigger setting the state to an // unhealthy one if we are now over the error thresholds. 
- if (!isIdle()) { + if (!isIdle() && !isCompleted()) { maybeSetState(healthySteadyState); } // reset for next run @@ -240,6 +241,11 @@ public boolean isIdle() return SupervisorStateManager.BasicState.IDLE.equals(supervisorState); } + public boolean isCompleted() + { + return SupervisorStateManager.BasicState.COMPLETED.equals(supervisorState); + } + protected Deque getRecentEventsQueue() { return recentEventsQueue; diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java index 367577a291dc..3f8a38054615 100644 --- a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java @@ -66,4 +66,76 @@ public void testIdleConfigSerde() Assert.assertTrue(stateManagerConfig.isIdleConfigEnabled()); Assert.assertEquals(60000, stateManagerConfig.getInactiveAfterMillis()); } + + @Test + public void testStoppingStateIsTerminal() + { + stateManagerConfig = new SupervisorStateManagerConfig(); + SupervisorStateManager supervisorStateManager = new SupervisorStateManager( + stateManagerConfig, + false + ); + + // Start in PENDING state + Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisorStateManager.getSupervisorState()); + + // Transition to STOPPING + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.STOPPING); + Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisorStateManager.getSupervisorState()); + + // Attempt to transition out of STOPPING should be ignored + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.RUNNING); + Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisorStateManager.getSupervisorState()); + + 
supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE); + Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisorStateManager.getSupervisorState()); + + // Cannot transition to COMPLETED from STOPPING + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED); + Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisorStateManager.getSupervisorState()); + } + + @Test + public void testCompletedStateIsHealthy() + { + stateManagerConfig = new SupervisorStateManagerConfig(); + SupervisorStateManager supervisorStateManager = new SupervisorStateManager( + stateManagerConfig, + false + ); + + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED); + + Assert.assertTrue(supervisorStateManager.isHealthy()); + Assert.assertEquals(SupervisorStateManager.BasicState.COMPLETED, supervisorStateManager.getSupervisorState()); + } + + @Test + public void testCompletedStateIsNotFirstRunOnly() + { + stateManagerConfig = new SupervisorStateManagerConfig(); + SupervisorStateManager supervisorStateManager = new SupervisorStateManager( + stateManagerConfig, + false + ); + + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED); + + Assert.assertFalse(SupervisorStateManager.BasicState.COMPLETED.isFirstRunOnly()); + } + + @Test + public void testMarkRunFinished_completedStateSkipsHealthyCheck() + { + stateManagerConfig = new SupervisorStateManagerConfig(); + SupervisorStateManager supervisorStateManager = new SupervisorStateManager( + stateManagerConfig, + false + ); + + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED); + supervisorStateManager.markRunFinished(); + + Assert.assertEquals(SupervisorStateManager.BasicState.COMPLETED, supervisorStateManager.getSupervisorState()); + } } diff --git a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java index 
8851c16a4522..f7f6eb7f60ab 100644 --- a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java +++ b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java @@ -478,7 +478,8 @@ public TestSeekableStreamIndexTaskIOConfig() DateTimes.nowUtc().minusDays(2), DateTimes.nowUtc(), new CsvInputFormat(null, null, true, null, 0, null), - Duration.standardHours(2).getStandardMinutes() + Duration.standardHours(2).getStandardMinutes(), + null ); } } diff --git a/website/.spelling b/website/.spelling index 28701817f362..877653a3471d 100644 --- a/website/.spelling +++ b/website/.spelling @@ -291,6 +291,7 @@ averagers backend backfills backfilled +backfilling backpressure base64 baseTime @@ -303,6 +304,7 @@ boolean breakpoint broadcasted bucketSize +checkpointed checksums classpath clickstream @@ -508,6 +510,7 @@ prepends prepopulated preprocessing priori +reprocessing procs processFromRaw programmatically
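
For readers of this patch: the docs table above says `boundedStreamConfig` carries explicit `startSequenceNumbers` and `endSequenceNumbers` maps keyed by partition. A sketch of how the object might look inside a Kafka supervisor `ioConfig` (the topic name is illustrative; the offset values mirror the fixtures in `BoundedStreamConfigTest`):

```json
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "topic": "example-topic",
      "boundedStreamConfig": {
        "startSequenceNumbers": {"0": 100, "1": 200},
        "endSequenceNumbers": {"0": 500, "1": 600}
      }
    }
  }
}
```

Per the docs change, once all partitions reach their end offsets the supervisor transitions to the new `COMPLETED` state rather than cycling tasks; retrying with different offsets requires a supervisor reset or a new supervisor ID.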
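
The `BoundedStreamConfigTest` cases in this patch pin down a three-part validation contract: both offset maps must be non-null, non-empty, and cover the same partition set. A standalone sketch of that contract (not the actual Druid `BoundedStreamConfig` class, which throws `DruidException`; `IllegalArgumentException` stands in here so the snippet is self-contained):

```java
import java.util.Map;

public class BoundedStreamConfigSketch
{
    // Mirrors the checks exercised by testConstructorWithNull*,
    // testConstructorWithEmpty*, and testConstructorWithMismatchedPartitions.
    static void validate(Map<String, Long> start, Map<String, Long> end)
    {
        if (start == null || start.isEmpty() || end == null || end.isEmpty()) {
            throw new IllegalArgumentException(
                "startSequenceNumbers and endSequenceNumbers cannot be null or empty");
        }
        if (!start.keySet().equals(end.keySet())) {
            throw new IllegalArgumentException(
                "startSequenceNumbers and endSequenceNumbers must have matching partition sets");
        }
    }

    public static void main(String[] args)
    {
        validate(Map.of("0", 100L, "1", 200L), Map.of("0", 500L, "1", 600L)); // valid
        try {
            validate(Map.of("0", 100L), Map.of("1", 500L)); // partition sets differ
            throw new AssertionError("expected rejection of mismatched partitions");
        }
        catch (IllegalArgumentException expected) {
            System.out.println("mismatched partitions rejected");
        }
    }
}
```

Note the tests also accept string-typed offsets (`"0": "100"`) via Jackson deserialization; that coercion is out of scope for this sketch.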