Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public static boolean poll(BooleanSupplier cond, long periodMs, long timeoutMs)
* @return true if condition is met, false otherwise
*/
public static <T> boolean poll(InterruptablePredicate<T> cond, long periodMs, long timeoutMs, T arg) {
long elapsedMs = 0;
long startMs = System.currentTimeMillis();
if (timeoutMs > 0 && periodMs > timeoutMs) {
return false;
}
Expand All @@ -134,8 +134,7 @@ public static <T> boolean poll(InterruptablePredicate<T> cond, long periodMs, lo
} catch (InterruptedException e) {
break;
}
elapsedMs += periodMs;
if (timeoutMs > 0 && elapsedMs >= timeoutMs) {
if (timeoutMs > 0 && System.currentTimeMillis() - startMs >= timeoutMs) {
break;
}
}
Expand All @@ -152,7 +151,7 @@ public static <T> boolean poll(InterruptablePredicate<T> cond, long periodMs, lo
*/
public static <T> Optional<T> poll(InterruptableSupplier<T> interruptableSupplier, InterruptablePredicate<T> cond,
long periodMs, long timeoutMs) {
long elapsedMs = 0;
long startMs = System.currentTimeMillis();
if (periodMs > timeoutMs) {
return Optional.empty();
}
Expand All @@ -167,8 +166,7 @@ public static <T> Optional<T> poll(InterruptableSupplier<T> interruptableSupplie
} catch (InterruptedException e) {
break;
}
elapsedMs += periodMs;
if (elapsedMs >= timeoutMs) {
if (System.currentTimeMillis() - startMs >= timeoutMs) {
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package com.linkedin.datastream.common;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

import org.testng.Assert;
Expand Down Expand Up @@ -55,4 +57,173 @@ public void testpollWithPredicate() {
long now2 = System.currentTimeMillis();
Assert.assertTrue(now2 - now1 >= 350);
}

/**
* Validates that the predicate overload uses wall-clock time for timeout.
* The predicate simulates a slow operation (50ms per call) with a short sleep period (10ms).
* With the old accumulator-based approach, only 10ms would count per iteration toward the
* 500ms timeout, so it would take ~3000ms wall-clock time (500/10 * 60ms per iteration).
* With wall-clock timeout, it should complete within ~600ms (500ms timeout + tolerance).
*/
@Test
public void testPredicatePollTimesOutByWallClock() {
AtomicInteger invocationCount = new AtomicInteger(0);
long periodMs = 10;
long timeoutMs = 500;

long startMs = System.currentTimeMillis();
boolean result = PollUtils.poll((ignored) -> {
invocationCount.incrementAndGet();
try {
// Simulate a slow predicate (e.g., network call) that takes 50ms each time
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false; // Never satisfy the condition
}, periodMs, timeoutMs, null);
long elapsedMs = System.currentTimeMillis() - startMs;

Assert.assertFalse(result, "Poll should return false since the condition is never met");
// Wall-clock elapsed time should be close to the timeout, not inflated
// With old code: elapsed would be ~invocations * (50ms + 10ms) = many seconds
// With fix: elapsed should be approximately timeoutMs
Assert.assertTrue(elapsedMs < timeoutMs + 200,
"Wall-clock time (" + elapsedMs + "ms) should not significantly exceed timeout ("
+ timeoutMs + "ms). Predicate was invoked " + invocationCount.get() + " times.");
Assert.assertTrue(elapsedMs >= timeoutMs - 100,
"Wall-clock time (" + elapsedMs + "ms) should be close to timeout (" + timeoutMs + "ms)");
}

/**
* Validates that the supplier overload uses wall-clock time for timeout.
* The supplier simulates a slow operation (50ms per call) with a short sleep period (10ms).
*/
@Test
public void testSupplierPollTimesOutByWallClock() {
AtomicInteger invocationCount = new AtomicInteger(0);
long periodMs = 10;
long timeoutMs = 500;

long startMs = System.currentTimeMillis();
Optional<String> result = PollUtils.poll(() -> {
invocationCount.incrementAndGet();
try {
// Simulate a slow supplier (e.g., schema registry lookup) that takes 50ms
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null; // Return null so the condition check below fails
}, (val) -> val != null, periodMs, timeoutMs);
long elapsedMs = System.currentTimeMillis() - startMs;

Assert.assertFalse(result.isPresent(), "Poll should return empty since supplier never returns non-null");
Assert.assertTrue(elapsedMs < timeoutMs + 200,
"Wall-clock time (" + elapsedMs + "ms) should not significantly exceed timeout ("
+ timeoutMs + "ms). Supplier was invoked " + invocationCount.get() + " times.");
Assert.assertTrue(elapsedMs >= timeoutMs - 100,
"Wall-clock time (" + elapsedMs + "ms) should be close to timeout (" + timeoutMs + "ms)");
}

/**
* Validates that the predicate overload still returns true promptly when the condition
* is met, even when the predicate takes time to execute.
*/
@Test
public void testPredicatePollSucceedsWithSlowPredicate() {
AtomicInteger invocationCount = new AtomicInteger(0);

long startMs = System.currentTimeMillis();
boolean result = PollUtils.poll((ignored) -> {
int count = invocationCount.incrementAndGet();
try {
Thread.sleep(30); // Each invocation takes 30ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return count >= 3; // Succeed on the 3rd invocation
}, 10, 2000, null);
long elapsedMs = System.currentTimeMillis() - startMs;

Assert.assertTrue(result, "Poll should succeed on the 3rd invocation");
Assert.assertEquals(invocationCount.get(), 3);
// 3 invocations * ~40ms each (30ms work + 10ms sleep) = ~120ms, well under 2s timeout
Assert.assertTrue(elapsedMs < 500,
"Should complete well before timeout. Elapsed: " + elapsedMs + "ms");
}

/**
* Validates that the supplier overload still returns the result promptly when
* the condition is met, even when the supplier takes time to execute.
*/
@Test
public void testSupplierPollSucceedsWithSlowSupplier() {
AtomicInteger invocationCount = new AtomicInteger(0);

long startMs = System.currentTimeMillis();
Optional<String> result = PollUtils.poll(() -> {
int count = invocationCount.incrementAndGet();
try {
Thread.sleep(30); // Each invocation takes 30ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return count >= 3 ? "success" : null;
}, (val) -> val != null, 10, 2000);
long elapsedMs = System.currentTimeMillis() - startMs;

Assert.assertTrue(result.isPresent(), "Poll should return a value on the 3rd invocation");
Assert.assertEquals(result.get(), "success");
Assert.assertEquals(invocationCount.get(), 3);
Assert.assertTrue(elapsedMs < 500,
"Should complete well before timeout. Elapsed: " + elapsedMs + "ms");
}

/**
* Validates that the BooleanSupplier overload (which delegates to the predicate overload)
* also respects wall-clock timeout when the supplier is slow.
*/
@Test
public void testBooleanSupplierPollTimesOutByWallClock() {
AtomicInteger invocationCount = new AtomicInteger(0);
long periodMs = 10;
long timeoutMs = 500;

long startMs = System.currentTimeMillis();
boolean result = PollUtils.poll(() -> {
invocationCount.incrementAndGet();
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}, periodMs, timeoutMs);
long elapsedMs = System.currentTimeMillis() - startMs;

Assert.assertFalse(result);
Assert.assertTrue(elapsedMs < timeoutMs + 200,
"Wall-clock time (" + elapsedMs + "ms) should not significantly exceed timeout ("
+ timeoutMs + "ms). Supplier was invoked " + invocationCount.get() + " times.");
}

/**
* Validates that periodMs > timeoutMs still returns false/empty immediately
* for both overloads (edge case preserved by the fix).
*/
@Test
public void testPeriodExceedsTimeoutReturnsImmediately() {
long startMs = System.currentTimeMillis();

// Predicate overload with positive timeout
Assert.assertFalse(PollUtils.poll((ignored) -> true, 200, 100, null));

// Supplier overload
Optional<String> result = PollUtils.poll(() -> "value", (val) -> true, 200, 100);
Assert.assertFalse(result.isPresent());

long elapsedMs = System.currentTimeMillis() - startMs;
Assert.assertTrue(elapsedMs < 50, "Should return immediately when periodMs > timeoutMs");
}
}