feat: Bounded Stream Supervisor #19372

Open

aho135 wants to merge 54 commits into apache:master from aho135:bounded-stream-supervisor

Conversation

@aho135 (Contributor) commented Apr 24, 2026

Description

Introduces a new property on the Stream Supervisor spec IOConfig, boundedStreamConfig, which lets operators specify start and end offset ranges for short-lived supervised ingestion. The property modifies the main Supervisor run loop to ingest from and monitor only the partitions specified in the boundedStreamConfig. After the offset range has been consumed, the Supervisor transitions into a terminal state (COMPLETED). The motivation for this PR came out of #19191, which submits backfill tasks that are unsupervised. Once this change is merged, #19191 can be enhanced to use the boundedStreamConfig so that the backfill tasks are supervised.

Release note

Adds a property called boundedStreamConfig to the SeekableStreamSupervisorIOConfig, which allows operators to spin up a Supervisor that consumes only a specified offset range.
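For illustration, a Kafka supervisor spec using the new property might look like the following sketch. Only the boundedStreamConfig field name comes from this PR; the surrounding fields and the exact shape of the offset maps are assumptions based on the description above, not the final schema:

```json
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "topic": "events",
      "consumerProperties": { "bootstrap.servers": "localhost:9092" },
      "boundedStreamConfig": {
        "startSequenceNumbers": { "0": 100, "1": 250 },
        "endSequenceNumbers": { "0": 500, "1": 900 }
      }
    }
  }
}
```

Once the supervisor consumes through the end offsets for every listed partition, it would transition to COMPLETED rather than continuing to poll.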


Key changed/added classes in this PR
  • SeekableStreamSupervisorIOConfig
  • BoundedStreamConfig
  • SeekableStreamSupervisor

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@aho135 aho135 changed the title Bounded stream supervisor feat: Bounded stream supervisor Apr 24, 2026
@aho135 aho135 changed the title feat: Bounded stream supervisor feat: Bounded Stream Supervisor Apr 24, 2026
@aho135 aho135 requested a review from abhishekrb19 April 24, 2026 17:48
exclusiveStartSequenceNumberPartitions,
generateSequenceName(
unfilteredStartingSequencesForSequenceName == null
? startingSequences
@abhishekrb19 (Contributor) left a comment:

Thanks @aho135 - took a quick glance and the approach looks good to me overall!
I’m still going through some of the main files and just checkpointing my review. Do you think it would be possible to add a simple embedded test with the new config for some end-to-end coverage?

this.startSequenceNumbers = Preconditions.checkNotNull(startSequenceNumbers, "startSequenceNumbers");
this.endSequenceNumbers = Preconditions.checkNotNull(endSequenceNumbers, "endSequenceNumbers");

// Validation
Contributor (@abhishekrb19):

As a guard rail, I think it'll be good to have stricter checks for each partition so there's no unintended behavior with incorrect ranges specified:
startSequenceNumbers < endSequenceNumbers
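A guard along these lines could live in the config's constructor. The sketch below assumes Kafka-style long offsets with an exclusive end offset; the class and method names are illustrative, not the actual BoundedStreamConfig API. Strict inequality also rejects start == end, which comes up further down in this thread:

```java
import java.util.Map;

public class BoundedRangeValidator
{
  /**
   * Requires start < end for every partition, assuming exclusive end
   * offsets. This rejects reversed ranges outright, and also the
   * start == end case, which would spin up a task that consumes nothing.
   */
  public static void validate(Map<Integer, Long> startOffsets, Map<Integer, Long> endOffsets)
  {
    for (Map.Entry<Integer, Long> entry : startOffsets.entrySet()) {
      final Long end = endOffsets.get(entry.getKey());
      if (end == null || entry.getValue() >= end) {
        throw new IllegalArgumentException(
            "Partition[" + entry.getKey() + "] requires start < end, got start["
            + entry.getValue() + "], end[" + end + "]"
        );
      }
    }
  }
}
```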

Contributor Author (@aho135):

This validation is a bit difficult to do within BoundedStreamConfig because of the generic typing. But I do see there is already validation for this in the Kafka task IOConfig.

Contributor Author (@aho135):

The validation there isn't strict enough though because startOffset can equal endOffset. In that scenario the Supervisor spins up a task that consumes nothing and then shuts down. But since no data was consumed there is no metadata update so it gets stuck in a loop where it keeps spinning up tasks. I added additional validation to handle this scenario in this commit

@@ -4255,6 +4418,23 @@ private OrderedSequenceNumber<SequenceOffsetType> getOffsetFromStorageForPartiti
}
Member (@FrankChen021):

[P1] Bounded starts can be ignored when metadata exists

getOffsetFromStorageForPartition only falls back to boundedStreamConfig.startSequenceNumbers when no metadata/checkpoint offset exists. If a supervisor is reset or reconfigured with a requested bounded start while metadata storage still has an older offset, the stored metadata wins and the task starts from the stale position instead of the user-supplied bounded start. That can skip the requested backfill range or process a different interval than configured. Bounded mode should either clear/namespace old metadata for the run or explicitly prefer the configured start when initializing the bounded task group.

Contributor Author (@aho135):

Thanks for the review @FrankChen021!

Bounded mode should either clear/namespace old metadata for the run

I'm a bit wary of automatic cleanup of metadata. Consider the scenario where a cluster operator has a running Supervisor and wants to re-ingest some older data, so they resubmit the exact same spec (forgetting to update the id); the bounded Supervisor succeeds, but the previously committed offset gets lost.

explicitly prefer the configured start when initializing the bounded task group.

This falls into the same issue as above: if the operator forgets to set the id to something different from the running Supervisor's, the previously committed offset is lost forever.

I'm leaning towards adding validation that if metadata already exists for the id then just throw an exception and suggest the operator to resubmit with a different id or reset the Supervisor. Curious to hear your thoughts on this approach. Thanks again

Member (@FrankChen021):

That validation approach sounds right to me. The important part is preventing a bounded supervisor from accidentally mixing explicit bounded start offsets with existing committed metadata for the same id; failing fast with guidance to use a different id or reset the supervisor would avoid the silent stale-offset behavior without deleting or overwriting a running supervisor's offsets.

Contributor Author (@aho135):

@FrankChen021 Thinking through this one a bit more, the validation approach is a bit tricky. It's not straightforward to tell if the existing metadata is from the bounded Supervisor itself (in which case starting from the metadata would be the correct behavior) or if it's from a previous Supervisor.

One approach we can take is that if the metadata offset falls within the configured start/end offsets then use that, otherwise fall back to startOffset. This does run the risk of partial ingestion of the specified range though.

Member (@FrankChen021):

I agree the source of the metadata is the hard part here, but I would avoid using "metadata is within the configured start/end range" as the deciding rule. If the stored offset is inside [start, end), starting there can still silently skip the prefix [start, storedOffset), which is the same class of surprise as the original issue. I think the safer behavior is still to fail fast when bounded mode finds existing metadata for the configured bounded partitions unless there is an explicit signal that this is a resume of the same bounded run.

Contributor Author (@aho135) commented May 2, 2026:

Thanks @FrankChen021! I took a stab at this in the most recent commit. Please let me know your thoughts when you get the chance.

Member (@FrankChen021):

Thanks, this looks like the right direction to me. Persisting the bounded config in the datasource metadata and rejecting existing metadata whose bounded config is missing or different addresses the silent stale-offset case, while still allowing a supervisor to resume metadata from the same bounded run.

Comment on lines +4643 to +4651
/**
* Handle bounded processing completion by shutting down the supervisor.
* At this point, all task groups are already empty (verified by isBoundedWorkComplete),
* so we just need to mark the supervisor as completed.
*/
private void handleBoundedCompletion()
{
log.info("Bounded processing complete for supervisor[%s]. Marking as COMPLETED.", supervisorId);
stateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED);
Contributor (@abhishekrb19):

Should this call stop() with this COMPLETED state so things get unregistered and the executor is removed?

Contributor Author (@aho135) commented Apr 30, 2026:

One workflow I was testing out was to submit a bounded Supervisor and have it run to completion. Then I adjusted the start/end offsets and re-submitted the spec. Then I did a hard reset to clear the metadata so it could ingest the new offset range. For this kind of workflow we would need the executor to continue running even after the initial completion

Contributor Author (@aho135):

The latest commit handles this workflow

Member (@FrankChen021) left a comment:

Severity findings: P0: 0, P1: 1, P2: 0, P3: 0 (total: 1)

This is an automated review by Codex GPT-5

for (PartitionIdType partition : partitionsInGroup) {
SequenceOffsetType start = startOffsets.get(partition);
SequenceOffsetType end = endOffsets.get(partition);
if (!isOffsetAtOrBeyond(start, end)) {
Member (@FrankChen021):

[P1] Kinesis bounded ranges with start == end are skipped

The new empty-range check treats start >= end as completed for all bounded supervisors before creating any task. That is valid for Kafka's exclusive end offsets, but Kinesis declares bounded end offsets as inclusive and its task runner returns isEndOffsetExclusive() == false, so a Kinesis bounded ingestion for a single record where startSequenceNumbers equals endSequenceNumbers is marked COMPLETED without reading that record. This should be provider-aware, for example only treating equality as empty when the end offset is exclusive, while still rejecting/handling start > end appropriately.

Member (@FrankChen021) left a comment:

I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.


This is an automated review by Codex GPT-5

Member (@FrankChen021) left a comment:

Severity findings: P0: 0, P1: 1, P2: 0, P3: 0 (total: 1)

This is an automated review by Codex GPT-5

"All partitions in taskGroup[%d] have reached their end offsets",
groupId
);
return true; // All partitions have reached their end offsets
Member (@FrankChen021):

[P1] Validate bounded metadata before completing from old offsets

A new bounded supervisor can silently complete using offsets from a previous bounded run with a different boundedStreamConfig. The mismatch check only runs in getOffsetFromStorageForPartition(), but isBoundedWorkComplete() calls hasTaskGroupReachedBoundedEnd() first, which reads raw metadata offsets and returns true when currentOffset >= the new end offset. For example, after a run [0,100] commits offset 100, posting [50,75] for the same datasource reaches COMPLETED without running tasks or throwing the documented metadata mismatch error. Validate the stored bounded config before treating metadata offsets as completion, or avoid using prior offsets when the bounded config differs.
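One possible shape for that guard, sketched with simplified types (the real check would compare the BoundedStreamConfig object persisted in datasource metadata; the method name and String-typed configs here are illustrative):

```java
import java.util.Objects;

public class BoundedCompletionGuard
{
  /**
   * Stored offsets only count as completion evidence when the bounded
   * config persisted alongside them matches the supervisor's current
   * bounded config. Offsets committed under a different config are
   * treated as stale, so task creation proceeds and can surface the
   * existing metadata-mismatch error.
   */
  public static boolean hasReachedBoundedEnd(
      Long storedOffset,
      long configuredEndOffset,
      String storedBoundedConfig,
      String currentBoundedConfig
  )
  {
    if (storedOffset == null) {
      return false; // nothing committed yet, so nothing is complete
    }
    if (!Objects.equals(storedBoundedConfig, currentBoundedConfig)) {
      return false; // offsets belong to a different bounded run
    }
    return storedOffset >= configuredEndOffset;
  }
}
```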

Contributor Author (@aho135):

Thanks @FrankChen021, I did leave that as a comment in the embedded test.

I suppose it would be better to be explicit about this mismatch here. Will make that update.

Member (@FrankChen021):

Thanks, the current update addresses my concern: hasTaskGroupReachedBoundedEnd() now refuses to treat stored offsets as completion evidence when the metadata bounded config differs, so task creation can reach the existing mismatch error path. The added stale-offset regression test covers the case I was worried about.

@abhishekrb19 (Contributor) left a comment:

Thanks @aho135 for addressing reviews. Some additional comments, but I think it's pretty close

Comment on lines +261 to +265
cluster.callApi().postSupervisor(supervisor2);
waitForSupervisorToBeUnhealthy(supervisor2.getId());

final SupervisorStatus status2 = cluster.callApi().getSupervisorStatus(supervisor2.getId());
Assertions.assertFalse(status2.isHealthy(), "Supervisor should be unhealthy after detecting metadata mismatch");
@abhishekrb19 (Contributor) commented May 12, 2026:

Do you think it would be worth adding a third supervisor with a different ID but the same bounds, validating that it succeeds? (either in this test or the following one)

(Two outdated comment threads on docs/ingestion/supervisor.md)
@abhishekrb19 (Contributor) left a comment:

👍

Member (@FrankChen021) left a comment:

Severity findings: P0: 0, P1: 1, P2: 2, P3: 0 (total: 3)

Reviewed 47 of 47 changed files.

Findings that could not be attached inline:

  • extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadata.java:120 - [P2] Check bounded config before multi-topic metadata fallback. Adding boundedStreamConfig to metadata equality makes line 121 reject different bounded configs initially, but the multi-topic fallback then compares only sequence numbers and can return true for metadata from a different bounded run. In multi-topic bounded ingestion, that lets transactional metadata matching ignore the bounded config mismatch and can allow stale tasks to publish or overwrite metadata. Add an explicit boundedStreamConfig equality check before entering the fallback.

This is an automated review by Codex GPT-5.5

boolean isEmpty;
if (isEndOffsetExclusive()) {
// Exclusive: empty if start >= end
isEmpty = isOffsetAtOrBeyond(start, end);
Member (@FrankChen021):

[P2] Reject reversed bounded ranges instead of completing

For exclusive streams, this treats start > end the same as the valid empty case start == end, so a config like start=500/end=100 makes every partition look complete and the supervisor can move to COMPLETED without creating a task. KafkaIndexTaskIOConfig would reject end < start if a task were created, but this completion check bypasses that validation. Reversed ranges should fail validation instead of being reported as successful bounded ingestion.
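A provider-aware version of the check that also rejects reversed ranges might look like the following sketch (isRangeEmpty and its parameters are illustrative; endOffsetExclusive stands in for the task runner's isEndOffsetExclusive() flag mentioned above):

```java
public class BoundedRangeCheck
{
  /**
   * Returns true only for the legitimately empty case: start == end on a
   * stream with exclusive end offsets (e.g. Kafka). For inclusive-end
   * streams (e.g. Kinesis), start == end is a one-record range that still
   * needs a task. A reversed range fails validation instead of being
   * reported as successfully completed.
   */
  public static boolean isRangeEmpty(long start, long end, boolean endOffsetExclusive)
  {
    if (start > end) {
      throw new IllegalArgumentException(
          "Invalid bounded range: start[" + start + "] > end[" + end + "]"
      );
    }
    return endOffsetExclusive && start == end;
  }
}
```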
