Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Sources/Subprocess/API.swift
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,13 @@ public func run<
try await inputWriter.finish()
return .inputWritten
}
// The input source can stall indefinitely. Wait for termination here and cancel
// the group when the child exits. Output and error capture respond to cancellation
// by draining whatever is still buffered and returning it, so nothing is truncated.
group.addTask {
_ = try? await waitForProcessTermination(for: processIdentifier)
return .processTerminated
}
}
}

Expand Down Expand Up @@ -358,6 +365,9 @@ public func run<
switch groupResult {
case .inputWritten:
continue
case .processTerminated:
// The child exited. Cancel any still-running tasks so the drain can complete.
group.cancelAll()
case .standardOutputCaptured(let output):
capturedOutput = output
case .standardErrorCaptured(let error):
Expand Down Expand Up @@ -395,6 +405,7 @@ private enum _RunGroupResult<Output: OutputProtocol, Error: OutputProtocol> {
case standardOutputCaptured(Output.OutputType)
case standardErrorCaptured(Error.OutputType)
case inputWritten
case processTerminated
}

private struct _RunOutcome<
Expand Down
14 changes: 7 additions & 7 deletions Sources/Subprocess/IO/AsyncIO+KQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -225,21 +225,21 @@ final class AsyncIO: Sendable {
}

pthread_join(currentState.monitorThread, nil)
var closeError: Errno? = nil
var closeError: SubprocessError? = nil
do {
try kqueueFd.close()
try _safelyClose(.fileDescriptor(kqueueFd))
} catch {
closeError = error as? Errno
closeError = error
}
do {
try shutdownReadFd.close()
try _safelyClose(.fileDescriptor(shutdownReadFd))
} catch {
closeError = error as? Errno
closeError = error
}
do {
try shutdownWriteFd.close()
try _safelyClose(.fileDescriptor(shutdownWriteFd))
} catch {
closeError = error as? Errno
closeError = error
}

if let closeError {
Expand Down
10 changes: 5 additions & 5 deletions Sources/Subprocess/IO/AsyncIO+Linux.swift
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,16 @@ final class AsyncIO: Sendable {
// Clean up the monitor thread.
pthread_join(currentState.monitorThread, nil)

var closeError: Errno? = nil
var closeError: SubprocessError? = nil
do {
try epollFd.close()
try _safelyClose(.fileDescriptor(epollFd))
} catch {
closeError = error as? Errno
closeError = error
}
do {
try shutdownFd.close()
try _safelyClose(.fileDescriptor(shutdownFd))
} catch {
closeError = error as? Errno
closeError = error
}

if let closeError {
Expand Down
2 changes: 1 addition & 1 deletion Sources/Subprocess/Platforms/Subprocess+Linux.swift
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ internal func _isWaitprocessDescriptorSupported() -> Bool {
// If we can not retrieve pidfd, the system does not support waitid(P_PIDFD)
return false
}
defer { try? FileDescriptor(rawValue: selfPidfd).close() }
defer { try? _safelyClose(.fileDescriptor(FileDescriptor(rawValue: selfPidfd))) }
/// The following call will fail either with
/// - ECHILD: in this case we know P_PIDFD is supported and waitid correctly
/// reported that we don't have a child with the same selfPidfd;
Expand Down
8 changes: 4 additions & 4 deletions Sources/Subprocess/Platforms/Subprocess+Unix.swift
Original file line number Diff line number Diff line change
Expand Up @@ -537,10 +537,10 @@ extension Configuration {
// If exec fails we retry with the next candidate path, so
// close the pidfd here to avoid leaking it across retries.
if processDescriptor != .invalidDescriptor {
do {
try FileDescriptor(rawValue: processDescriptor).close()
do throws(SubprocessError) {
try _safelyClose(.fileDescriptor(FileDescriptor(rawValue: processDescriptor)))
} catch {
throw SubprocessError.spawnFailed(withUnderlyingError: error as? SubprocessError.UnderlyingError)
throw SubprocessError.spawnFailed(withUnderlyingError: error.underlyingError)
}
}
// Move on to another possible path
Expand Down Expand Up @@ -628,7 +628,7 @@ public struct ProcessIdentifier: Sendable, Hashable {

internal func close() {
if self.processDescriptor != .invalidDescriptor {
try? FileDescriptor(rawValue: self.processDescriptor).close()
try? _safelyClose(.fileDescriptor(FileDescriptor(rawValue: self.processDescriptor)))
}
}
}
Expand Down
161 changes: 161 additions & 0 deletions Tests/SubprocessTests/AsyncIOTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,167 @@ extension SubprocessAsyncIOTests {
}
}

// MARK: - Partial Read/Write, Backpressure & Boundary Tests
extension SubprocessAsyncIOTests {
// Ported intent: swift-nio NonBlockingFileIOTest.testReadingShortWorks.
// AsyncIO returns after a single underlying read, so a request larger than
// what's available yields a SHORT read rather than blocking for the full
// amount.
@Test func testShortRead() async throws {
let payload = randomData(count: 64)
try await runReadWriteTest { readIO, readTestBed in
let chunk = try await readIO.read(from: readTestBed.ioDescriptor, upTo: 64 * 1024)
#expect(chunk == payload)
} writer: { writeIO, writeTestBed in
_ = try await writeIO.write(payload, to: writeTestBed.ioDescriptor)
try await writeTestBed.finish()
}
}

// Ported intent: swift-nio ChannelTests.testPendingWritesWorkWithPartialWrites.
// A write much larger than the pipe buffer must fill the pipe, block on
// EAGAIN, wait for the reader to drain, and resume looping until every
// byte is written. The reader drains in small, delayed chunks to keep the
// pipe full and force many wait cycles. Asserts the write reports the full
// count (proving the partial-write loop) and the bytes survive intact.
func testWriteCompletesUnderBackpressure() async throws {
let payload = randomData(count: 512 * 1024)
try await runReadWriteTest { readIO, readTestBed in
var received: [UInt8] = []
while let chunk = try await readIO.read(from: readTestBed.ioDescriptor, upTo: 16 * 1024) {
received.append(contentsOf: chunk)
try await readTestBed.delay(.milliseconds(1))
}
#expect(received == payload)
} writer: { writeIO, writeTestBed in
let written = try await writeIO.write(payload, to: writeTestBed.ioDescriptor)
#expect(written == payload.count)
try await writeTestBed.finish()
}
}

// Ported intent: swift-nio NonBlockingFileIOTest.testReadManyChunks
// (2000 × 1-byte reads). Reading one byte at a time must deliver every byte
// in order with no loss or misalignment.
@Test func testReadManyTinyChunks() async throws {
let payload = randomData(count: 2048)
try await runReadWriteTest { readIO, readTestBed in
var received: [UInt8] = []
while let chunk = try await readIO.read(from: readTestBed.ioDescriptor, upTo: 1) {
received.append(contentsOf: chunk)
}
#expect(received == payload)
} writer: { writeIO, writeTestBed in
_ = try await writeIO.write(payload, to: writeTestBed.ioDescriptor)
try await writeTestBed.finish()
}
}

// Ported intent: swift-nio PipeChannelTest.testWriteEndGoingAway.
// Data written before the write end closes must be fully delivered, and only
// then EOF (`nil`) which exercises AsyncIO's final-drain path. Ordered
// deterministically (write + close, then read) via a manually-wired pipe.
@Test func testReadDrainsBufferedDataThenEOF() async throws {
var pipe = try CreatedPipe(closeWhenDone: true, purpose: .input)
var writeChannel = try _require(pipe.writeFileDescriptor())
var readChannel = try _require(pipe.readFileDescriptor())
defer {
try? readChannel.safelyClose()
}

let payload = randomData(count: 4096)
// Write everything, then close the write end so the reader observes
// buffered data followed by EOF (not EOF first).
_ = try await AsyncIO.shared.write(payload, to: writeChannel)
try writeChannel.safelyClose()

var received: [UInt8] = []
while let chunk = try await AsyncIO.shared.read(from: readChannel, upTo: 64 * 1024) {
received.append(contentsOf: chunk)
}
#expect(received == payload)
}

// Reading with `upTo: .max` exercises the branch where AsyncIO sizes its
// buffer from `queryPipeBufferSize` instead of the requested length. It must
// return a (short) chunk rather than over-allocating or hanging.
@Test func testReadUpToMax() async throws {
let payload = randomData(count: 100)
try await runReadWriteTest { readIO, readTestBed in
var received: [UInt8] = []
while let chunk = try await readIO.read(from: readTestBed.ioDescriptor, upTo: .max) {
received.append(contentsOf: chunk)
}
#expect(received == payload)
} writer: { writeIO, writeTestBed in
_ = try await writeIO.write(payload, to: writeTestBed.ioDescriptor)
try await writeTestBed.finish()
}
}

@Test func testQueryPipeBufferSizeIsPositive() async throws {
var pipe = try CreatedPipe(closeWhenDone: true, purpose: .input)
var writeChannel = try _require(pipe.writeFileDescriptor())
var readChannel = try _require(pipe.readFileDescriptor())
defer {
try? writeChannel.safelyClose()
try? readChannel.safelyClose()
}
#expect(AsyncIO.queryPipeBufferSize(for: readChannel.descriptor()) > 0)
#expect(AsyncIO.queryPipeBufferSize(for: writeChannel.descriptor()) > 0)
}
}

// MARK: - Concurrency & Write-Error Tests
extension SubprocessAsyncIOTests {
// Ported intent: swift-nio concurrent read/write tests. Drives many
// independent pipe round-trips through the shared `AsyncIO` at once to
// stress its registration/signal machinery for races.
func testConcurrentReadWriteAcrossManyPipes() async throws {
let pipeCount = 32
try await withThrowingTaskGroup(of: Void.self) { group in
for _ in 0..<pipeCount {
group.addTask {
let payload = randomData(count: Int.random(in: 1..<8192))
try await self.runReadWriteTest { readIO, readTestBed in
let received = try await self.readUntilEOF(
from: readTestBed.ioDescriptor, with: readIO
)
#expect(received == payload)
} writer: { writeIO, writeTestBed in
_ = try await writeIO.write(payload, to: writeTestBed.ioDescriptor)
try await writeTestBed.finish()
}
}
}
try await group.waitForAll()
}
}

// Ported intent: swift-nio PipeChannelTest.testWriteErrorsCloseChannel /
// testReadEndGoingAway. Writing to a pipe whose read end has gone away must
// surface as a write error rather than hang or silently succeed.
@Test func testWriteToPipeWithClosedReadEndThrows() async throws {
var pipe = try CreatedPipe(closeWhenDone: true, purpose: .input)
var writeChannel = try _require(pipe.writeFileDescriptor())
var readChannel = try _require(pipe.readFileDescriptor())
defer {
try? writeChannel.safelyClose()
}
// Close the read end; the write descriptor remains valid and ours.
try readChannel.safelyClose()

await #expect {
_ = try await AsyncIO.shared.write(randomData(count: 64 * 1024), to: writeChannel)
} throws: { error in
guard let subprocessError = error as? SubprocessError else {
return false
}
return subprocessError.code == .init(.failedToWriteToSubprocess)
}
}
}

// MARK: - Utils
extension SubprocessAsyncIOTests {
final class TestBed: Sendable {
Expand Down
Loading
Loading