From 22b3f4912456031080bd9f27de3b1727809dc6c0 Mon Sep 17 00:00:00 2001 From: aykhande Date: Sat, 14 Feb 2026 10:16:31 +0530 Subject: [PATCH 1/3] Fix task creation being skipped bug due to timeouts --- .../com/linkedin/datastream/server/Coordinator.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index 4ada0864f..d72ac1871 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -865,8 +865,15 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx try { getAssignmentsFuture(assignmentChangeFutures, start, isDatastreamUpdate); } catch (TimeoutException e) { - // if it's timeout then we will retry - _log.warn("Timeout when doing the assignment", e); + // Clear the current assignment state to force full reconciliation on the next assignment change. + // Without this, _assignedDatastreamTasks retains stale state from the previous successful assignment, + // causing the next handleAssignmentChange() to compute an incorrect diff. For example, a task that was + // unassigned and stopped by the connector but not removed from _assignedDatastreamTasks (due to this + // timeout) would be incorrectly treated as "already running" if re-assigned, resulting in no + // ConnectorTask being created. Clearing follows the same pattern as onSessionExpired(). + _log.warn("Timeout when doing the assignment. Clearing current assignment state to force full " + + "reconciliation on next assignment change.", e); + _assignedDatastreamTasks.clear(); retryHandleAssignmentChange(isDatastreamUpdate); return; } catch (InterruptedException e) { From 5e0dd77ea9f24ad9e4adc86f91a9b178e7db6743 Mon Sep 17 00:00:00 2001 From: aykhande Date: Mon, 16 Feb 2026 20:02:05 +0530 Subject: [PATCH 2/3] Add test cases --- .../datastream/server/TestCoordinator.java | 140 ++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index 1055418c3..ecac2e7c6 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -6,6 +6,7 @@ package com.linkedin.datastream.server; import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.Method; import java.time.Duration; import java.time.Instant; @@ -4505,4 +4506,143 @@ public TestSetup(EmbeddedDatastreamCluster datastreamKafkaCluster, Coordinator c _connector = connector; } } + + /** + * Sets the private static final ASSIGNMENT_TIMEOUT field on Coordinator via Unsafe. + * Returns the original value so it can be restored. + * Uses sun.misc.Unsafe because JDK 17+ no longer allows modifying static final fields via standard reflection. + */ + private Duration setAssignmentTimeout(Duration newTimeout) throws Exception { + Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + sun.misc.Unsafe unsafe = (sun.misc.Unsafe) unsafeField.get(null); + + Field field = Coordinator.class.getDeclaredField("ASSIGNMENT_TIMEOUT"); + Object base = unsafe.staticFieldBase(field); + long offset = unsafe.staticFieldOffset(field); + + Duration original = (Duration) unsafe.getObject(base, offset); + unsafe.putObject(base, offset, newTimeout); + return original; + } + + @Test + public void testHandleAssignmentChangeClearsStateOnTimeout() throws Exception { + String testCluster = "testHandleAssignmentChangeClearsStateOnTimeout"; + String testConnectorType = "testConnectorType"; + String datastreamName1 = "datastream1"; + + // Reduce ASSIGNMENT_TIMEOUT so the test doesn't wait 90 seconds + Duration originalTimeout = setAssignmentTimeout(Duration.ofSeconds(1)); + + try { + Coordinator instance1 = createCoordinator(_zkConnectionString, testCluster); + + // A connector that blocks on onAssignmentChange long enough to trigger the timeout + CountDownLatch blockingLatch = new CountDownLatch(1); + TestHookConnector slowConnector = new TestHookConnector("slowConnector", testConnectorType) { + @Override + public void onAssignmentChange(List tasks) { + try { + // Block for longer than ASSIGNMENT_TIMEOUT (1 second) + blockingLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + super.onAssignmentChange(tasks); + } + }; + + instance1.addConnector(testConnectorType, slowConnector, new BroadcastStrategy(Optional.empty()), false, + new SourceBasedDeduper(), null); + instance1.start(); + + ZkClient zkClient = new ZkClient(_zkConnectionString); + DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, testConnectorType, datastreamName1); + + // Wait for the timeout to occur and state to be cleared + Assert.assertTrue(PollUtils.poll(() -> instance1.getDatastreamTasks().isEmpty(), 200, 10000), + "Expected _assignedDatastreamTasks to be cleared after assignment timeout"); + + // Unblock the connector so it can process retried assignments + blockingLatch.countDown(); + + // After unblocking, the retried assignment should succeed + assertConnectorAssignment(slowConnector, 10000, datastreamName1); + + // Verify the coordinator has the task tracked after successful retry + Assert.assertFalse(instance1.getDatastreamTasks().isEmpty(), + "Expected _assignedDatastreamTasks to be populated after successful retry"); + + instance1.stop(); + instance1.getDatastreamCache().getZkclient().close(); + zkClient.close(); + } finally { + setAssignmentTimeout(originalTimeout); + } + } + + /** + * Verifies that after a timeout clears _assignedDatastreamTasks, a re-assignment + * does NOT skip the task (i.e., the task is not incorrectly treated as "already running"). + * This is the core bug that the clear() fix addresses. + */ + @Test + public void testHandleAssignmentChangeTimeoutDoesNotSkipTaskOnReassignment() throws Exception { + String testCluster = "testHandleAssignmentChangeTimeoutDoesNotSkipTaskOnReassignment"; + String testConnectorType = "testConnectorType"; + String datastreamName1 = "datastream1"; + + // Reduce ASSIGNMENT_TIMEOUT so the test doesn't wait 90 seconds + Duration originalTimeout = setAssignmentTimeout(Duration.ofSeconds(1)); + + try { + java.util.concurrent.atomic.AtomicInteger assignmentChangeCount = new java.util.concurrent.atomic.AtomicInteger(0); + // Block only the first onAssignmentChange; let subsequent ones through immediately + CountDownLatch blockFirstAssignment = new CountDownLatch(1); + + Coordinator instance1 = createCoordinator(_zkConnectionString, testCluster); + + TestHookConnector connector = new TestHookConnector("connector1", testConnectorType) { + @Override + public void onAssignmentChange(List tasks) { + int count = assignmentChangeCount.incrementAndGet(); + if (count == 1) { + // Block the first assignment to trigger timeout. Use a long wait so the + // coordinator's 1-second timeout fires while we're still blocked. + try { + blockFirstAssignment.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + super.onAssignmentChange(tasks); + } + }; + + instance1.addConnector(testConnectorType, connector, new BroadcastStrategy(Optional.empty()), false, + new SourceBasedDeduper(), null); + instance1.start(); + + ZkClient zkClient = new ZkClient(_zkConnectionString); + DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, testConnectorType, datastreamName1); + + // Unblock the first (timed-out) assignment after giving coordinator time to timeout and retry + Thread.sleep(3000); + blockFirstAssignment.countDown(); + + // The retry should successfully assign the task even though the first attempt timed out + assertConnectorAssignment(connector, 15000, datastreamName1); + + // Verify the coordinator has the task tracked properly after successful retry + Assert.assertFalse(instance1.getDatastreamTasks().isEmpty(), + "Expected _assignedDatastreamTasks to be populated after successful retry"); + + instance1.stop(); + instance1.getDatastreamCache().getZkclient().close(); + zkClient.close(); + } finally { + setAssignmentTimeout(originalTimeout); + } + } } From 02a62a711931d1b6b03e80aab1aab5a2ca14d19b Mon Sep 17 00:00:00 2001 From: aykhande Date: Tue, 24 Feb 2026 17:30:23 +0530 Subject: [PATCH 3/3] Fixed TCs --- .../datastream/server/TestCoordinator.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java index ecac2e7c6..17de1fbb9 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/TestCoordinator.java @@ -4508,21 +4508,20 @@ public TestSetup(EmbeddedDatastreamCluster datastreamKafkaCluster, Coordinator c } /** - * Sets the private static final ASSIGNMENT_TIMEOUT field on Coordinator via Unsafe. + * Sets the private static final ASSIGNMENT_TIMEOUT field on Coordinator via reflection. * Returns the original value so it can be restored. - * Uses sun.misc.Unsafe because JDK 17+ no longer allows modifying static final fields via standard reflection. */ private Duration setAssignmentTimeout(Duration newTimeout) throws Exception { - Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); - unsafeField.setAccessible(true); - sun.misc.Unsafe unsafe = (sun.misc.Unsafe) unsafeField.get(null); - Field field = Coordinator.class.getDeclaredField("ASSIGNMENT_TIMEOUT"); - Object base = unsafe.staticFieldBase(field); - long offset = unsafe.staticFieldOffset(field); + field.setAccessible(true); + + // Remove the 'final' modifier so we can write to the field + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~java.lang.reflect.Modifier.FINAL); - Duration original = (Duration) unsafe.getObject(base, offset); - unsafe.putObject(base, offset, newTimeout); + Duration original = (Duration) field.get(null); + field.set(null, newTimeout); return original; }