From c147a10e51831daff767635e863f27eb3e19a410 Mon Sep 17 00:00:00 2001
From: aykhande
Date: Sat, 21 Feb 2026 14:42:10 +0530
Subject: [PATCH 1/2] Fix PollUtils.poll to use wall-clock time for timeout instead of accumulating sleep durations

---
 .../java/com/linkedin/datastream/common/PollUtils.java | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/datastream-utils/src/main/java/com/linkedin/datastream/common/PollUtils.java b/datastream-utils/src/main/java/com/linkedin/datastream/common/PollUtils.java
index 7053954fc..2b1b763a0 100644
--- a/datastream-utils/src/main/java/com/linkedin/datastream/common/PollUtils.java
+++ b/datastream-utils/src/main/java/com/linkedin/datastream/common/PollUtils.java
@@ -121,7 +121,7 @@ public static boolean poll(BooleanSupplier cond, long periodMs, long timeoutMs)
    * @return true if condition is met, false otherwise
    */
   public static <T> boolean poll(InterruptablePredicate<T> cond, long periodMs, long timeoutMs, T arg) {
-    long elapsedMs = 0;
+    long startMs = System.currentTimeMillis();
     if (timeoutMs > 0 && periodMs > timeoutMs) {
       return false;
     }
@@ -134,8 +134,7 @@ public static <T> boolean poll(InterruptablePredicate<T> cond, long periodMs, lo
       } catch (InterruptedException e) {
         break;
       }
-      elapsedMs += periodMs;
-      if (timeoutMs > 0 && elapsedMs >= timeoutMs) {
+      if (timeoutMs > 0 && System.currentTimeMillis() - startMs >= timeoutMs) {
         break;
       }
     }
@@ -152,7 +151,7 @@ public static <T> boolean poll(InterruptablePredicate<T> cond, long periodMs, lo
    */
   public static <T> Optional<T> poll(InterruptableSupplier<T> interruptableSupplier, InterruptablePredicate<T> cond,
       long periodMs, long timeoutMs) {
-    long elapsedMs = 0;
+    long startMs = System.currentTimeMillis();
     if (periodMs > timeoutMs) {
       return Optional.empty();
     }
@@ -167,8 +166,7 @@ public static <T> Optional<T> poll(InterruptableSupplier<T> interruptableSupplie
       } catch (InterruptedException e) {
         break;
       }
-      elapsedMs += periodMs;
-      if (elapsedMs >= timeoutMs) {
+      if (System.currentTimeMillis() - startMs >= timeoutMs) {
         break;
       }
     }

From c1844e7bb5a067c18d393c5f0c3b6d77892db1e5 Mon Sep 17 00:00:00 2001
From: aykhande
Date: Tue, 24 Feb 2026 14:27:32 +0530
Subject: [PATCH 2/2] Added TCs

---
 .../datastream/common/TestPollUtils.java | 171 ++++++++++++++++++
 1 file changed, 171 insertions(+)

diff --git a/datastream-utils/src/test/java/com/linkedin/datastream/common/TestPollUtils.java b/datastream-utils/src/test/java/com/linkedin/datastream/common/TestPollUtils.java
index 019bc1512..c9b1fd556 100644
--- a/datastream-utils/src/test/java/com/linkedin/datastream/common/TestPollUtils.java
+++ b/datastream-utils/src/test/java/com/linkedin/datastream/common/TestPollUtils.java
@@ -5,6 +5,8 @@
  */
 package com.linkedin.datastream.common;
 
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BooleanSupplier;
 
 import org.testng.Assert;
@@ -55,4 +57,173 @@ public void testpollWithPredicate() {
     long now2 = System.currentTimeMillis();
     Assert.assertTrue(now2 - now1 >= 350);
   }
+
+  /**
+   * Validates that the predicate overload uses wall-clock time for timeout.
+   * The predicate simulates a slow operation (50ms per call) with a short sleep period (10ms).
+   * With the old accumulator-based approach, only 10ms would count per iteration toward the
+   * 500ms timeout, so it would take ~3000ms wall-clock time (500/10 * 60ms per iteration).
+   * With wall-clock timeout, it should complete within ~600ms (500ms timeout + tolerance).
+   */
+  @Test
+  public void testPredicatePollTimesOutByWallClock() {
+    AtomicInteger invocationCount = new AtomicInteger(0);
+    long periodMs = 10;
+    long timeoutMs = 500;
+
+    long startMs = System.currentTimeMillis();
+    boolean result = PollUtils.poll((ignored) -> {
+      invocationCount.incrementAndGet();
+      try {
+        // Simulate a slow predicate (e.g., network call) that takes 50ms each time
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      return false; // Never satisfy the condition
+    }, periodMs, timeoutMs, null);
+    long elapsedMs = System.currentTimeMillis() - startMs;
+
+    Assert.assertFalse(result, "Poll should return false since the condition is never met");
+    // Wall-clock elapsed time should be close to the timeout, not inflated
+    // With old code: elapsed would be ~invocations * (50ms + 10ms) = many seconds
+    // With fix: elapsed should be approximately timeoutMs
+    Assert.assertTrue(elapsedMs < timeoutMs + 200,
+        "Wall-clock time (" + elapsedMs + "ms) should not significantly exceed timeout ("
+            + timeoutMs + "ms). Predicate was invoked " + invocationCount.get() + " times.");
+    Assert.assertTrue(elapsedMs >= timeoutMs - 100,
+        "Wall-clock time (" + elapsedMs + "ms) should be close to timeout (" + timeoutMs + "ms)");
+  }
+
+  /**
+   * Validates that the supplier overload uses wall-clock time for timeout.
+   * The supplier simulates a slow operation (50ms per call) with a short sleep period (10ms).
+   */
+  @Test
+  public void testSupplierPollTimesOutByWallClock() {
+    AtomicInteger invocationCount = new AtomicInteger(0);
+    long periodMs = 10;
+    long timeoutMs = 500;
+
+    long startMs = System.currentTimeMillis();
+    Optional<String> result = PollUtils.poll(() -> {
+      invocationCount.incrementAndGet();
+      try {
+        // Simulate a slow supplier (e.g., schema registry lookup) that takes 50ms
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      return null; // Return null so the condition check below fails
+    }, (val) -> val != null, periodMs, timeoutMs);
+    long elapsedMs = System.currentTimeMillis() - startMs;
+
+    Assert.assertFalse(result.isPresent(), "Poll should return empty since supplier never returns non-null");
+    Assert.assertTrue(elapsedMs < timeoutMs + 200,
+        "Wall-clock time (" + elapsedMs + "ms) should not significantly exceed timeout ("
+            + timeoutMs + "ms). Supplier was invoked " + invocationCount.get() + " times.");
+    Assert.assertTrue(elapsedMs >= timeoutMs - 100,
+        "Wall-clock time (" + elapsedMs + "ms) should be close to timeout (" + timeoutMs + "ms)");
+  }
+
+  /**
+   * Validates that the predicate overload still returns true promptly when the condition
+   * is met, even when the predicate takes time to execute.
+   */
+  @Test
+  public void testPredicatePollSucceedsWithSlowPredicate() {
+    AtomicInteger invocationCount = new AtomicInteger(0);
+
+    long startMs = System.currentTimeMillis();
+    boolean result = PollUtils.poll((ignored) -> {
+      int count = invocationCount.incrementAndGet();
+      try {
+        Thread.sleep(30); // Each invocation takes 30ms
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      return count >= 3; // Succeed on the 3rd invocation
+    }, 10, 2000, null);
+    long elapsedMs = System.currentTimeMillis() - startMs;
+
+    Assert.assertTrue(result, "Poll should succeed on the 3rd invocation");
+    Assert.assertEquals(invocationCount.get(), 3);
+    // 3 invocations * ~40ms each (30ms work + 10ms sleep) = ~120ms, well under 2s timeout
+    Assert.assertTrue(elapsedMs < 500,
+        "Should complete well before timeout. Elapsed: " + elapsedMs + "ms");
+  }
+
+  /**
+   * Validates that the supplier overload still returns the result promptly when
+   * the condition is met, even when the supplier takes time to execute.
+   */
+  @Test
+  public void testSupplierPollSucceedsWithSlowSupplier() {
+    AtomicInteger invocationCount = new AtomicInteger(0);
+
+    long startMs = System.currentTimeMillis();
+    Optional<String> result = PollUtils.poll(() -> {
+      int count = invocationCount.incrementAndGet();
+      try {
+        Thread.sleep(30); // Each invocation takes 30ms
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      return count >= 3 ? "success" : null;
+    }, (val) -> val != null, 10, 2000);
+    long elapsedMs = System.currentTimeMillis() - startMs;
+
+    Assert.assertTrue(result.isPresent(), "Poll should return a value on the 3rd invocation");
+    Assert.assertEquals(result.get(), "success");
+    Assert.assertEquals(invocationCount.get(), 3);
+    Assert.assertTrue(elapsedMs < 500,
+        "Should complete well before timeout. Elapsed: " + elapsedMs + "ms");
+  }
+
+  /**
+   * Validates that the BooleanSupplier overload (which delegates to the predicate overload)
+   * also respects wall-clock timeout when the supplier is slow.
+   */
+  @Test
+  public void testBooleanSupplierPollTimesOutByWallClock() {
+    AtomicInteger invocationCount = new AtomicInteger(0);
+    long periodMs = 10;
+    long timeoutMs = 500;
+
+    long startMs = System.currentTimeMillis();
+    boolean result = PollUtils.poll(() -> {
+      invocationCount.incrementAndGet();
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      return false;
+    }, periodMs, timeoutMs);
+    long elapsedMs = System.currentTimeMillis() - startMs;
+
+    Assert.assertFalse(result);
+    Assert.assertTrue(elapsedMs < timeoutMs + 200,
+        "Wall-clock time (" + elapsedMs + "ms) should not significantly exceed timeout ("
+            + timeoutMs + "ms). Supplier was invoked " + invocationCount.get() + " times.");
+  }
+
+  /**
+   * Validates that periodMs > timeoutMs still returns false/empty immediately
+   * for both overloads (edge case preserved by the fix).
+   */
+  @Test
+  public void testPeriodExceedsTimeoutReturnsImmediately() {
+    long startMs = System.currentTimeMillis();
+
+    // Predicate overload with positive timeout
+    Assert.assertFalse(PollUtils.poll((ignored) -> true, 200, 100, null));
+
+    // Supplier overload
+    Optional<String> result = PollUtils.poll(() -> "value", (val) -> true, 200, 100);
+    Assert.assertFalse(result.isPresent());
+
+    long elapsedMs = System.currentTimeMillis() - startMs;
+    Assert.assertTrue(elapsedMs < 50, "Should return immediately when periodMs > timeoutMs");
+  }
 }