Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.linkedin.datastream.server;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -4505,4 +4506,142 @@ public TestSetup(EmbeddedDatastreamCluster datastreamKafkaCluster, Coordinator c
_connector = connector;
}
}

/**
* Sets the private static final ASSIGNMENT_TIMEOUT field on Coordinator via reflection.
* Returns the original value so it can be restored.
*/
private Duration setAssignmentTimeout(Duration newTimeout) throws Exception {
Field field = Coordinator.class.getDeclaredField("ASSIGNMENT_TIMEOUT");
field.setAccessible(true);

// Remove the 'final' modifier so we can write to the field
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~java.lang.reflect.Modifier.FINAL);

Duration original = (Duration) field.get(null);
field.set(null, newTimeout);
return original;
}

@Test
public void testHandleAssignmentChangeClearsStateOnTimeout() throws Exception {
String testCluster = "testHandleAssignmentChangeClearsStateOnTimeout";
String testConnectorType = "testConnectorType";
String datastreamName1 = "datastream1";

// Reduce ASSIGNMENT_TIMEOUT so the test doesn't wait 90 seconds
Duration originalTimeout = setAssignmentTimeout(Duration.ofSeconds(1));

try {
Coordinator instance1 = createCoordinator(_zkConnectionString, testCluster);

// A connector that blocks on onAssignmentChange long enough to trigger the timeout
CountDownLatch blockingLatch = new CountDownLatch(1);
TestHookConnector slowConnector = new TestHookConnector("slowConnector", testConnectorType) {
@Override
public void onAssignmentChange(List<DatastreamTask> tasks) {
try {
// Block for longer than ASSIGNMENT_TIMEOUT (1 second)
blockingLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
super.onAssignmentChange(tasks);
}
};

instance1.addConnector(testConnectorType, slowConnector, new BroadcastStrategy(Optional.empty()), false,
new SourceBasedDeduper(), null);
instance1.start();

ZkClient zkClient = new ZkClient(_zkConnectionString);
DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, testConnectorType, datastreamName1);

// Wait for the timeout to occur and state to be cleared
Assert.assertTrue(PollUtils.poll(() -> instance1.getDatastreamTasks().isEmpty(), 200, 10000),
"Expected _assignedDatastreamTasks to be cleared after assignment timeout");

// Unblock the connector so it can process retried assignments
blockingLatch.countDown();

// After unblocking, the retried assignment should succeed
assertConnectorAssignment(slowConnector, 10000, datastreamName1);

// Verify the coordinator has the task tracked after successful retry
Assert.assertFalse(instance1.getDatastreamTasks().isEmpty(),
"Expected _assignedDatastreamTasks to be populated after successful retry");

instance1.stop();
instance1.getDatastreamCache().getZkclient().close();
zkClient.close();
} finally {
setAssignmentTimeout(originalTimeout);
}
}

/**
* Verifies that after a timeout clears _assignedDatastreamTasks, a re-assignment
* does NOT skip the task (i.e., the task is not incorrectly treated as "already running").
* This is the core bug that the clear() fix addresses.
*/
@Test
public void testHandleAssignmentChangeTimeoutDoesNotSkipTaskOnReassignment() throws Exception {
String testCluster = "testHandleAssignmentChangeTimeoutDoesNotSkipTaskOnReassignment";
String testConnectorType = "testConnectorType";
String datastreamName1 = "datastream1";

// Reduce ASSIGNMENT_TIMEOUT so the test doesn't wait 90 seconds
Duration originalTimeout = setAssignmentTimeout(Duration.ofSeconds(1));

try {
java.util.concurrent.atomic.AtomicInteger assignmentChangeCount = new java.util.concurrent.atomic.AtomicInteger(0);
// Block only the first onAssignmentChange; let subsequent ones through immediately
CountDownLatch blockFirstAssignment = new CountDownLatch(1);

Coordinator instance1 = createCoordinator(_zkConnectionString, testCluster);

TestHookConnector connector = new TestHookConnector("connector1", testConnectorType) {
@Override
public void onAssignmentChange(List<DatastreamTask> tasks) {
int count = assignmentChangeCount.incrementAndGet();
if (count == 1) {
// Block the first assignment to trigger timeout. Use a long wait so the
// coordinator's 1-second timeout fires while we're still blocked.
try {
blockFirstAssignment.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
super.onAssignmentChange(tasks);
}
};

instance1.addConnector(testConnectorType, connector, new BroadcastStrategy(Optional.empty()), false,
new SourceBasedDeduper(), null);
instance1.start();

ZkClient zkClient = new ZkClient(_zkConnectionString);
DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, testConnectorType, datastreamName1);

// Unblock the first (timed-out) assignment after giving coordinator time to timeout and retry
Thread.sleep(3000);
blockFirstAssignment.countDown();

// The retry should successfully assign the task even though the first attempt timed out
assertConnectorAssignment(connector, 15000, datastreamName1);

// Verify the coordinator has the task tracked properly after successful retry
Assert.assertFalse(instance1.getDatastreamTasks().isEmpty(),
"Expected _assignedDatastreamTasks to be populated after successful retry");

instance1.stop();
instance1.getDatastreamCache().getZkclient().close();
zkClient.close();
} finally {
setAssignmentTimeout(originalTimeout);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -865,8 +865,15 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx
try {
getAssignmentsFuture(assignmentChangeFutures, start, isDatastreamUpdate);
} catch (TimeoutException e) {
// if it's timeout then we will retry
_log.warn("Timeout when doing the assignment", e);
// Clear the current assignment state to force full reconciliation on the next assignment change.
// Without this, _assignedDatastreamTasks retains stale state from the previous successful assignment,
// causing the next handleAssignmentChange() to compute an incorrect diff. For example, a task that was
// unassigned and stopped by the connector but not removed from _assignedDatastreamTasks (due to this
// timeout) would be incorrectly treated as "already running" if re-assigned, resulting in no
// ConnectorTask being created. Clearing follows the same pattern as onSessionExpired().
_log.warn("Timeout when doing the assignment. Clearing current assignment state to force full "
+ "reconciliation on next assignment change.", e);
_assignedDatastreamTasks.clear();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some tests to cover this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, let me add

retryHandleAssignmentChange(isDatastreamUpdate);
return;
} catch (InterruptedException e) {
Expand Down