From 2c8137fc1211e624019b1bb4a01795bc1a7006b3 Mon Sep 17 00:00:00 2001 From: Corey Woodfield Date: Tue, 10 Jun 2025 17:08:52 -0600 Subject: [PATCH] Let InputStream.read block instead of blocking manually with Thread.sleep Worker threads that are waiting for a work response get interrupted if they are still going after the execution has completed (See RedisShardBackplane.pollExecution and ShardWorkerContext.resumePoller and Executor.runInterruptible). This is happening to us often. Assuming the problem is in buildfarm, and not our rules, it may be that the execution writes its entire response and completes within the span of the Thread.sleep(10) call, and then the poller sees that the execution is complete and interrupts the thread before it gets the chance to check the output stream again. This seems a bit far-fetched, but it's the only explanation I came up with. By letting the read block, instead of manually blocking until the read won't block, we should react to the work response more promptly, and potentially finish before we get interrupted. --- .../bazel/processes/ProtoWorkerRW.java | 39 +------------------ 1 file changed, 1 insertion(+), 38 deletions(-) diff --git a/persistentworkers/src/main/java/persistent/bazel/processes/ProtoWorkerRW.java b/persistentworkers/src/main/java/persistent/bazel/processes/ProtoWorkerRW.java index 0663156626..14d668503e 100644 --- a/persistentworkers/src/main/java/persistent/bazel/processes/ProtoWorkerRW.java +++ b/persistentworkers/src/main/java/persistent/bazel/processes/ProtoWorkerRW.java @@ -42,24 +42,7 @@ public void write(WorkRequest req) throws IOException { writeTo(req, this.writeStream); } - public WorkResponse waitAndRead() throws IOException, InterruptedException { - try { - waitForInput(processWrapper::isAlive, readStream); - } catch (IOException e) { - String stdErrMsg = processWrapper.getErrorString(); - String stdOut = ""; - try { - if (processWrapper.isAlive() && readStream.available() > 0) { - stdOut = IOUtils.toString(readStream, StandardCharsets.UTF_8); - } else { - stdOut = "no stream available"; - } - } catch (IOException e2) { - stdOut = "Exception trying to read stdout: " + e2; - } - throw new IOException( - "IOException on waitForInput; stdErr: " + stdErrMsg + "\nStdout: " + stdOut, e); - } + public WorkResponse waitAndRead() throws IOException { return readResponse(readStream); } @@ -79,24 +62,4 @@ public static WorkResponse readResponse(InputStream inputStream) throws IOExcept public static WorkRequest readRequest(InputStream inputStream) throws IOException { return WorkRequest.parseDelimitedFrom(inputStream); } - - public static void waitForInput(Supplier liveCheck, InputStream inputStream) - throws IOException, InterruptedException { - String workerDeathMsg = "Worker process for died while waiting for response"; - // TODO can we do better than spinning? i.e. condition variable? - while (inputAvailable(inputStream, workerDeathMsg) == 0) { - Thread.sleep(10); - if (!liveCheck.get()) { - throw new IOException(workerDeathMsg + "\n"); - } - } - } - - private static int inputAvailable(InputStream inputStream, String errorMsg) throws IOException { - try { - return inputStream.available(); - } catch (IOException e) { - throw new IOException(errorMsg, e); - } - } }