diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index 1055418c3..17de1fbb9 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -6,6 +6,7 @@ package com.linkedin.datastream.server; import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.Method; import java.time.Duration; import java.time.Instant; @@ -4505,4 +4506,142 @@ public TestSetup(EmbeddedDatastreamCluster datastreamKafkaCluster, Coordinator c _connector = connector; } } + + /** + * Sets the private static final ASSIGNMENT_TIMEOUT field on Coordinator via reflection. + * Returns the original value so it can be restored. + */ + private Duration setAssignmentTimeout(Duration newTimeout) throws Exception { + Field field = Coordinator.class.getDeclaredField("ASSIGNMENT_TIMEOUT"); + field.setAccessible(true); + + // Remove the 'final' modifier so we can write to the field + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~java.lang.reflect.Modifier.FINAL); + + Duration original = (Duration) field.get(null); + field.set(null, newTimeout); + return original; + } + + @Test + public void testHandleAssignmentChangeClearsStateOnTimeout() throws Exception { + String testCluster = "testHandleAssignmentChangeClearsStateOnTimeout"; + String testConnectorType = "testConnectorType"; + String datastreamName1 = "datastream1"; + + // Reduce ASSIGNMENT_TIMEOUT so the test doesn't wait 90 seconds + Duration originalTimeout = setAssignmentTimeout(Duration.ofSeconds(1)); + + try { + Coordinator instance1 = createCoordinator(_zkConnectionString, testCluster); + + // A connector that blocks on onAssignmentChange long enough to trigger the timeout + CountDownLatch blockingLatch = new CountDownLatch(1); + TestHookConnector slowConnector = new TestHookConnector("slowConnector", testConnectorType) { + @Override + public void onAssignmentChange(List tasks) { + try { + // Block for longer than ASSIGNMENT_TIMEOUT (1 second) + blockingLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + super.onAssignmentChange(tasks); + } + }; + + instance1.addConnector(testConnectorType, slowConnector, new BroadcastStrategy(Optional.empty()), false, + new SourceBasedDeduper(), null); + instance1.start(); + + ZkClient zkClient = new ZkClient(_zkConnectionString); + DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, testConnectorType, datastreamName1); + + // Wait for the timeout to occur and state to be cleared + Assert.assertTrue(PollUtils.poll(() -> instance1.getDatastreamTasks().isEmpty(), 200, 10000), + "Expected _assignedDatastreamTasks to be cleared after assignment timeout"); + + // Unblock the connector so it can process retried assignments + blockingLatch.countDown(); + + // After unblocking, the retried assignment should succeed + assertConnectorAssignment(slowConnector, 10000, datastreamName1); + + // Verify the coordinator has the task tracked after successful retry + Assert.assertFalse(instance1.getDatastreamTasks().isEmpty(), + "Expected _assignedDatastreamTasks to be populated after successful retry"); + + instance1.stop(); + instance1.getDatastreamCache().getZkclient().close(); + zkClient.close(); + } finally { + setAssignmentTimeout(originalTimeout); + } + } + + /** + * Verifies that after a timeout clears _assignedDatastreamTasks, a re-assignment + * does NOT skip the task (i.e., the task is not incorrectly treated as "already running"). + * This is the core bug that the clear() fix addresses. + */ + @Test + public void testHandleAssignmentChangeTimeoutDoesNotSkipTaskOnReassignment() throws Exception { + String testCluster = "testHandleAssignmentChangeTimeoutDoesNotSkipTaskOnReassignment"; + String testConnectorType = "testConnectorType"; + String datastreamName1 = "datastream1"; + + // Reduce ASSIGNMENT_TIMEOUT so the test doesn't wait 90 seconds + Duration originalTimeout = setAssignmentTimeout(Duration.ofSeconds(1)); + + try { + java.util.concurrent.atomic.AtomicInteger assignmentChangeCount = new java.util.concurrent.atomic.AtomicInteger(0); + // Block only the first onAssignmentChange; let subsequent ones through immediately + CountDownLatch blockFirstAssignment = new CountDownLatch(1); + + Coordinator instance1 = createCoordinator(_zkConnectionString, testCluster); + + TestHookConnector connector = new TestHookConnector("connector1", testConnectorType) { + @Override + public void onAssignmentChange(List tasks) { + int count = assignmentChangeCount.incrementAndGet(); + if (count == 1) { + // Block the first assignment to trigger timeout. Use a long wait so the + // coordinator's 1-second timeout fires while we're still blocked. + try { + blockFirstAssignment.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + super.onAssignmentChange(tasks); + } + }; + + instance1.addConnector(testConnectorType, connector, new BroadcastStrategy(Optional.empty()), false, + new SourceBasedDeduper(), null); + instance1.start(); + + ZkClient zkClient = new ZkClient(_zkConnectionString); + DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, testConnectorType, datastreamName1); + + // Unblock the first (timed-out) assignment after giving coordinator time to timeout and retry + Thread.sleep(3000); + blockFirstAssignment.countDown(); + + // The retry should successfully assign the task even though the first attempt timed out + assertConnectorAssignment(connector, 15000, datastreamName1); + + // Verify the coordinator has the task tracked properly after successful retry + Assert.assertFalse(instance1.getDatastreamTasks().isEmpty(), + "Expected _assignedDatastreamTasks to be populated after successful retry"); + + instance1.stop(); + instance1.getDatastreamCache().getZkclient().close(); + zkClient.close(); + } finally { + setAssignmentTimeout(originalTimeout); + } + } } diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index 4ada0864f..d72ac1871 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -865,8 +865,15 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx try { getAssignmentsFuture(assignmentChangeFutures, start, isDatastreamUpdate); } catch (TimeoutException e) { - // if it's timeout then we will retry - _log.warn("Timeout when doing the assignment", e); + // Clear the current assignment state to force full reconciliation on the next assignment change. + // Without this, _assignedDatastreamTasks retains stale state from the previous successful assignment, + // causing the next handleAssignmentChange() to compute an incorrect diff. For example, a task that was + // unassigned and stopped by the connector but not removed from _assignedDatastreamTasks (due to this + // timeout) would be incorrectly treated as "already running" if re-assigned, resulting in no + // ConnectorTask being created. Clearing follows the same pattern as onSessionExpired(). + _log.warn("Timeout when doing the assignment. Clearing current assignment state to force full " + + "reconciliation on next assignment change.", e); + _assignedDatastreamTasks.clear(); retryHandleAssignmentChange(isDatastreamUpdate); return; } catch (InterruptedException e) {