diff --git a/astra-core/src/main/java/org/alfasoftware/astra/core/utils/AstraCore.java b/astra-core/src/main/java/org/alfasoftware/astra/core/utils/AstraCore.java index aa6f193..ea74065 100644 --- a/astra-core/src/main/java/org/alfasoftware/astra/core/utils/AstraCore.java +++ b/astra-core/src/main/java/org/alfasoftware/astra/core/utils/AstraCore.java @@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.alfasoftware.astra.core.refactoring.UseCase; @@ -82,26 +81,24 @@ protected void runOperations(String directoryPath, UseCase useCase, String[] sou log.info("Starting Astra run for directory: " + directoryPath); AtomicLong currentFileIndex = new AtomicLong(); AtomicLong currentPercentage = new AtomicLong(); - log.info("Counting files (this may take a few seconds)"); Instant startTime = Instant.now(); - List javaFilesInDirectory; - try (Stream walk = Files.walk(Paths.get(directoryPath))) { - javaFilesInDirectory = walk - .filter(f -> f.toFile().isFile()) - .filter(f -> f.getFileName().toString().endsWith("java")) - .collect(Collectors.toList()); - } - log.info(javaFilesInDirectory.size() + " .java files in directory to review"); + // The same file-selection filter (.java extension + the UseCase path prefiltering predicate) + // is used for both the count walk and the processing walk, so the progress denominator can + // never drift from the set of files actually processed. + Path sourcePath = Paths.get(directoryPath); + Predicate fileFilter = buildFileFilter(useCase); - log.info("Applying prefilters to files in directory"); - Predicate prefilteringPredicate = useCase.getPrefilteringPredicate(); - List filteredJavaFiles = javaFilesInDirectory.stream() - .filter(f -> prefilteringPredicate.test(f.toString())) - .collect(Collectors.toList()); - log.info(filteredJavaFiles.size() + " files remain after prefiltering"); + // First walk: a cheap count pass that reads only directory metadata (not file contents) to + // determine the total number of files to process, which feeds the progress percentage below. + log.info("Counting files (this may take a few seconds)"); + long totalFiles; + try (Stream walk = Files.walk(sourcePath)) { + totalFiles = walk.filter(fileFilter).count(); + } + log.info(totalFiles + " files to process after prefiltering"); - if (filteredJavaFiles.isEmpty()) { + if (totalFiles == 0) { log.info(getPrintableDuration(Duration.between(startTime, Instant.now()))); return; } @@ -109,14 +106,19 @@ protected void runOperations(String directoryPath, UseCase useCase, String[] sou Set operations = useCase.getOperations(); int parallelism = useCase.getParallelism(); Predicate contentPrefilteringPredicate = useCase.getContentPrefilteringPredicate(); - log.info("Processing [" + filteredJavaFiles.size() + "] files with [" + parallelism + "] thread(s)"); + log.info("Processing [" + totalFiles + "] files with [" + parallelism + "] thread(s)"); List fileErrors = new ArrayList<>(); ExecutorService executor = Executors.newFixedThreadPool(parallelism); try { - List> futures = filteredJavaFiles.stream() - .map(f -> executor.submit(() -> applyOperationsAndSave(f, operations, sources, classPath, contentPrefilteringPredicate))) - .collect(Collectors.toList()); + // Second walk: stream paths directly into the executor as they are encountered, without + // materialising every path into a List first. This avoids holding all paths in memory at + // once on very large source trees. + List> futures = new ArrayList<>(); + try (Stream walk = Files.walk(sourcePath)) { + walk.filter(fileFilter) + .forEach(f -> futures.add(executor.submit(() -> applyOperationsAndSave(f, operations, sources, classPath, contentPrefilteringPredicate)))); + } for (Future future : futures) { try { @@ -130,9 +132,9 @@ protected void runOperations(String directoryPath, UseCase useCase, String[] sou throw new IOException("File processing was interrupted", e); } long idx = currentFileIndex.incrementAndGet(); - long newPct = idx * 100 / filteredJavaFiles.size(); + long newPct = idx * 100 / totalFiles; if (currentPercentage.getAndSet(newPct) != newPct) { - logProgress(idx, newPct, startTime, filteredJavaFiles.size()); + logProgress(idx, newPct, startTime, totalFiles); } } } finally { @@ -153,6 +155,22 @@ protected void runOperations(String directoryPath, UseCase useCase, String[] sou } + /** + * Builds the single, shared file-selection filter applied to every {@link Path} encountered when + * walking the source directory. This combines the {@code .java} file check with the path-level + * prefiltering predicate from the {@link UseCase} ({@link UseCase#getPrefilteringPredicate()}). + * + *

The same {@link Predicate} instance is used for both the count walk (which determines the + * total used for progress reporting) and the processing walk, ensuring the two can never diverge. + */ + private Predicate buildFileFilter(UseCase useCase) { + Predicate prefilteringPredicate = useCase.getPrefilteringPredicate(); + return f -> f.toFile().isFile() + && f.getFileName().toString().endsWith("java") + && prefilteringPredicate.test(f.toString()); + } + + private void logProgress(long currentFileIndex, long currentPercentage, Instant startTime, long totalNumberOfFiles) { Duration elapsedDuration = Duration.between(startTime, Instant.now()); Duration estimatedDuration = elapsedDuration.multipliedBy(totalNumberOfFiles).dividedBy(currentFileIndex); diff --git a/astra-core/src/test/java/org/alfasoftware/astra/core/utils/TestAstraCoreParallelExecution.java b/astra-core/src/test/java/org/alfasoftware/astra/core/utils/TestAstraCoreParallelExecution.java index e823e1f..42a1e7c 100644 --- a/astra-core/src/test/java/org/alfasoftware/astra/core/utils/TestAstraCoreParallelExecution.java +++ b/astra-core/src/test/java/org/alfasoftware/astra/core/utils/TestAstraCoreParallelExecution.java @@ -9,15 +9,25 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Comparator; -import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.alfasoftware.astra.core.refactoring.UseCase; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.AppenderBase; public class TestAstraCoreParallelExecution { @@ -35,11 +45,6 @@ public void tearDown() throws IOException { .forEach(path -> path.toFile().delete()); } - @Test - public void testDefaultParallelismMatchesAvailableProcessors() { - UseCase useCase = () -> new HashSet<>(); - assertEquals(Runtime.getRuntime().availableProcessors(), useCase.getParallelism()); - } /** * Verifies that all files in the target directory are visited when running with parallelism > 1. @@ -48,31 +53,11 @@ public void testDefaultParallelismMatchesAvailableProcessors() { @Test public void testParallelExecutionProcessesAllFiles() throws IOException { int fileCount = 4; - for (int i = 1; i <= fileCount; i++) { - Files.writeString(tempDir.resolve("File" + i + ".java"), - "public class File" + i + " { void method() {} }"); - } + writeJavaClasses("File", fileCount); Set processedFiles = ConcurrentHashMap.newKeySet(); - UseCase useCase = new UseCase() { - @Override - public Set getOperations() { - return Set.of((compilationUnit, node, rewriter) -> { - Path path = (Path) compilationUnit.getProperty(CompilationUnitProperty.ABSOLUTE_PATH); - if (path != null) { - processedFiles.add(path); - } - }); - } - - @Override - public int getParallelism() { - return fileCount; - } - }; - - AstraCore.run(tempDir.toString(), useCase); + AstraCore.run(tempDir.toString(), useCase(fileCount, recordVisitedPaths(processedFiles))); assertEquals("All files should have been visited", fileCount, processedFiles.size()); } @@ -84,29 +69,13 @@ public int getParallelism() { @Test public void testParallelAndSequentialVisitSameFiles() throws IOException { int fileCount = 3; - for (int i = 1; i <= fileCount; i++) { - Files.writeString(tempDir.resolve("ClassFile" + i + ".java"), - "public class ClassFile" + i + " {}"); - } + writeJavaClasses("ClassFile", fileCount); for (int parallelism : new int[]{1, fileCount}) { Set visited = ConcurrentHashMap.newKeySet(); - UseCase useCase = new UseCase() { - @Override - public Set getOperations() { - return Set.of((compilationUnit, node, rewriter) -> { - Path path = (Path) compilationUnit.getProperty(CompilationUnitProperty.ABSOLUTE_PATH); - if (path != null) visited.add(path); - }); - } - - @Override - public int getParallelism() { - return parallelism; - } - }; - AstraCore.run(tempDir.toString(), useCase); + AstraCore.run(tempDir.toString(), useCase(parallelism, recordVisitedPaths(visited))); + assertEquals("parallelism=" + parallelism + " should visit all files", fileCount, visited.size()); } } @@ -117,22 +86,11 @@ public int getParallelism() { */ @Test public void testParallelExecutionSurfacesPerFileErrors() throws IOException { - Files.writeString(tempDir.resolve("File1.java"), "public class File1 {}"); - Files.writeString(tempDir.resolve("File2.java"), "public class File2 {}"); + writeJavaClasses("File", 2); - UseCase useCase = new UseCase() { - @Override - public Set getOperations() { - return Set.of((compilationUnit, node, rewriter) -> { - throw new RuntimeException("Intentional test failure"); - }); - } - - @Override - public int getParallelism() { - return 2; - } - }; + UseCase useCase = useCase(2, (compilationUnit, node, rewriter) -> { + throw new RuntimeException("Intentional test failure"); + }); try { AstraCore.run(tempDir.toString(), useCase); @@ -141,7 +99,7 @@ public int getParallelism() { assertNotNull("Exception message should be present", e.getMessage()); // The root cause chain should reach the original failures Throwable root = e; - while (root.getCause() != null) { + while (root.getCause() != null && ! root.getCause().equals(root)) { root = root.getCause(); } assertTrue("Exception chain should reference processing failures", @@ -155,34 +113,21 @@ public int getParallelism() { @Test public void testParallelExecutionContinuesAfterFileError() throws IOException { int totalFiles = 4; - for (int i = 1; i <= totalFiles; i++) { - Files.writeString(tempDir.resolve("File" + i + ".java"), - "public class File" + i + " {}"); - } + writeJavaClasses("File", totalFiles); Set processedFiles = ConcurrentHashMap.newKeySet(); AtomicInteger failCount = new AtomicInteger(); - UseCase useCase = new UseCase() { - @Override - public Set getOperations() { - return Set.of((compilationUnit, node, rewriter) -> { - Path path = (Path) compilationUnit.getProperty(CompilationUnitProperty.ABSOLUTE_PATH); - if (path != null && path.getFileName().toString().equals("File2.java")) { - failCount.incrementAndGet(); - throw new RuntimeException("Intentional failure for File2"); - } - if (path != null) { - processedFiles.add(path); - } - }); + UseCase useCase = useCase(2, (compilationUnit, node, rewriter) -> { + Path path = visitedPath(compilationUnit); + if (path != null && path.getFileName().toString().equals("File2.java")) { + failCount.incrementAndGet(); + throw new RuntimeException("Intentional failure for File2"); } - - @Override - public int getParallelism() { - return 2; + if (path != null) { + processedFiles.add(path); } - }; + }); try { AstraCore.run(tempDir.toString(), useCase); @@ -193,4 +138,155 @@ public int getParallelism() { assertTrue("File2 should have triggered the failure", failCount.get() > 0); } } + + /** + * Verifies that the shared file-selection filter (the {@code .java} extension check plus the + * {@link UseCase} path prefiltering predicate) is honoured by the processing walk: only files + * passing the predicate are processed, and non-matching ones are not. This proves the count walk + * and the processing walk use the same filter, so the progress total cannot drift. + */ + @Test + public void testPrefilteringPredicateLimitsProcessedFiles() throws IOException { + // Three files that should be processed (names starting with "Include")... + writeJavaClasses("Include", 3); + // ...and two that should be filtered out by the prefiltering predicate. + writeJavaClasses("Exclude", 2); + // A non-.java file that must never be processed regardless of the predicate. + Files.writeString(tempDir.resolve("notes.txt"), "not a java file"); + + Set processedFiles = ConcurrentHashMap.newKeySet(); + + UseCase useCase = useCase(2, path -> path.contains("Include"), recordVisitedPaths(processedFiles)); + + AstraCore.run(tempDir.toString(), useCase); + + assertEquals("Only files passing the prefiltering predicate should be processed", + 3, processedFiles.size()); + for (Path processed : processedFiles) { + assertTrue("Processed file should match the predicate: " + processed, + processed.getFileName().toString().contains("Include")); + } + } + + /** + * Verifies that the total used for progress logging equals the number of files passing the shared + * filter. The progress messages report "[X] of [N] files reviewed"; we capture the logs and assert + * the denominator N matches the count of files that pass the prefiltering predicate. + */ + @Test + public void testProgressTotalMatchesFilteredFileCount() throws IOException { + int matching = 5; + writeJavaClasses("Match", matching); + // Files that do not pass the predicate must not be counted in the progress total. + writeJavaClasses("Skip", 3); + + UseCase useCase = useCase(1, path -> path.contains("Match")); + + List events = captureAstraLogs(() -> AstraCore.run(tempDir.toString(), useCase)); + + // The progress messages embed the total as "of [N] files reviewed". + Pattern totalPattern = Pattern.compile("of \\[(\\d+)\\] files reviewed"); + boolean foundProgress = false; + for (String event : events) { + Matcher matcher = totalPattern.matcher(event); + if (matcher.find()) { + foundProgress = true; + assertEquals("Progress total should match the number of files passing the filter", + matching, Integer.parseInt(matcher.group(1))); + } + } + assertTrue("Expected at least one progress log line with a total", foundProgress); + } + + + /** + * Writes {@code count} trivial Java classes named {@code 1.java} .. {@code .java} + * into the temp directory. + */ + private void writeJavaClasses(String prefix, int count) throws IOException { + for (int i = 1; i <= count; i++) { + Files.writeString(tempDir.resolve(prefix + i + ".java"), "public class " + prefix + i + " {}"); + } + } + + + /** + * Builds a {@link UseCase} running the given operations at the given parallelism, accepting all files. + */ + private static UseCase useCase(int parallelism, ASTOperation... operations) { + return useCase(parallelism, path -> true, operations); + } + + + /** + * Builds a {@link UseCase} running the given operations at the given parallelism, filtered by the + * supplied path prefiltering predicate. + */ + private static UseCase useCase(int parallelism, Predicate prefilteringPredicate, ASTOperation... operations) { + return new UseCase() { + @Override + public Set getOperations() { + return Set.of(operations); + } + + @Override + public int getParallelism() { + return parallelism; + } + + @Override + public Predicate getPrefilteringPredicate() { + return prefilteringPredicate; + } + }; + } + + + /** + * An operation that records the absolute path of every compilation unit it visits into {@code sink}. + */ + private static ASTOperation recordVisitedPaths(Set sink) { + return (compilationUnit, node, rewriter) -> { + Path path = visitedPath(compilationUnit); + if (path != null) { + sink.add(path); + } + }; + } + + + /** + * Extracts the absolute path property that AstraCore attaches to each compilation unit it processes. + */ + private static Path visitedPath(org.eclipse.jdt.core.dom.CompilationUnit compilationUnit) { + return (Path) compilationUnit.getProperty(CompilationUnitProperty.ABSOLUTE_PATH); + } + + + /** + * Captures the {@code INFO}-level log events emitted by {@link AstraCore} while {@code action} runs. + */ + private static List captureAstraLogs(Runnable action) throws IOException { + List events = new CopyOnWriteArrayList<>(); + AppenderBase appender = new AppenderBase<>() { + @Override + protected void append(ILoggingEvent event) { + events.add(event.getFormattedMessage()); + } + }; + appender.start(); + + Logger astraLogger = (Logger) LoggerFactory.getLogger(AstraCore.class); + Level previousLevel = astraLogger.getLevel(); + astraLogger.setLevel(Level.INFO); + astraLogger.addAppender(appender); + try { + action.run(); + } finally { + astraLogger.detachAppender(appender); + astraLogger.setLevel(previousLevel); + appender.stop(); + } + return events; + } } diff --git a/astra-core/src/test/java/org/alfasoftware/astra/core/utils/TestContentPrefilteringPredicate.java b/astra-core/src/test/java/org/alfasoftware/astra/core/utils/TestContentPrefilteringPredicate.java index d0705f4..e6c64b7 100644 --- a/astra-core/src/test/java/org/alfasoftware/astra/core/utils/TestContentPrefilteringPredicate.java +++ b/astra-core/src/test/java/org/alfasoftware/astra/core/utils/TestContentPrefilteringPredicate.java @@ -8,7 +8,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Comparator; -import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate;