From 7815dfedd9d6d4f09ecf7c60b9fed8c264c549f6 Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Tue, 9 Jun 2026 16:16:13 -0700 Subject: [PATCH] Cancel a stalled standard-input writer when the child process exits. When run() feeds standard input from a source that can stall and the child process exits before consuming that input, run() could hang indefinitely. The input-writing task was parked awaiting its source rather than blocked inside a pipe write, so the existing cancelAsyncIO path (which only unblocks in-flight pipe I/O) could not release it, and the I/O task group never finished draining. Spawn a waitForProcessTermination waiter alongside the input-writing task that cancels the I/O group once the child exits. Also add edge-case coverage for process I/O and AsyncIO, ported from swift-async-process and swift-nio and replace FileDescriptor.close with _safelyClosed. --- Sources/Subprocess/API.swift | 11 ++ Sources/Subprocess/IO/AsyncIO+KQueue.swift | 14 +- Sources/Subprocess/IO/AsyncIO+Linux.swift | 10 +- .../Platforms/Subprocess+Linux.swift | 2 +- .../Platforms/Subprocess+Unix.swift | 8 +- Tests/SubprocessTests/AsyncIOTests.swift | 161 +++++++++++++++++ Tests/SubprocessTests/IntegrationTests.swift | 164 ++++++++++++++++++ Tests/SubprocessTests/UnixTests.swift | 89 ++++++++++ 8 files changed, 442 insertions(+), 17 deletions(-) diff --git a/Sources/Subprocess/API.swift b/Sources/Subprocess/API.swift index 51e20af5..677ddcdb 100644 --- a/Sources/Subprocess/API.swift +++ b/Sources/Subprocess/API.swift @@ -288,6 +288,13 @@ public func run< try await inputWriter.finish() return .inputWritten } + // The input source can stall indefinitely. Wait for termination here and cancel + // the group when the child exits. Output and error capture respond to cancellation + // by draining whatever is still buffered and returning it, so nothing is truncated. + group.addTask { + _ = try? await waitForProcessTermination(for: processIdentifier) + return .processTerminated + } } } @@ -358,6 +365,9 @@ public func run< switch groupResult { case .inputWritten: continue + case .processTerminated: + // The child exited. Cancel any still-running tasks so the drain can complete. + group.cancelAll() case .standardOutputCaptured(let output): capturedOutput = output case .standardErrorCaptured(let error): @@ -395,6 +405,7 @@ private enum _RunGroupResult { case standardOutputCaptured(Output.OutputType) case standardErrorCaptured(Error.OutputType) case inputWritten + case processTerminated } private struct _RunOutcome< diff --git a/Sources/Subprocess/IO/AsyncIO+KQueue.swift b/Sources/Subprocess/IO/AsyncIO+KQueue.swift index 263539fa..5c709a5e 100644 --- a/Sources/Subprocess/IO/AsyncIO+KQueue.swift +++ b/Sources/Subprocess/IO/AsyncIO+KQueue.swift @@ -225,21 +225,21 @@ final class AsyncIO: Sendable { } pthread_join(currentState.monitorThread, nil) - var closeError: Errno? = nil + var closeError: SubprocessError? = nil do { - try kqueueFd.close() + try _safelyClose(.fileDescriptor(kqueueFd)) } catch { - closeError = error as? Errno + closeError = error } do { - try shutdownReadFd.close() + try _safelyClose(.fileDescriptor(shutdownReadFd)) } catch { - closeError = error as? Errno + closeError = error } do { - try shutdownWriteFd.close() + try _safelyClose(.fileDescriptor(shutdownWriteFd)) } catch { - closeError = error as? Errno + closeError = error } if let closeError { diff --git a/Sources/Subprocess/IO/AsyncIO+Linux.swift b/Sources/Subprocess/IO/AsyncIO+Linux.swift index 4a582240..8f1da378 100644 --- a/Sources/Subprocess/IO/AsyncIO+Linux.swift +++ b/Sources/Subprocess/IO/AsyncIO+Linux.swift @@ -210,16 +210,16 @@ final class AsyncIO: Sendable { // Clean up the monitor thread. pthread_join(currentState.monitorThread, nil) - var closeError: Errno? = nil + var closeError: SubprocessError? = nil do { - try epollFd.close() + try _safelyClose(.fileDescriptor(epollFd)) } catch { - closeError = error as? Errno + closeError = error } do { - try shutdownFd.close() + try _safelyClose(.fileDescriptor(shutdownFd)) } catch { - closeError = error as? Errno + closeError = error } if let closeError { diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index a9106a03..9213e259 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -572,7 +572,7 @@ internal func _isWaitprocessDescriptorSupported() -> Bool { // If we can not retrieve pidfd, the system does not support waitid(P_PIDFD) return false } - defer { try? FileDescriptor(rawValue: selfPidfd).close() } + defer { try? _safelyClose(.fileDescriptor(FileDescriptor(rawValue: selfPidfd))) } /// The following call will fail either with /// - ECHILD: in this case we know P_PIDFD is supported and waitid correctly /// reported that we don't have a child with the same selfPidfd; diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index b123a0df..a38556af 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -537,10 +537,10 @@ extension Configuration { // If exec fails we retry with the next candidate path, so // close the pidfd here to avoid leaking it across retries. if processDescriptor != .invalidDescriptor { - do { - try FileDescriptor(rawValue: processDescriptor).close() + do throws(SubprocessError) { + try _safelyClose(.fileDescriptor(FileDescriptor(rawValue: processDescriptor))) } catch { - throw SubprocessError.spawnFailed(withUnderlyingError: error as? SubprocessError.UnderlyingError) + throw SubprocessError.spawnFailed(withUnderlyingError: error.underlyingError) } } // Move on to another possible path @@ -628,7 +628,7 @@ public struct ProcessIdentifier: Sendable, Hashable { internal func close() { if self.processDescriptor != .invalidDescriptor { - try? FileDescriptor(rawValue: self.processDescriptor).close() + try? _safelyClose(.fileDescriptor(FileDescriptor(rawValue: self.processDescriptor))) } } } diff --git a/Tests/SubprocessTests/AsyncIOTests.swift b/Tests/SubprocessTests/AsyncIOTests.swift index c564209e..282be343 100644 --- a/Tests/SubprocessTests/AsyncIOTests.swift +++ b/Tests/SubprocessTests/AsyncIOTests.swift @@ -319,6 +319,167 @@ extension SubprocessAsyncIOTests { } } +// MARK: - Partial Read/Write, Backpressure & Boundary Tests +extension SubprocessAsyncIOTests { + // Ported intent: swift-nio NonBlockingFileIOTest.testReadingShortWorks. + // AsyncIO returns after a single underlying read, so a request larger than + // what's available yields a SHORT read rather than blocking for the full + // amount. + @Test func testShortRead() async throws { + let payload = randomData(count: 64) + try await runReadWriteTest { readIO, readTestBed in + let chunk = try await readIO.read(from: readTestBed.ioDescriptor, upTo: 64 * 1024) + #expect(chunk == payload) + } writer: { writeIO, writeTestBed in + _ = try await writeIO.write(payload, to: writeTestBed.ioDescriptor) + try await writeTestBed.finish() + } + } + + // Ported intent: swift-nio ChannelTests.testPendingWritesWorkWithPartialWrites. + // A write much larger than the pipe buffer must fill the pipe, block on + // EAGAIN, wait for the reader to drain, and resume looping until every + // byte is written. The reader drains in small, delayed chunks to keep the + // pipe full and force many wait cycles. Asserts the write reports the full + // count (proving the partial-write loop) and the bytes survive intact. + func testWriteCompletesUnderBackpressure() async throws { + let payload = randomData(count: 512 * 1024) + try await runReadWriteTest { readIO, readTestBed in + var received: [UInt8] = [] + while let chunk = try await readIO.read(from: readTestBed.ioDescriptor, upTo: 16 * 1024) { + received.append(contentsOf: chunk) + try await readTestBed.delay(.milliseconds(1)) + } + #expect(received == payload) + } writer: { writeIO, writeTestBed in + let written = try await writeIO.write(payload, to: writeTestBed.ioDescriptor) + #expect(written == payload.count) + try await writeTestBed.finish() + } + } + + // Ported intent: swift-nio NonBlockingFileIOTest.testReadManyChunks + // (2000 × 1-byte reads). Reading one byte at a time must deliver every byte + // in order with no loss or misalignment. + @Test func testReadManyTinyChunks() async throws { + let payload = randomData(count: 2048) + try await runReadWriteTest { readIO, readTestBed in + var received: [UInt8] = [] + while let chunk = try await readIO.read(from: readTestBed.ioDescriptor, upTo: 1) { + received.append(contentsOf: chunk) + } + #expect(received == payload) + } writer: { writeIO, writeTestBed in + _ = try await writeIO.write(payload, to: writeTestBed.ioDescriptor) + try await writeTestBed.finish() + } + } + + // Ported intent: swift-nio PipeChannelTest.testWriteEndGoingAway. + // Data written before the write end closes must be fully delivered, and only + // then EOF (`nil`) which exercises AsyncIO's final-drain path. Ordered + // deterministically (write + close, then read) via a manually-wired pipe. + @Test func testReadDrainsBufferedDataThenEOF() async throws { + var pipe = try CreatedPipe(closeWhenDone: true, purpose: .input) + var writeChannel = try _require(pipe.writeFileDescriptor()) + var readChannel = try _require(pipe.readFileDescriptor()) + defer { + try? readChannel.safelyClose() + } + + let payload = randomData(count: 4096) + // Write everything, then close the write end so the reader observes + // buffered data followed by EOF (not EOF first). + _ = try await AsyncIO.shared.write(payload, to: writeChannel) + try writeChannel.safelyClose() + + var received: [UInt8] = [] + while let chunk = try await AsyncIO.shared.read(from: readChannel, upTo: 64 * 1024) { + received.append(contentsOf: chunk) + } + #expect(received == payload) + } + + // Reading with `upTo: .max` exercises the branch where AsyncIO sizes its + // buffer from `queryPipeBufferSize` instead of the requested length. It must + // return a (short) chunk rather than over-allocating or hanging. + @Test func testReadUpToMax() async throws { + let payload = randomData(count: 100) + try await runReadWriteTest { readIO, readTestBed in + var received: [UInt8] = [] + while let chunk = try await readIO.read(from: readTestBed.ioDescriptor, upTo: .max) { + received.append(contentsOf: chunk) + } + #expect(received == payload) + } writer: { writeIO, writeTestBed in + _ = try await writeIO.write(payload, to: writeTestBed.ioDescriptor) + try await writeTestBed.finish() + } + } + + @Test func testQueryPipeBufferSizeIsPositive() async throws { + var pipe = try CreatedPipe(closeWhenDone: true, purpose: .input) + var writeChannel = try _require(pipe.writeFileDescriptor()) + var readChannel = try _require(pipe.readFileDescriptor()) + defer { + try? writeChannel.safelyClose() + try? readChannel.safelyClose() + } + #expect(AsyncIO.queryPipeBufferSize(for: readChannel.descriptor()) > 0) + #expect(AsyncIO.queryPipeBufferSize(for: writeChannel.descriptor()) > 0) + } +} + +// MARK: - Concurrency & Write-Error Tests +extension SubprocessAsyncIOTests { + // Ported intent: swift-nio concurrent read/write tests. Drives many + // independent pipe round-trips through the shared `AsyncIO` at once to + // stress its registration/signal machinery for races. + func testConcurrentReadWriteAcrossManyPipes() async throws { + let pipeCount = 32 + try await withThrowingTaskGroup(of: Void.self) { group in + for _ in 0.. { _ in + // Never yield, never finish. + } + #if os(Windows) + let setup = TestSetup( + executable: .name("cmd.exe"), + arguments: ["/c", "exit 0"] + ) + #else + let setup = TestSetup( + executable: .path("/bin/sh"), + arguments: ["-c", "exit 0"] + ) + #endif + let result = try await _run( + setup, + input: .sequence(neverEndingInput), + output: .discarded, + error: .discarded + ) + #expect(result.terminationStatus == .exited(0)) + } + #endif // SubprocessFoundation +} + // MARK: - Utilities extension String { func trimmingNewLineAndQuotes() -> String { diff --git a/Tests/SubprocessTests/UnixTests.swift b/Tests/SubprocessTests/UnixTests.swift index 412e94fa..a6032aa7 100644 --- a/Tests/SubprocessTests/UnixTests.swift +++ b/Tests/SubprocessTests/UnixTests.swift @@ -999,4 +999,93 @@ extension SubprocessUnixTests { #endif } +// MARK: - Standard Input Inheritance +extension SubprocessUnixTests { + @Test func testInheritStandardInput() async throws { + // Exercises the public `InputProtocol.standardInput`: the child inherits + // the parent's own standard input (fd 0). We temporarily point fd 0 at a + // pipe we control, feed it a line, and confirm the child reads it back. + // + // Not ported to Windows: the equivalent would require swapping the test + // host's own `STD_INPUT_HANDLE` / CRT fd 0 — global console state that + // can't be changed safely or verified from here. + let savedStdin = dup(STDIN_FILENO) + try #require(savedStdin >= 0, "dup(STDIN_FILENO) failed: \(errno)") + defer { + _ = dup2(savedStdin, STDIN_FILENO) + _ = close(savedStdin) + } + + let pipe = try FileDescriptor.pipe() + // Point our own stdin at the pipe's read end so the child inherits it. + try #require( + dup2(pipe.readEnd.rawValue, STDIN_FILENO) >= 0, + "dup2 onto STDIN_FILENO failed: \(errno)" + ) + + // Send one line and close the write end so the child sees EOF. + try pipe.writeEnd.writeLine("hello from parent stdin") + try pipe.writeEnd.close() + + let result = try await Subprocess.run( + .path("/bin/cat"), + arguments: [], + input: .standardInput, + output: .string(limit: 256), + error: .discarded + ) + try pipe.readEnd.close() + + #expect(result.terminationStatus.isSuccess) + #expect(result.standardOutput?.trimmingNewLineAndQuotes() == "hello from parent stdin") + } +} + +// MARK: - Pseudo-Terminal Input +extension SubprocessUnixTests { + @Test func testInheritStdinFromPseudoTerminal() async throws { + // Hand the child a pseudo-terminal replica as its standard input, + // proving an arbitrary inherited file descriptor (not a regular file or + // ordinary pipe) works as input. + // + // Not ported to Windows: there is no `openpty`/`termios` equivalent; the + // Windows pseudo-console (ConPTY) is an unrelated API. + var primaryFD: CInt = -1 + var replicaFD: CInt = -1 + try #require( + openpty(&primaryFD, &replicaFD, nil, nil, nil) == 0, + "openpty failed: \(errno)" + ) + let primary = FileDescriptor(rawValue: primaryFD) + let replica = FileDescriptor(rawValue: replicaFD) + + // Raw mode so bytes pass through verbatim (no echo or line editing). + var settings = termios() + try #require(tcgetattr(replicaFD, &settings) == 0, "tcgetattr failed: \(errno)") + cfmakeraw(&settings) + try #require(tcsetattr(replicaFD, TCSANOW, &settings) == 0, "tcsetattr failed: \(errno)") + + let payload = "pty stdin works" + // Pre-fill the pty buffer; the child reads exactly this many bytes and + // then exits, closing its inherited replica. + try Array(payload.utf8).withUnsafeBytes { buffer in + _ = try primary.write(buffer) + } + + let result = try await Subprocess.run( + .name("head"), + arguments: ["-c", "\(payload.utf8.count)"], + // The replica is owned by Subprocess (closed after spawn); we keep + // and close the primary ourselves. + input: .fileDescriptor(replica, closeAfterSpawningProcess: true), + output: .string(limit: 256), + error: .discarded + ) + try primary.close() + + #expect(result.terminationStatus.isSuccess) + #expect(result.standardOutput?.trimmingNewLineAndQuotes() == payload) + } +} + #endif // !os(Windows)