From 2af319743e46abd9156dbdac35fbc1a9a89f3898 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 10:52:07 +0900 Subject: [PATCH 01/17] feat: VirtualThreads concurrent executor --- build.gradle | 4 +- .../commons/concurrent/VirtualThreads.java | 22 +++ .../concurrent/VirtualThreadsTest.java | 125 ++++++++++++++++++ 3 files changed, 149 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java create mode 100644 src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java diff --git a/build.gradle b/build.gradle index 4ee5281..3fe0833 100644 --- a/build.gradle +++ b/build.gradle @@ -17,8 +17,8 @@ group "com.autonomouslogic.commons" java { withJavadocJar() withSourcesJar() - sourceCompatibility = JavaVersion.VERSION_11 - targetCompatibility = JavaVersion.VERSION_11 + sourceCompatibility = JavaVersion.VERSION_21 + targetCompatibility = JavaVersion.VERSION_21 } repositories { diff --git a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java new file mode 100644 index 0000000..ed1109a --- /dev/null +++ b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java @@ -0,0 +1,22 @@ +package com.autonomouslogic.commons.concurrent; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.stream.Stream; +import lombok.NonNull; + +public class VirtualThreads { + /** + * Runs tasks concurrently on the provided virtual thread executor. + * This is a blocking method. + * + * @param tasks the tasks to be executed + * @param concurrency the maximum concurrency to allow for executing tasks + * @return a list of results in the same order as the stream + */ + public static List runAll(@NonNull Stream> tasks, int concurrency) + throws InterruptedException, ExecutionException { + return null; + } +} diff --git a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java new file mode 100644 index 0000000..8aa1ee3 --- /dev/null +++ b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java @@ -0,0 +1,125 @@ +package com.autonomouslogic.commons.concurrent; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.security.SecureRandom; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.junit.jupiter.api.Test; + +class VirtualThreadsTest { + @Test + void shouldHandleSingleTask() throws Exception { + var results = VirtualThreads.runAll(Stream.of((Callable) () -> "single-result"), 1); + + assertNotNull(results); + assertEquals(1, results.size()); + assertEquals("single-result", results.get(0)); + } + + @Test + void shouldExecuteAllTasksAndReturnResultsInOrder() throws Exception { + testGeneric(50, 5); + } + + @Test + void shouldMaintainOrderWithLongTasks() throws Exception { + var rng = SecureRandom.getInstanceStrong(); + var taskCount = 50; + var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Callable) () -> { + Thread.sleep(rng.nextInt(200) + 200); + return i; + }); + + var results = VirtualThreads.runAll(tasks, 5); + + assertNotNull(results); + assertEquals(taskCount, results.size()); + for (var i = 0; i < taskCount; i++) { + assertEquals(i, results.get(i)); + } + } + + @Test + void shouldMaintainMaxConcurrency() throws Exception { + var taskCount = 50; + var concurrency = 5; + var currentConcurrency = new AtomicInteger(); + var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Callable) () -> { + var c = currentConcurrency.incrementAndGet(); + if (i >= taskCount && i < (taskCount - concurrency)) { + assertEquals(concurrency, c); + } + Thread.sleep(100); + return i; + }); + + var results = VirtualThreads.runAll(tasks, concurrency); + + assertNotNull(results); + assertEquals(taskCount, results.size()); + for (var i = 0; i < taskCount; i++) { + assertEquals(i, results.get(i)); + } + } + + @Test + void shouldHandleEmptyStream() throws Exception { + var results = VirtualThreads.runAll(Stream.empty(), 5); + + assertNotNull(results); + assertTrue(results.isEmpty()); + } + + @Test + void shouldHandleHighConcurrencyWithFewTasks() throws Exception { + testGeneric(5, 50); + } + + @Test + void shouldHandleConcurrencyLimitOfOne() throws Exception { + testGeneric(5, 1); + } + + @Test + void shouldFailFastWhenTaskThrows() throws Exception { + var taskCount = 100; + var concurrency = 5; + var tasksRun = new AtomicInteger(); + var failureMessage = "Task 0 failed"; + + var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Callable) () -> { + tasksRun.incrementAndGet(); + if (i == 0) { + throw new RuntimeException(failureMessage); + } + Thread.sleep(50); + return i; + }); + + var exception = assertThrows(ExecutionException.class, () -> VirtualThreads.runAll(tasks, concurrency)); + + assertEquals(failureMessage, exception.getCause().getMessage()); + assertTrue( + tasksRun.get() <= concurrency + 1, + "Expected at most concurrency + 1 tasks to run, but " + tasksRun.get() + " tasks ran"); + } + + private static void testGeneric(int taskCount, int concurrency) throws InterruptedException, ExecutionException { + var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Callable) () -> i); + + var results = VirtualThreads.runAll(tasks, concurrency); + + assertNotNull(results); + assertEquals(taskCount, results.size()); + for (var i = 0; i < taskCount; i++) { + assertEquals(i, results.get(i)); + } + } +} From ee93276a562d340d7a4238c4517d8652fb491e6a Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 11:05:26 +0900 Subject: [PATCH 02/17] Implementations --- .../commons/concurrent/VirtualThreads.java | 160 +++++++++++++++++- 1 file changed, 158 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java index ed1109a..110f25b 100644 --- a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java +++ b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java @@ -1,8 +1,18 @@ package com.autonomouslogic.commons.concurrent; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import lombok.NonNull; @@ -15,8 +25,154 @@ public class VirtualThreads { * @param concurrency the maximum concurrency to allow for executing tasks * @return a list of results in the same order as the stream */ - public static List runAll(@NonNull Stream> tasks, int concurrency) + public static List _runAll(@NonNull Stream> tasks, int concurrency) throws InterruptedException, ExecutionException { - return null; + var taskList = tasks.toList(); + + if (taskList.isEmpty()) { + return List.of(); + } + + var executor = Executors.newVirtualThreadPerTaskExecutor(); + var results = new ArrayList(taskList.size()); + var inFlightFutures = new ArrayList>(); + var taskIndex = 0; + + try { + // Interleave submission and result collection for fail-fast behavior + while (taskIndex < taskList.size() || !inFlightFutures.isEmpty()) { + // Submit new tasks up to concurrency limit + while (inFlightFutures.size() < concurrency && taskIndex < taskList.size()) { + final var task = taskList.get(taskIndex); + var future = executor.submit(() -> { + try { + return task.call(); + } catch (Exception e) { + throw e; + } + }); + inFlightFutures.add(future); + taskIndex++; + } + + // Collect one result if we have in-flight tasks + if (!inFlightFutures.isEmpty()) { + try { + @SuppressWarnings("unchecked") + var f = (Future) inFlightFutures.remove(0); + results.add(f.get()); + } catch (ExecutionException e) { + // Cancel all remaining futures on failure + for (var future : inFlightFutures) { + future.cancel(true); + } + throw e; + } + } + } + + return results; + } catch (InterruptedException e) { + // Cancel all in-flight tasks on interrupt + for (var future : inFlightFutures) { + future.cancel(true); + } + Thread.currentThread().interrupt(); + throw e; + } finally { + executor.shutdown(); + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + + /** + * Executes tasks on virtual threads with bounded concurrency. + * Returns results in submission order. + * + * Fail-fast: first task failure cancels remaining tasks and propagates. + */ + public static List runAll(Stream> tasks, int maxConcurrency) + throws InterruptedException, ExecutionException { + + if (maxConcurrency <= 0) { + throw new IllegalArgumentException("maxConcurrency must be > 0"); + } + + Iterator> iterator = tasks.iterator(); + + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { + + CompletionService> completion = new ExecutorCompletionService<>(executor); + + List results = new ArrayList<>(); + int nextIndex = 0; + int inFlight = 0; + + // Store results by index to preserve ordering + Map completed = new HashMap<>(); + + while (iterator.hasNext() || inFlight > 0) { + + // Fill capacity + while (inFlight < maxConcurrency && iterator.hasNext()) { + int index = nextIndex++; + Callable task = iterator.next(); + + completion.submit(() -> { + T value = task.call(); + return new Result<>(index, value); + }); + + inFlight++; + } + + if (inFlight == 0) break; + + Future> finished; + try { + finished = completion.take(); + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + throw e; + } + + Result result; + try { + result = finished.get(); + } catch (ExecutionException e) { + executor.shutdownNow(); + throw e; // fail fast + } + + inFlight--; + + completed.put(result.index, result.value); + + // Drain in-order results into output list + while (completed.containsKey(results.size())) { + results.add(completed.remove(results.size())); + } + } + + return results; + } + } + + private static class Result { + final int index; + final T value; + + Result(int index, T value) { + this.index = index; + this.value = value; + } } } From fe5590e56289e670bc8aadea287bb9ec9e8a903c Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 11:10:35 +0900 Subject: [PATCH 03/17] Select --- .../commons/concurrent/VirtualThreads.java | 136 ++---------------- 1 file changed, 12 insertions(+), 124 deletions(-) diff --git a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java index 110f25b..ab58596 100644 --- a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java +++ b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java @@ -2,166 +2,54 @@ import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import lombok.NonNull; public class VirtualThreads { - /** - * Runs tasks concurrently on the provided virtual thread executor. - * This is a blocking method. - * - * @param tasks the tasks to be executed - * @param concurrency the maximum concurrency to allow for executing tasks - * @return a list of results in the same order as the stream - */ - public static List _runAll(@NonNull Stream> tasks, int concurrency) - throws InterruptedException, ExecutionException { - var taskList = tasks.toList(); - - if (taskList.isEmpty()) { - return List.of(); - } - - var executor = Executors.newVirtualThreadPerTaskExecutor(); - var results = new ArrayList(taskList.size()); - var inFlightFutures = new ArrayList>(); - var taskIndex = 0; - - try { - // Interleave submission and result collection for fail-fast behavior - while (taskIndex < taskList.size() || !inFlightFutures.isEmpty()) { - // Submit new tasks up to concurrency limit - while (inFlightFutures.size() < concurrency && taskIndex < taskList.size()) { - final var task = taskList.get(taskIndex); - var future = executor.submit(() -> { - try { - return task.call(); - } catch (Exception e) { - throw e; - } - }); - inFlightFutures.add(future); - taskIndex++; - } - - // Collect one result if we have in-flight tasks - if (!inFlightFutures.isEmpty()) { - try { - @SuppressWarnings("unchecked") - var f = (Future) inFlightFutures.remove(0); - results.add(f.get()); - } catch (ExecutionException e) { - // Cancel all remaining futures on failure - for (var future : inFlightFutures) { - future.cancel(true); - } - throw e; - } - } - } - - return results; - } catch (InterruptedException e) { - // Cancel all in-flight tasks on interrupt - for (var future : inFlightFutures) { - future.cancel(true); - } - Thread.currentThread().interrupt(); - throw e; - } finally { - executor.shutdown(); - try { - if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException e) { - executor.shutdownNow(); - Thread.currentThread().interrupt(); - } - } - } - /** * Executes tasks on virtual threads with bounded concurrency. * Returns results in submission order. - * * Fail-fast: first task failure cancels remaining tasks and propagates. + * This method is blocking. */ - public static List runAll(Stream> tasks, int maxConcurrency) + public static List runAll(@NonNull Stream> tasks, int maxConcurrency) throws InterruptedException, ExecutionException { - if (maxConcurrency <= 0) { throw new IllegalArgumentException("maxConcurrency must be > 0"); } - - Iterator> iterator = tasks.iterator(); - - try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { - - CompletionService> completion = new ExecutorCompletionService<>(executor); - - List results = new ArrayList<>(); + var iterator = tasks.iterator(); + try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { + var completion = new ExecutorCompletionService>(executor); + var results = new ArrayList(); int nextIndex = 0; int inFlight = 0; - - // Store results by index to preserve ordering - Map completed = new HashMap<>(); - + var completed = new HashMap(); while (iterator.hasNext() || inFlight > 0) { - - // Fill capacity while (inFlight < maxConcurrency && iterator.hasNext()) { int index = nextIndex++; - Callable task = iterator.next(); - + var task = iterator.next(); completion.submit(() -> { T value = task.call(); return new Result<>(index, value); }); - inFlight++; } - - if (inFlight == 0) break; - - Future> finished; - try { - finished = completion.take(); - } catch (InterruptedException e) { - executor.shutdownNow(); - Thread.currentThread().interrupt(); - throw e; - } - - Result result; - try { - result = finished.get(); - } catch (ExecutionException e) { - executor.shutdownNow(); - throw e; // fail fast + if (inFlight == 0) { + break; } - + var finished = completion.take(); + var result = finished.get(); inFlight--; - completed.put(result.index, result.value); - - // Drain in-order results into output list while (completed.containsKey(results.size())) { results.add(completed.remove(results.size())); } } - return results; } } From 43adf5ca1ee199d3d6f1148194afa99c88e6fdf6 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 13:27:24 +0900 Subject: [PATCH 04/17] Fix --- build.gradle | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/build.gradle b/build.gradle index 3fe0833..f717275 100644 --- a/build.gradle +++ b/build.gradle @@ -43,6 +43,11 @@ dependencies { test { useJUnitPlatform() + jvmArgs = [ + "--add-opens", "java.base/java.lang=ALL-UNNAMED", + "--add-opens", "java.base/java.util=ALL-UNNAMED" + ] + testLogging { events = [ "FAILED", "PASSED", "SKIPPED" ] showExceptions = true From 4bc19e6c511dd2b37a756b7d8c75929a12c6c75b Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 18:10:11 +0900 Subject: [PATCH 05/17] runAll and callAll --- build.gradle | 4 +- .../commons/concurrent/VirtualThreads.java | 35 +- .../concurrent/VirtualThreadsTest.java | 298 +++++++++++++----- 3 files changed, 249 insertions(+), 88 deletions(-) diff --git a/build.gradle b/build.gradle index f717275..e9e9297 100644 --- a/build.gradle +++ b/build.gradle @@ -69,11 +69,11 @@ jacocoTestReport { spotless { java { palantirJavaFormat() - indentWithTabs() + leadingSpacesToTabs() } groovyGradle { - indentWithTabs() + leadingSpacesToTabs() } } diff --git a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java index ab58596..6227611 100644 --- a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java +++ b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java @@ -17,7 +17,7 @@ public class VirtualThreads { * Fail-fast: first task failure cancels remaining tasks and propagates. * This method is blocking. */ - public static List runAll(@NonNull Stream> tasks, int maxConcurrency) + public static List callAll(@NonNull Stream> tasks, int maxConcurrency) throws InterruptedException, ExecutionException { if (maxConcurrency <= 0) { throw new IllegalArgumentException("maxConcurrency must be > 0"); @@ -54,6 +54,39 @@ public static List runAll(@NonNull Stream> tasks, int maxConc } } + /** + * Executes tasks on virtual threads with bounded concurrency. + * Fail-fast: first task failure cancels remaining tasks and propagates. + * This method is blocking. + */ + public static void runAll(@NonNull Stream tasks, int maxConcurrency) + throws InterruptedException, ExecutionException { + if (maxConcurrency <= 0) { + throw new IllegalArgumentException("maxConcurrency must be > 0"); + } + var iterator = tasks.iterator(); + try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { + var completion = new ExecutorCompletionService(executor); + int inFlight = 0; + while (iterator.hasNext() || inFlight > 0) { + while (inFlight < maxConcurrency && iterator.hasNext()) { + var task = iterator.next(); + completion.submit(() -> { + task.run(); + return null; + }); + inFlight++; + } + if (inFlight == 0) { + break; + } + var finished = completion.take(); + finished.get(); + inFlight--; + } + } + } + private static class Result { final int index; final T value; diff --git a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java index 8aa1ee3..6f6f490 100644 --- a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java +++ b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java @@ -11,115 +11,243 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import java.util.stream.Stream; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; class VirtualThreadsTest { - @Test - void shouldHandleSingleTask() throws Exception { - var results = VirtualThreads.runAll(Stream.of((Callable) () -> "single-result"), 1); + @Nested + class CallAllTests { + @Test + void shouldHandleSingleTask() throws Exception { + var results = VirtualThreads.callAll(Stream.of((Callable) () -> "single-result"), 1); + + assertNotNull(results); + assertEquals(1, results.size()); + assertEquals("single-result", results.get(0)); + } - assertNotNull(results); - assertEquals(1, results.size()); - assertEquals("single-result", results.get(0)); - } + @Test + void shouldExecuteAllTasksAndReturnResultsInOrder() throws Exception { + testGeneric(50, 5); + } - @Test - void shouldExecuteAllTasksAndReturnResultsInOrder() throws Exception { - testGeneric(50, 5); - } + @Test + void shouldMaintainOrderWithLongTasks() throws Exception { + var rng = SecureRandom.getInstanceStrong(); + var taskCount = 50; + var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Callable) () -> { + Thread.sleep(rng.nextInt(200) + 200); + return i; + }); + + var results = VirtualThreads.callAll(tasks, 5); + + assertNotNull(results); + assertEquals(taskCount, results.size()); + for (var i = 0; i < taskCount; i++) { + assertEquals(i, results.get(i)); + } + } - @Test - void shouldMaintainOrderWithLongTasks() throws Exception { - var rng = SecureRandom.getInstanceStrong(); - var taskCount = 50; - var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Callable) () -> { - Thread.sleep(rng.nextInt(200) + 200); - return i; - }); + @Test + void shouldMaintainMaxConcurrency() throws Exception { + var taskCount = 50; + var concurrency = 5; + var currentConcurrency = new AtomicInteger(); + var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Callable) () -> { + var c = currentConcurrency.incrementAndGet(); + if (i >= taskCount && i < (taskCount - concurrency)) { + assertEquals(concurrency, c); + } + Thread.sleep(100); + return i; + }); + + var results = VirtualThreads.callAll(tasks, concurrency); + + assertNotNull(results); + assertEquals(taskCount, results.size()); + for (var i = 0; i < taskCount; i++) { + assertEquals(i, results.get(i)); + } + } - var results = VirtualThreads.runAll(tasks, 5); + @Test + void shouldHandleEmptyStream() throws Exception { + var results = VirtualThreads.callAll(Stream.empty(), 5); - assertNotNull(results); - assertEquals(taskCount, results.size()); - for (var i = 0; i < taskCount; i++) { - assertEquals(i, results.get(i)); + assertNotNull(results); + assertTrue(results.isEmpty()); } - } - @Test - void shouldMaintainMaxConcurrency() throws Exception { - var taskCount = 50; - var concurrency = 5; - var currentConcurrency = new AtomicInteger(); - var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Callable) () -> { - var c = currentConcurrency.incrementAndGet(); - if (i >= taskCount && i < (taskCount - concurrency)) { - assertEquals(concurrency, c); - } - Thread.sleep(100); - return i; - }); + @Test + void shouldHandleHighConcurrencyWithFewTasks() throws Exception { + testGeneric(5, 50); + } - var results = VirtualThreads.runAll(tasks, concurrency); + @Test + void shouldHandleConcurrencyLimitOfOne() throws Exception { + testGeneric(5, 1); + } - assertNotNull(results); - assertEquals(taskCount, results.size()); - for (var i = 0; i < taskCount; i++) { - assertEquals(i, results.get(i)); + @Test + void shouldFailFastWhenTaskThrows() throws Exception { + var taskCount = 100; + var concurrency = 5; + var tasksRun = new AtomicInteger(); + var failureMessage = "Task 0 failed"; + + var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Callable) () -> { + tasksRun.incrementAndGet(); + if (i == 0) { + throw new RuntimeException(failureMessage); + } + Thread.sleep(50); + return i; + }); + + var exception = assertThrows(ExecutionException.class, () -> VirtualThreads.callAll(tasks, concurrency)); + + assertEquals(failureMessage, exception.getCause().getMessage()); + assertTrue( + tasksRun.get() <= concurrency + 1, + "Expected at most concurrency + 1 tasks to run, but " + tasksRun.get() + " tasks ran"); } - } - @Test - void shouldHandleEmptyStream() throws Exception { - var results = VirtualThreads.runAll(Stream.empty(), 5); + private static void testGeneric(int taskCount, int concurrency) + throws InterruptedException, ExecutionException { + var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Callable) () -> i); - assertNotNull(results); - assertTrue(results.isEmpty()); - } + var results = VirtualThreads.callAll(tasks, concurrency); - @Test - void shouldHandleHighConcurrencyWithFewTasks() throws Exception { - testGeneric(5, 50); + assertNotNull(results); + assertEquals(taskCount, results.size()); + for (var i = 0; i < taskCount; i++) { + assertEquals(i, results.get(i)); + } + } } - @Test - void shouldHandleConcurrencyLimitOfOne() throws Exception { - testGeneric(5, 1); - } + @Nested + class RunAllTests { + @Test + void shouldHandleSingleTask() throws Exception { + VirtualThreads.runAll(Stream.of((Runnable) () -> {}), 1); + } - @Test - void shouldFailFastWhenTaskThrows() throws Exception { - var taskCount = 100; - var concurrency = 5; - var tasksRun = new AtomicInteger(); - var failureMessage = "Task 0 failed"; - - var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Callable) () -> { - tasksRun.incrementAndGet(); - if (i == 0) { - throw new RuntimeException(failureMessage); - } - Thread.sleep(50); - return i; - }); + @Test + void shouldExecuteAllTasks() throws Exception { + var taskCount = 50; + var concurrency = 5; + var tasksRun = new AtomicInteger(); - var exception = assertThrows(ExecutionException.class, () -> VirtualThreads.runAll(tasks, concurrency)); + var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Runnable) () -> tasksRun.incrementAndGet()); - assertEquals(failureMessage, exception.getCause().getMessage()); - assertTrue( - tasksRun.get() <= concurrency + 1, - "Expected at most concurrency + 1 tasks to run, but " + tasksRun.get() + " tasks ran"); - } + VirtualThreads.runAll(tasks, concurrency); - private static void testGeneric(int taskCount, int concurrency) throws InterruptedException, ExecutionException { - var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Callable) () -> i); + assertEquals(taskCount, tasksRun.get()); + } + + @Test + void shouldExecuteAllTasksWithLongDuration() throws Exception { + var rng = SecureRandom.getInstanceStrong(); + var taskCount = 50; + var tasksRun = new AtomicInteger(); + + var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Runnable) () -> { + try { + Thread.sleep(rng.nextInt(200) + 200); + tasksRun.incrementAndGet(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + VirtualThreads.runAll(tasks, 5); + + assertEquals(taskCount, tasksRun.get()); + } + + @Test + void shouldMaintainMaxConcurrency() throws Exception { + var taskCount = 50; + var concurrency = 5; + var currentConcurrency = new AtomicInteger(); + var maxObserved = new AtomicInteger(); + + var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Runnable) () -> { + var c = currentConcurrency.incrementAndGet(); + maxObserved.accumulateAndGet(c, Math::max); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + currentConcurrency.decrementAndGet(); + }); + + VirtualThreads.runAll(tasks, concurrency); + + assertTrue( + maxObserved.get() <= concurrency, + "Max concurrency was " + maxObserved.get() + ", expected at most " + concurrency); + } + + @Test + void shouldHandleEmptyStream() throws Exception { + VirtualThreads.runAll(Stream.empty(), 5); + } - var results = VirtualThreads.runAll(tasks, concurrency); + @Test + void shouldExecuteAllTasksWithHighConcurrencyAndFewTasks() throws Exception { + var taskCount = 5; + var concurrency = 50; + var tasksRun = new AtomicInteger(); + + var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Runnable) () -> tasksRun.incrementAndGet()); + + VirtualThreads.runAll(tasks, concurrency); + + assertEquals(taskCount, tasksRun.get()); + } + + @Test + void shouldHandleConcurrencyLimitOfOne() throws Exception { + var taskCount = 5; + var tasksRun = new AtomicInteger(); + + var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Runnable) () -> tasksRun.incrementAndGet()); + + VirtualThreads.runAll(tasks, 1); + + assertEquals(taskCount, tasksRun.get()); + } - assertNotNull(results); - assertEquals(taskCount, results.size()); - for (var i = 0; i < taskCount; i++) { - assertEquals(i, results.get(i)); + @Test + void shouldFailFastWhenTaskThrows() throws Exception { + var taskCount = 100; + var concurrency = 5; + var tasksRun = new AtomicInteger(); + var failureMessage = "Task 0 failed"; + + var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Runnable) () -> { + tasksRun.incrementAndGet(); + if (i == 0) { + throw new RuntimeException(failureMessage); + } + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + var exception = assertThrows(ExecutionException.class, () -> VirtualThreads.runAll(tasks, concurrency)); + + assertEquals(failureMessage, exception.getCause().getMessage()); + assertTrue( + tasksRun.get() <= concurrency + 1, + "Expected at most concurrency + 1 tasks to run, but " + tasksRun.get() + " tasks ran"); } } } From 7552cdce88df033be15f01ab12af8fe2c74f2fe8 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 18:13:52 +0900 Subject: [PATCH 06/17] Refactor --- .../concurrent/VirtualThreadsTest.java | 39 +++++++------------ 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java index 6f6f490..149a958 100644 --- a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java +++ b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java @@ -137,15 +137,7 @@ void shouldHandleSingleTask() throws Exception { @Test void shouldExecuteAllTasks() throws Exception { - var taskCount = 50; - var concurrency = 5; - var tasksRun = new AtomicInteger(); - - var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Runnable) () -> tasksRun.incrementAndGet()); - - VirtualThreads.runAll(tasks, concurrency); - - assertEquals(taskCount, tasksRun.get()); + testGeneric(50, 5); } @Test @@ -200,27 +192,12 @@ void shouldHandleEmptyStream() throws Exception { @Test void shouldExecuteAllTasksWithHighConcurrencyAndFewTasks() throws Exception { - var taskCount = 5; - var concurrency = 50; - var tasksRun = new AtomicInteger(); - - var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Runnable) () -> tasksRun.incrementAndGet()); - - VirtualThreads.runAll(tasks, concurrency); - - assertEquals(taskCount, tasksRun.get()); + testGeneric(5, 50); } @Test void shouldHandleConcurrencyLimitOfOne() throws Exception { - var taskCount = 5; - var tasksRun = new AtomicInteger(); - - var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Runnable) () -> tasksRun.incrementAndGet()); - - VirtualThreads.runAll(tasks, 1); - - assertEquals(taskCount, tasksRun.get()); + testGeneric(5, 1); } @Test @@ -249,5 +226,15 @@ void shouldFailFastWhenTaskThrows() throws Exception { tasksRun.get() <= concurrency + 1, "Expected at most concurrency + 1 tasks to run, but " + tasksRun.get() + " tasks ran"); } + + private static void testGeneric(int taskCount, int concurrency) throws InterruptedException, ExecutionException { + var tasksRun = new AtomicInteger(); + + var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Runnable) () -> tasksRun.incrementAndGet()); + + VirtualThreads.runAll(tasks, concurrency); + + assertEquals(taskCount, tasksRun.get()); + } } } From f74fafffed733c3fd9eff4d5c101a1f14d88f20a Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 18:21:03 +0900 Subject: [PATCH 07/17] Input transform --- .../commons/concurrent/VirtualThreads.java | 23 ++++++++++++++++ .../concurrent/VirtualThreadsTest.java | 26 ++++++++++++++++++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java index 6227611..aff4a6c 100644 --- a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java +++ b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java @@ -7,6 +7,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Executors; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Stream; import lombok.NonNull; @@ -87,6 +89,27 @@ public static void runAll(@NonNull Stream tasks, int maxConcurrency) } } + /** + * Executes an action on each input with bounded concurrency. + * Results are returned in submission order. + * Fail-fast: first action failure cancels remaining actions and propagates. + * This method is blocking. + */ + public static List callAll(@NonNull Stream inputs, @NonNull Function fn, int maxConcurrency) + throws InterruptedException, ExecutionException { + return callAll(inputs.map(input -> (Callable) () -> fn.apply(input)), maxConcurrency); + } + + /** + * Executes an action on each input with bounded concurrency. + * Fail-fast: first action failure cancels remaining actions and propagates. + * This method is blocking. + */ + public static void runAll(@NonNull Stream inputs, @NonNull Consumer action, int maxConcurrency) + throws InterruptedException, ExecutionException { + runAll(inputs.map(input -> (Runnable) () -> action.accept(input)), maxConcurrency); + } + private static class Result { final int index; final T value; diff --git a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java index 149a958..b9ff016 100644 --- a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java +++ b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java @@ -126,6 +126,19 @@ private static void testGeneric(int taskCount, int concurrency) assertEquals(i, results.get(i)); } } + + @Test + void shouldWrapInputs() throws Exception { + var inputs = IntStream.range(0, 10).boxed(); + + var results = VirtualThreads.callAll(inputs, i -> i * 2, 5); + + assertNotNull(results); + assertEquals(10, results.size()); + for (var i = 0; i < 10; i++) { + assertEquals(i * 2, results.get(i)); + } + } } @Nested @@ -227,7 +240,8 @@ void shouldFailFastWhenTaskThrows() throws Exception { "Expected at most concurrency + 1 tasks to run, but " + tasksRun.get() + " tasks ran"); } - private static void testGeneric(int taskCount, int concurrency) throws InterruptedException, ExecutionException { + private static void testGeneric(int taskCount, int concurrency) + throws InterruptedException, ExecutionException { var tasksRun = new AtomicInteger(); var tasks = IntStream.range(0, taskCount).mapToObj(i -> (Runnable) () -> tasksRun.incrementAndGet()); @@ -236,5 +250,15 @@ private static void testGeneric(int taskCount, int concurrency) throws Interrupt assertEquals(taskCount, tasksRun.get()); } + + @Test + void shouldWrapInputs() throws Exception { + var inputs = IntStream.range(0, 10).boxed(); + var processed = new AtomicInteger(); + + VirtualThreads.runAll(inputs, i -> processed.incrementAndGet(), 5); + + assertEquals(10, processed.get()); + } } } From 94660284154643955dd4747f3e945321cd08d228 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 18:35:25 +0900 Subject: [PATCH 08/17] Guard code --- .../commons/concurrent/VirtualThreads.java | 38 +++++++++++++++---- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java index aff4a6c..dad8752 100644 --- a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java +++ b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java @@ -44,12 +44,23 @@ public static List callAll(@NonNull Stream> tasks, int maxCon if (inFlight == 0) { break; } - var finished = completion.take(); - var result = finished.get(); - inFlight--; - completed.put(result.index, result.value); - while (completed.containsKey(results.size())) { - results.add(completed.remove(results.size())); + try { + var finished = completion.take(); + try { + var result = finished.get(); + inFlight--; + completed.put(result.index, result.value); + while (completed.containsKey(results.size())) { + results.add(completed.remove(results.size())); + } + } catch (ExecutionException e) { + executor.shutdownNow(); + throw e; + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + throw e; } } return results; @@ -82,8 +93,19 @@ public static void runAll(@NonNull Stream tasks, int maxConcurrency) if (inFlight == 0) { break; } - var finished = completion.take(); - finished.get(); + try { + var finished = completion.take(); + try { + finished.get(); + } catch (ExecutionException e) { + executor.shutdownNow(); + throw e; + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + throw e; + } inFlight--; } } From 355f53062a2ab4c9217340e8ff04cc4ef679f0d7 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 18:44:52 +0900 Subject: [PATCH 09/17] Close streams --- .../commons/concurrent/VirtualThreads.java | 10 ++-- .../concurrent/VirtualThreadsTest.java | 54 +++++++++++++++++++ 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java index dad8752..f1246a6 100644 --- a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java +++ b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java @@ -24,8 +24,9 @@ public static List callAll(@NonNull Stream> tasks, int maxCon if (maxConcurrency <= 0) { throw new IllegalArgumentException("maxConcurrency must be > 0"); } - var iterator = tasks.iterator(); - try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { + try (tasks; + var executor = Executors.newVirtualThreadPerTaskExecutor()) { + var iterator = tasks.iterator(); var completion = new ExecutorCompletionService>(executor); var results = new ArrayList(); int nextIndex = 0; @@ -77,8 +78,9 @@ public static void runAll(@NonNull Stream tasks, int maxConcurrency) if (maxConcurrency <= 0) { throw new IllegalArgumentException("maxConcurrency must be > 0"); } - var iterator = tasks.iterator(); - try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { + try (tasks; + var executor = Executors.newVirtualThreadPerTaskExecutor()) { + var iterator = tasks.iterator(); var completion = new ExecutorCompletionService(executor); int inFlight = 0; while (iterator.hasNext() || inFlight > 0) { diff --git a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java index b9ff016..3091c9a 100644 --- a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java +++ b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java @@ -8,6 +8,7 @@ import java.security.SecureRandom; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -139,6 +140,31 @@ void shouldWrapInputs() throws Exception { assertEquals(i * 2, results.get(i)); } } + + @Test + void shouldCloseStreamOnTaskFailure() throws Exception { + var streamClosed = new AtomicBoolean(false); + var taskCount = 100; + var concurrency = 5; + + var tasks = trackableCallableStream( + IntStream.range(0, taskCount).mapToObj(i -> (Callable) () -> { + if (i == 0) { + throw new RuntimeException("Task 0 failed"); + } + Thread.sleep(50); + return i; + }), + streamClosed); + + assertThrows(ExecutionException.class, () -> VirtualThreads.callAll(tasks, concurrency)); + + assertTrue(streamClosed.get()); + } + + private static Stream trackableCallableStream(Stream source, AtomicBoolean closed) { + return source.onClose(() -> closed.set(true)); + } } @Nested @@ -260,5 +286,33 @@ void shouldWrapInputs() throws Exception { assertEquals(10, processed.get()); } + + @Test + void shouldCloseStreamOnTaskFailure() throws Exception { + var streamClosed = new AtomicBoolean(false); + var taskCount = 100; + var concurrency = 5; + + var tasks = trackableRunnableStream( + IntStream.range(0, taskCount).mapToObj(i -> (Runnable) () -> { + if (i == 0) { + throw new RuntimeException("Task 0 failed"); + } + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }), + streamClosed); + + assertThrows(ExecutionException.class, () -> VirtualThreads.runAll(tasks, concurrency)); + + assertTrue(streamClosed.get()); + } + + private static Stream trackableRunnableStream(Stream source, AtomicBoolean closed) { + return source.onClose(() -> closed.set(true)); + } } } From 4ae096faf1d4ce8f9c2458487dda7f06736b96ce Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 18:48:14 +0900 Subject: [PATCH 10/17] Wait for termination --- .../commons/concurrent/VirtualThreads.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java index f1246a6..83e5668 100644 --- a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java +++ b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java @@ -7,6 +7,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; @@ -60,6 +61,13 @@ public static List callAll(@NonNull Stream> tasks, int maxCon } } catch (InterruptedException e) { executor.shutdownNow(); + + try { + executor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException suppressed) { + e.addSuppressed(suppressed); + } + Thread.currentThread().interrupt(); throw e; } @@ -105,6 +113,13 @@ public static void runAll(@NonNull Stream tasks, int maxConcurrency) } } catch (InterruptedException e) { executor.shutdownNow(); + + try { + executor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException suppressed) { + e.addSuppressed(suppressed); + } + Thread.currentThread().interrupt(); throw e; } From b68d66530ca5a0180735ad15f0e3e0173e05d973 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 18:49:36 +0900 Subject: [PATCH 11/17] Await termination --- .../commons/concurrent/VirtualThreads.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java index 83e5668..14af607 100644 --- a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java +++ b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java @@ -57,6 +57,14 @@ public static List callAll(@NonNull Stream> tasks, int maxCon } } catch (ExecutionException e) { executor.shutdownNow(); + + try { + executor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); + e.addSuppressed(interrupted); + } + throw e; } } catch (InterruptedException e) { @@ -109,6 +117,14 @@ public static void runAll(@NonNull Stream tasks, int maxConcurrency) finished.get(); } catch (ExecutionException e) { executor.shutdownNow(); + + try { + executor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); + e.addSuppressed(interrupted); + } + throw e; } } catch (InterruptedException e) { From f9569c396a8b369a2295f9443dce87ed60972f73 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 18:57:14 +0900 Subject: [PATCH 12/17] Iterable tasks --- .../commons/concurrent/VirtualThreads.java | 50 ++++++++++++++++ .../concurrent/VirtualThreadsTest.java | 57 +++++++++++++++++++ 2 files changed, 107 insertions(+) diff --git a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java index 14af607..53b996c 100644 --- a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java +++ b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java @@ -11,9 +11,23 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import lombok.NonNull; public class VirtualThreads { + /** + * Executes tasks on virtual threads with bounded concurrency. + * Returns results in submission order. + * Fail-fast: first task failure cancels remaining tasks and propagates. + * This method is blocking. + */ + @SuppressWarnings("unchecked") + public static List callAll(@NonNull Iterable> tasks, int maxConcurrency) + throws InterruptedException, ExecutionException { + return callAll( + (Stream>) (Stream) StreamSupport.stream(tasks.spliterator(), false), maxConcurrency); + } + /** * Executes tasks on virtual threads with bounded concurrency. * Returns results in submission order. @@ -84,6 +98,17 @@ public static List callAll(@NonNull Stream> tasks, int maxCon } } + /** + * Executes tasks on virtual threads with bounded concurrency. + * Fail-fast: first task failure cancels remaining tasks and propagates. + * This method is blocking. + */ + @SuppressWarnings("unchecked") + public static void runAll(@NonNull Iterable tasks, int maxConcurrency) + throws InterruptedException, ExecutionException { + runAll((Stream) (Stream) StreamSupport.stream(tasks.spliterator(), false), maxConcurrency); + } + /** * Executes tasks on virtual threads with bounded concurrency. * Fail-fast: first task failure cancels remaining tasks and propagates. @@ -144,6 +169,19 @@ public static void runAll(@NonNull Stream tasks, int maxConcurrency) } } + /** + * Executes an action on each input with bounded concurrency. + * Results are returned in submission order. + * Fail-fast: first action failure cancels remaining actions and propagates. + * This method is blocking. + */ + public static List callAll(@NonNull Iterable inputs, @NonNull Function fn, int maxConcurrency) + throws InterruptedException, ExecutionException { + return callAll( + StreamSupport.stream(inputs.spliterator(), false).map(input -> (Callable) () -> fn.apply(input)), + maxConcurrency); + } + /** * Executes an action on each input with bounded concurrency. * Results are returned in submission order. @@ -155,6 +193,18 @@ public static List callAll(@NonNull Stream inputs, @NonNull Functio return callAll(inputs.map(input -> (Callable) () -> fn.apply(input)), maxConcurrency); } + /** + * Executes an action on each input with bounded concurrency. + * Fail-fast: first action failure cancels remaining actions and propagates. + * This method is blocking. + */ + public static void runAll(@NonNull Iterable inputs, @NonNull Consumer action, int maxConcurrency) + throws InterruptedException, ExecutionException { + runAll( + StreamSupport.stream(inputs.spliterator(), false).map(input -> (Runnable) () -> action.accept(input)), + maxConcurrency); + } + /** * Executes an action on each input with bounded concurrency. * Fail-fast: first action failure cancels remaining actions and propagates. diff --git a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java index 3091c9a..ea1493a 100644 --- a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java +++ b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java @@ -6,6 +6,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -162,6 +164,23 @@ void shouldCloseStreamOnTaskFailure() throws Exception { assertTrue(streamClosed.get()); } + @Test + void shouldHandleIterableOfCallables() throws Exception { + var tasks = new ArrayList>(); + for (int i = 0; i < 10; i++) { + int index = i; + tasks.add(() -> index); + } + + var results = VirtualThreads.callAll(tasks, 5); + + assertNotNull(results); + assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) { + assertEquals(i, results.get(i)); + } + } + private static Stream trackableCallableStream(Stream source, AtomicBoolean closed) { return source.onClose(() -> closed.set(true)); } @@ -311,8 +330,46 @@ void shouldCloseStreamOnTaskFailure() throws Exception { assertTrue(streamClosed.get()); } + @Test + void shouldHandleIterableOfRunnables() throws Exception { + var tasksRun = new AtomicInteger(); + var tasks = new ArrayList(); + for (int i = 0; i < 10; i++) { + tasks.add(tasksRun::incrementAndGet); + } + + VirtualThreads.runAll(tasks, 5); + + assertEquals(10, tasksRun.get()); + } + private static Stream trackableRunnableStream(Stream source, AtomicBoolean closed) { return source.onClose(() -> closed.set(true)); } } + + @Nested + class IterableOverloadTests { + @Test + void shouldCallAllWithIterableAndFunction() throws Exception { + var inputs = List.of(1, 2, 3, 4, 5); + + var results = VirtualThreads.callAll(inputs, i -> i * 2, 3); + + assertNotNull(results); + assertEquals(5, results.size()); + assertEquals(2, results.get(0)); + assertEquals(10, results.get(4)); + } + + @Test + void shouldRunAllWithIterableAndConsumer() throws Exception { + var processed = new AtomicInteger(); + var inputs = List.of(1, 2, 3, 4, 5); + + VirtualThreads.runAll(inputs, i -> processed.incrementAndGet(), 3); + + assertEquals(5, processed.get()); + } + } } From 3658598318822db7507d208f0eb5adb6e9099911 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 20:23:28 +0900 Subject: [PATCH 13/17] Refactor --- .../concurrent/VirtualThreadsTest.java | 55 ++++++++++--------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java index ea1493a..0c1582c 100644 --- a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java +++ b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java @@ -130,19 +130,6 @@ private static void testGeneric(int taskCount, int concurrency) } } - @Test - void shouldWrapInputs() throws Exception { - var inputs = IntStream.range(0, 10).boxed(); - - var results = VirtualThreads.callAll(inputs, i -> i * 2, 5); - - assertNotNull(results); - assertEquals(10, results.size()); - for (var i = 0; i < 10; i++) { - assertEquals(i * 2, results.get(i)); - } - } - @Test void shouldCloseStreamOnTaskFailure() throws Exception { var streamClosed = new AtomicBoolean(false); @@ -296,16 +283,6 @@ private static void testGeneric(int taskCount, int concurrency) assertEquals(taskCount, tasksRun.get()); } - @Test - void shouldWrapInputs() throws Exception { - var inputs = IntStream.range(0, 10).boxed(); - var processed = new AtomicInteger(); - - VirtualThreads.runAll(inputs, i -> processed.incrementAndGet(), 5); - - assertEquals(10, processed.get()); - } - @Test void shouldCloseStreamOnTaskFailure() throws Exception { var streamClosed = new AtomicBoolean(false); @@ -349,9 +326,22 @@ private static Stream trackableRunnableStream(Stream source, AtomicBoo } @Nested - class IterableOverloadTests { + class CallAllWithFunctionTests { + @Test + void shouldTransformInputsUsingStream() throws Exception { + var inputs = IntStream.range(0, 10).boxed(); + + var results = VirtualThreads.callAll(inputs, i -> i * 2, 5); + + assertNotNull(results); + assertEquals(10, results.size()); + for (var i = 0; i < 10; i++) { + assertEquals(i * 2, results.get(i)); + } + } + @Test - void shouldCallAllWithIterableAndFunction() throws Exception { + void shouldTransformInputsUsingIterable() throws Exception { var inputs = List.of(1, 2, 3, 4, 5); var results = VirtualThreads.callAll(inputs, i -> i * 2, 3); @@ -361,9 +351,22 @@ void shouldCallAllWithIterableAndFunction() throws Exception { assertEquals(2, results.get(0)); assertEquals(10, results.get(4)); } + } + + @Nested + class RunAllWithConsumerTests { + @Test + void shouldProcessInputsUsingStream() throws Exception { + var inputs = IntStream.range(0, 10).boxed(); + var processed = new AtomicInteger(); + + VirtualThreads.runAll(inputs, i -> processed.incrementAndGet(), 5); + + assertEquals(10, processed.get()); + } @Test - void shouldRunAllWithIterableAndConsumer() throws Exception { + void shouldProcessInputsUsingIterable() throws Exception { var processed = new AtomicInteger(); var inputs = List.of(1, 2, 3, 4, 5); From db34ccbd4b1d29a5a6aec6c53eab1fb9c11b43fd Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 22:14:56 +0900 Subject: [PATCH 14/17] Refactor to Iterable as main --- .../commons/concurrent/VirtualThreads.java | 65 ++++++++++--------- .../concurrent/VirtualThreadsTest.java | 54 --------------- 2 files changed, 35 insertions(+), 84 deletions(-) diff --git a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java index 53b996c..48846e6 100644 --- a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java +++ b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java @@ -10,8 +10,8 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.Stream; -import java.util.stream.StreamSupport; import lombok.NonNull; public class VirtualThreads { @@ -21,26 +21,13 @@ public class VirtualThreads { * Fail-fast: first task failure cancels remaining tasks and propagates. * This method is blocking. */ - @SuppressWarnings("unchecked") public static List callAll(@NonNull Iterable> tasks, int maxConcurrency) throws InterruptedException, ExecutionException { - return callAll( - (Stream>) (Stream) StreamSupport.stream(tasks.spliterator(), false), maxConcurrency); - } - - /** - * Executes tasks on virtual threads with bounded concurrency. - * Returns results in submission order. - * Fail-fast: first task failure cancels remaining tasks and propagates. - * This method is blocking. - */ - public static List callAll(@NonNull Stream> tasks, int maxConcurrency) - throws InterruptedException, ExecutionException { if (maxConcurrency <= 0) { throw new IllegalArgumentException("maxConcurrency must be > 0"); } - try (tasks; - var executor = Executors.newVirtualThreadPerTaskExecutor()) { + var executor = Executors.newVirtualThreadPerTaskExecutor(); + try { var iterator = tasks.iterator(); var completion = new ExecutorCompletionService>(executor); var results = new ArrayList(); @@ -95,18 +82,20 @@ public static List callAll(@NonNull Stream> tasks, int maxCon } } return results; + } finally { + executor.shutdown(); } } /** * Executes tasks on virtual threads with bounded concurrency. + * Returns results in submission order. * Fail-fast: first task failure cancels remaining tasks and propagates. * This method is blocking. */ - @SuppressWarnings("unchecked") - public static void runAll(@NonNull Iterable tasks, int maxConcurrency) + public static List callAll(@NonNull Stream> tasks, int maxConcurrency) throws InterruptedException, ExecutionException { - runAll((Stream) (Stream) StreamSupport.stream(tasks.spliterator(), false), maxConcurrency); + return callAll(tasks.collect(Collectors.toList()), maxConcurrency); } /** @@ -114,13 +103,13 @@ public static void runAll(@NonNull Iterable tasks, int maxCo * Fail-fast: first task failure cancels remaining tasks and propagates. * This method is blocking. */ - public static void runAll(@NonNull Stream tasks, int maxConcurrency) + public static void runAll(@NonNull Iterable tasks, int maxConcurrency) throws InterruptedException, ExecutionException { if (maxConcurrency <= 0) { throw new IllegalArgumentException("maxConcurrency must be > 0"); } - try (tasks; - var executor = Executors.newVirtualThreadPerTaskExecutor()) { + var executor = Executors.newVirtualThreadPerTaskExecutor(); + try { var iterator = tasks.iterator(); var completion = new ExecutorCompletionService(executor); int inFlight = 0; @@ -166,9 +155,21 @@ public static void runAll(@NonNull Stream tasks, int maxConcurrency) } inFlight--; } + } finally { + executor.shutdown(); } } + /** + * Executes tasks on virtual threads with bounded concurrency. + * Fail-fast: first task failure cancels remaining tasks and propagates. + * This method is blocking. + */ + public static void runAll(@NonNull Stream tasks, int maxConcurrency) + throws InterruptedException, ExecutionException { + runAll(tasks.collect(Collectors.toList()), maxConcurrency); + } + /** * Executes an action on each input with bounded concurrency. * Results are returned in submission order. @@ -177,9 +178,11 @@ public static void runAll(@NonNull Stream tasks, int maxConcurrency) */ public static List callAll(@NonNull Iterable inputs, @NonNull Function fn, int maxConcurrency) throws InterruptedException, ExecutionException { - return callAll( - StreamSupport.stream(inputs.spliterator(), false).map(input -> (Callable) () -> fn.apply(input)), - maxConcurrency); + var tasks = new ArrayList>(); + for (var input : inputs) { + tasks.add(() -> fn.apply(input)); + } + return callAll(tasks, maxConcurrency); } /** @@ -190,7 +193,7 @@ public static List callAll(@NonNull Iterable inputs, @NonNull Funct */ public static List callAll(@NonNull Stream inputs, @NonNull Function fn, int maxConcurrency) throws InterruptedException, ExecutionException { - return callAll(inputs.map(input -> (Callable) () -> fn.apply(input)), maxConcurrency); + return callAll(inputs.collect(Collectors.toList()), fn, maxConcurrency); } /** @@ -200,9 +203,11 @@ public static List callAll(@NonNull Stream inputs, @NonNull Functio */ public static void runAll(@NonNull Iterable inputs, @NonNull Consumer action, int maxConcurrency) throws InterruptedException, ExecutionException { - runAll( - StreamSupport.stream(inputs.spliterator(), false).map(input -> (Runnable) () -> action.accept(input)), - maxConcurrency); + var tasks = new ArrayList(); + for (var input : inputs) { + tasks.add(() -> action.accept(input)); + } + runAll(tasks, maxConcurrency); } /** @@ -212,7 +217,7 @@ public static void runAll(@NonNull Iterable inputs, @NonNull Consumer */ public static void runAll(@NonNull Stream inputs, @NonNull Consumer action, int maxConcurrency) throws InterruptedException, ExecutionException { - runAll(inputs.map(input -> (Runnable) () -> action.accept(input)), maxConcurrency); + runAll(inputs.collect(Collectors.toList()), action, maxConcurrency); } private static class Result { diff --git a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java index 0c1582c..7deb72b 100644 --- a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java +++ b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java @@ -10,7 +10,6 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -130,27 +129,6 @@ private static void testGeneric(int taskCount, int concurrency) } } - @Test - void shouldCloseStreamOnTaskFailure() throws Exception { - var streamClosed = new AtomicBoolean(false); - var taskCount = 100; - var concurrency = 5; - - var tasks = trackableCallableStream( - IntStream.range(0, taskCount).mapToObj(i -> (Callable) () -> { - if (i == 0) { - throw new RuntimeException("Task 0 failed"); - } - Thread.sleep(50); - return i; - }), - streamClosed); - - assertThrows(ExecutionException.class, () -> VirtualThreads.callAll(tasks, concurrency)); - - assertTrue(streamClosed.get()); - } - @Test void shouldHandleIterableOfCallables() throws Exception { var tasks = new ArrayList>(); @@ -167,10 +145,6 @@ void shouldHandleIterableOfCallables() throws Exception { assertEquals(i, results.get(i)); } } - - private static Stream trackableCallableStream(Stream source, AtomicBoolean closed) { - return source.onClose(() -> closed.set(true)); - } } @Nested @@ -283,30 +257,6 @@ private static void testGeneric(int taskCount, int concurrency) assertEquals(taskCount, tasksRun.get()); } - @Test - void shouldCloseStreamOnTaskFailure() throws Exception { - var streamClosed = new AtomicBoolean(false); - var taskCount = 100; - var concurrency = 5; - - var tasks = trackableRunnableStream( - IntStream.range(0, taskCount).mapToObj(i -> (Runnable) () -> { - if (i == 0) { - throw new RuntimeException("Task 0 failed"); - } - try { - Thread.sleep(50); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }), - streamClosed); - - assertThrows(ExecutionException.class, () -> VirtualThreads.runAll(tasks, concurrency)); - - assertTrue(streamClosed.get()); - } - @Test void shouldHandleIterableOfRunnables() throws Exception { var tasksRun = new AtomicInteger(); @@ -319,10 +269,6 @@ void shouldHandleIterableOfRunnables() throws Exception { assertEquals(10, tasksRun.get()); } - - private static Stream trackableRunnableStream(Stream source, AtomicBoolean closed) { - return source.onClose(() -> closed.set(true)); - } } @Nested From 61fade0ceedeed2669778228c169b70a9193d632 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 22:49:40 +0900 Subject: [PATCH 15/17] Iterator main function --- .../commons/concurrent/VirtualThreads.java | 84 ++++++++++++++----- .../concurrent/VirtualThreadsTest.java | 37 ++++++++ 2 files changed, 100 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java index 48846e6..81b17db 100644 --- a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java +++ b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java @@ -2,6 +2,7 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -10,7 +11,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; -import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.NonNull; @@ -21,23 +21,22 @@ public class VirtualThreads { * Fail-fast: first task failure cancels remaining tasks and propagates. * This method is blocking. */ - public static List callAll(@NonNull Iterable> tasks, int maxConcurrency) + public static List callAll(@NonNull Iterator> tasks, int maxConcurrency) throws InterruptedException, ExecutionException { if (maxConcurrency <= 0) { throw new IllegalArgumentException("maxConcurrency must be > 0"); } var executor = Executors.newVirtualThreadPerTaskExecutor(); try { - var iterator = tasks.iterator(); var completion = new ExecutorCompletionService>(executor); var results = new ArrayList(); int nextIndex = 0; int inFlight = 0; var completed = new HashMap(); - while (iterator.hasNext() || inFlight > 0) { - while (inFlight < maxConcurrency && iterator.hasNext()) { + while (tasks.hasNext() || inFlight > 0) { + while (inFlight < maxConcurrency && tasks.hasNext()) { int index = nextIndex++; - var task = iterator.next(); + var task = tasks.next(); completion.submit(() -> { T value = task.call(); return new Result<>(index, value); @@ -87,6 +86,17 @@ public static List callAll(@NonNull Iterable> tasks } } + /** + * Executes tasks on virtual threads with bounded concurrency. + * Returns results in submission order. + * Fail-fast: first task failure cancels remaining tasks and propagates. + * This method is blocking. + */ + public static List callAll(@NonNull Iterable> tasks, int maxConcurrency) + throws InterruptedException, ExecutionException { + return callAll(tasks.iterator(), maxConcurrency); + } + /** * Executes tasks on virtual threads with bounded concurrency. * Returns results in submission order. @@ -95,7 +105,7 @@ public static List callAll(@NonNull Iterable> tasks */ public static List callAll(@NonNull Stream> tasks, int maxConcurrency) throws InterruptedException, ExecutionException { - return callAll(tasks.collect(Collectors.toList()), maxConcurrency); + return callAll(tasks.iterator(), maxConcurrency); } /** @@ -103,19 +113,18 @@ public static List callAll(@NonNull Stream> tasks, int maxCon * Fail-fast: first task failure cancels remaining tasks and propagates. * This method is blocking. */ - public static void runAll(@NonNull Iterable tasks, int maxConcurrency) + public static void runAll(@NonNull Iterator tasks, int maxConcurrency) throws InterruptedException, ExecutionException { if (maxConcurrency <= 0) { throw new IllegalArgumentException("maxConcurrency must be > 0"); } var executor = Executors.newVirtualThreadPerTaskExecutor(); try { - var iterator = tasks.iterator(); var completion = new ExecutorCompletionService(executor); int inFlight = 0; - while (iterator.hasNext() || inFlight > 0) { - while (inFlight < maxConcurrency && iterator.hasNext()) { - var task = iterator.next(); + while (tasks.hasNext() || inFlight > 0) { + while (inFlight < maxConcurrency && tasks.hasNext()) { + var task = tasks.next(); completion.submit(() -> { task.run(); return null; @@ -160,6 +169,16 @@ public static void runAll(@NonNull Iterable tasks, int maxCo } } + /** + * Executes tasks on virtual threads with bounded concurrency. + * Fail-fast: first task failure cancels remaining tasks and propagates. + * This method is blocking. + */ + public static void runAll(@NonNull Iterable tasks, int maxConcurrency) + throws InterruptedException, ExecutionException { + runAll(tasks.iterator(), maxConcurrency); + } + /** * Executes tasks on virtual threads with bounded concurrency. * Fail-fast: first task failure cancels remaining tasks and propagates. @@ -167,7 +186,7 @@ public static void runAll(@NonNull Iterable tasks, int maxCo */ public static void runAll(@NonNull Stream tasks, int maxConcurrency) throws InterruptedException, ExecutionException { - runAll(tasks.collect(Collectors.toList()), maxConcurrency); + runAll(tasks.iterator(), maxConcurrency); } /** @@ -176,13 +195,25 @@ public static void runAll(@NonNull Stream tasks, int maxConcurrency) * Fail-fast: first action failure cancels remaining actions and propagates. * This method is blocking. */ - public static List callAll(@NonNull Iterable inputs, @NonNull Function fn, int maxConcurrency) + public static List callAll(@NonNull Iterator inputs, @NonNull Function fn, int maxConcurrency) throws InterruptedException, ExecutionException { var tasks = new ArrayList>(); - for (var input : inputs) { + while (inputs.hasNext()) { + var input = inputs.next(); tasks.add(() -> fn.apply(input)); } - return callAll(tasks, maxConcurrency); + return callAll(tasks.iterator(), maxConcurrency); + } + + /** + * Executes an action on each input with bounded concurrency. + * Results are returned in submission order. + * Fail-fast: first action failure cancels remaining actions and propagates. + * This method is blocking. + */ + public static List callAll(@NonNull Iterable inputs, @NonNull Function fn, int maxConcurrency) + throws InterruptedException, ExecutionException { + return callAll(inputs.iterator(), fn, maxConcurrency); } /** @@ -193,7 +224,7 @@ public static List callAll(@NonNull Iterable inputs, @NonNull Funct */ public static List callAll(@NonNull Stream inputs, @NonNull Function fn, int maxConcurrency) throws InterruptedException, ExecutionException { - return callAll(inputs.collect(Collectors.toList()), fn, maxConcurrency); + return callAll(inputs.map(input -> (Callable) () -> fn.apply(input)).iterator(), maxConcurrency); } /** @@ -201,13 +232,24 @@ public static List callAll(@NonNull Stream inputs, @NonNull Functio * Fail-fast: first action failure cancels remaining actions and propagates. * This method is blocking. */ - public static void runAll(@NonNull Iterable inputs, @NonNull Consumer action, int maxConcurrency) + public static void runAll(@NonNull Iterator inputs, @NonNull Consumer action, int maxConcurrency) throws InterruptedException, ExecutionException { var tasks = new ArrayList(); - for (var input : inputs) { + while (inputs.hasNext()) { + var input = inputs.next(); tasks.add(() -> action.accept(input)); } - runAll(tasks, maxConcurrency); + runAll(tasks.iterator(), maxConcurrency); + } + + /** + * Executes an action on each input with bounded concurrency. + * Fail-fast: first action failure cancels remaining actions and propagates. + * This method is blocking. + */ + public static void runAll(@NonNull Iterable inputs, @NonNull Consumer action, int maxConcurrency) + throws InterruptedException, ExecutionException { + runAll(inputs.iterator(), action, maxConcurrency); } /** @@ -217,7 +259,7 @@ public static void runAll(@NonNull Iterable inputs, @NonNull Consumer */ public static void runAll(@NonNull Stream inputs, @NonNull Consumer action, int maxConcurrency) throws InterruptedException, ExecutionException { - runAll(inputs.collect(Collectors.toList()), action, maxConcurrency); + runAll(inputs.map(input -> (Runnable) () -> action.accept(input)).iterator(), maxConcurrency); } private static class Result { diff --git a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java index 7deb72b..63bd6f6 100644 --- a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java +++ b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java @@ -19,6 +19,16 @@ class VirtualThreadsTest { @Nested class CallAllTests { + @Test + void shouldHandleSingleTaskUsingIterator() throws Exception { + var results = VirtualThreads.callAll( + List.of((Callable) () -> "single-result").iterator(), 1); + + assertNotNull(results); + assertEquals(1, results.size()); + assertEquals("single-result", results.get(0)); + } + @Test void shouldHandleSingleTask() throws Exception { var results = VirtualThreads.callAll(Stream.of((Callable) () -> "single-result"), 1); @@ -149,6 +159,11 @@ void shouldHandleIterableOfCallables() throws Exception { @Nested class RunAllTests { + @Test + void shouldHandleSingleTaskUsingIterator() throws Exception { + VirtualThreads.runAll(List.of((Runnable) () -> {}).iterator(), 1); + } + @Test void shouldHandleSingleTask() throws Exception { VirtualThreads.runAll(Stream.of((Runnable) () -> {}), 1); @@ -273,6 +288,18 @@ void shouldHandleIterableOfRunnables() throws Exception { @Nested class CallAllWithFunctionTests { + @Test + void shouldTransformInputsUsingIterator() throws Exception { + var inputs = List.of(1, 2, 3, 4, 5); + + var results = VirtualThreads.callAll(inputs.iterator(), i -> i * 2, 3); + + assertNotNull(results); + assertEquals(5, results.size()); + assertEquals(2, results.get(0)); + assertEquals(10, results.get(4)); + } + @Test void shouldTransformInputsUsingStream() throws Exception { var inputs = IntStream.range(0, 10).boxed(); @@ -301,6 +328,16 @@ void shouldTransformInputsUsingIterable() throws Exception { @Nested class RunAllWithConsumerTests { + @Test + void shouldProcessInputsUsingIterator() throws Exception { + var processed = new AtomicInteger(); + var inputs = List.of(1, 2, 3, 4, 5); + + VirtualThreads.runAll(inputs.iterator(), i -> processed.incrementAndGet(), 3); + + assertEquals(5, processed.get()); + } + @Test void shouldProcessInputsUsingStream() throws Exception { var inputs = IntStream.range(0, 10).boxed(); From d7006cd8f65e9748d02a611f05e0783a9a768cc0 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 22:51:49 +0900 Subject: [PATCH 16/17] Cleanup --- .../autonomouslogic/commons/concurrent/VirtualThreads.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java index 81b17db..2ad60ac 100644 --- a/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java +++ b/src/main/java/com/autonomouslogic/commons/concurrent/VirtualThreads.java @@ -262,9 +262,9 @@ public static void runAll(@NonNull Stream inputs, @NonNull Consumer ac runAll(inputs.map(input -> (Runnable) () -> action.accept(input)).iterator(), maxConcurrency); } - private static class Result { - final int index; - final T value; + private static final class Result { + private final int index; + private final T value; Result(int index, T value) { this.index = index; From 7562c8cd0550f452721ed2d0c3c1cd49f05223b9 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Sat, 20 Jun 2026 23:40:50 +0900 Subject: [PATCH 17/17] Tests for interrupts --- .../concurrent/VirtualThreadsTest.java | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java index 63bd6f6..3da6ac9 100644 --- a/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java +++ b/src/test/java/com/autonomouslogic/commons/concurrent/VirtualThreadsTest.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; @@ -286,6 +287,97 @@ void shouldHandleIterableOfRunnables() throws Exception { } } + @Nested + class InterruptTests { + @Test + void callAllShouldPropagateInterruptAndResetFlag() throws Exception { + var maxConcurrency = 2; + var started = new CountDownLatch(maxConcurrency); + var blocker = new CountDownLatch(1); + var callingThread = Thread.currentThread(); + + var tasks = List.>of( + () -> { + started.countDown(); + blocker.await(); + return 1; + }, + () -> { + started.countDown(); + blocker.await(); + return 2; + }); + + var interrupter = Thread.ofVirtual().start(() -> { + try { + started.await(); + callingThread.interrupt(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + try { + var ex = assertThrows(InterruptedException.class, () -> VirtualThreads.callAll(tasks, maxConcurrency)); + assertNotNull(ex); + assertTrue( + Thread.currentThread().isInterrupted(), + "Interrupt flag must be re-set after InterruptedException from callAll"); + } finally { + blocker.countDown(); + Thread.interrupted(); // clear flag so JUnit teardown is unaffected + interrupter.join(2000); + } + } + + @Test + void runAllShouldPropagateInterruptAndResetFlag() throws Exception { + var maxConcurrency = 2; + var started = new CountDownLatch(maxConcurrency); + var blocker = new CountDownLatch(1); + var callingThread = Thread.currentThread(); + + var tasks = List.of( + () -> { + try { + started.countDown(); + blocker.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, + () -> { + try { + started.countDown(); + blocker.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + var interrupter = Thread.ofVirtual().start(() -> { + try { + started.await(); + callingThread.interrupt(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + try { + var ex = assertThrows(InterruptedException.class, () -> VirtualThreads.runAll(tasks, maxConcurrency)); + assertNotNull(ex); + assertTrue( + Thread.currentThread().isInterrupted(), + "Interrupt flag must be re-set after InterruptedException from runAll"); + } finally { + blocker.countDown(); + Thread.interrupted(); // clear flag so JUnit teardown is unaffected + interrupter.join(2000); + } + } + } + @Nested class CallAllWithFunctionTests { @Test