diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index 33ce6f63..0e883b23 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -838,6 +838,10 @@ internal func _safelyClose(_ target: _CloseTarget) throws(SubprocessError) { } #else case .fileDescriptor(let fileDescriptor): + // Detach from epoll/queue before closing. A descriptor stays registered + // with epoll/kqueue across reads and writes so a stream of buffers costs one registration, + // not one per buffer. + AsyncIO.shared.removeRegistration(for: fileDescriptor) do { try fileDescriptor.close() } catch { diff --git a/Sources/Subprocess/IO/AsyncIO+KQueue.swift b/Sources/Subprocess/IO/AsyncIO+KQueue.swift index 5c709a5e..258c998a 100644 --- a/Sources/Subprocess/IO/AsyncIO+KQueue.swift +++ b/Sources/Subprocess/IO/AsyncIO+KQueue.swift @@ -251,40 +251,55 @@ final class AsyncIO: Sendable { _ fileDescriptor: FileDescriptor, processIdentifier: ProcessIdentifier, for event: Event - ) -> SignalStream { - return SignalStream { (continuation: SignalStream.Continuation) -> () in - switch self.state { - case .success(let state): - if let nonBlockingFdError = self.setNonblocking(for: fileDescriptor) { - continuation.finish(throwing: nonBlockingFdError) - return - } - let filter: Int16 - switch event { - case .read: - filter = Int16(EVFILT_READ) - case .write: - filter = Int16(EVFILT_WRITE) - } + ) -> (stream: SignalStream, outcome: RegistrationOutcome) { + // `.bufferingNewest(1)` latches a readiness edge that fires while no + // consumer is awaiting, so a wakeup is never dropped between a read's + // `EAGAIN` and its `await`. + let (stream, continuation) = SignalStream.makeStream( + bufferingPolicy: .bufferingNewest(1) + ) - // Hold the lock across the map insert and `kevent` so a - // concurrent `cancelAsyncIO` cannot slip in between the - // two steps and observe a half-registered descriptor. - let outcome: RegistrationOutcome = _registration.withLock { storage in - guard - storage.register( - fileDescriptor: fileDescriptor.rawValue, - continuation: continuation, - processIdentifier: processIdentifier - ) - else { - return .alreadyCancelled - } + switch self.state { + case .failure(let setupError): + continuation.finish(throwing: setupError) + return (stream, .failed(setupError)) + case .success(let state): + if let nonBlockingFdError = self.setNonblocking(for: fileDescriptor) { + continuation.finish(throwing: nonBlockingFdError) + return (stream, .failed(nonBlockingFdError)) + } + let filter: Int16 + switch event { + case .read: + filter = Int16(EVFILT_READ) + case .write: + filter = Int16(EVFILT_WRITE) + } + // Hold the lock across the map insert and `kevent` so a + // concurrent `cancelAsyncIO` cannot slip in between the + // two steps and observe a half-registered descriptor. + let outcome: RegistrationOutcome = _registration.withLock { storage in + switch storage.register( + fileDescriptor: fileDescriptor.rawValue, + continuation: continuation, + processIdentifier: processIdentifier + ) { + case .alreadyCancelled: + return .alreadyCancelled + case .updated: + // Already attached to kqueue by a previous read or write; + // reuse the existing registration and skip the syscall. + return .registered + case .registered: + // `EV_CLEAR` makes the filter edge-triggered: an event is + // delivered only when new data arrives, so a persistent + // registration doesn't spin the monitor thread while + // buffered bytes sit unread between reads. var kev = _makeKevent( ident: UInt(fileDescriptor.rawValue), filter: filter, - flags: UInt16(EV_ADD | EV_ENABLE) + flags: UInt16(EV_ADD | EV_CLEAR) ) let rc = _kevent( state.kqueueFileDescriptor, @@ -306,34 +321,31 @@ final class AsyncIO: Sendable { } return .registered } + } - switch outcome { - case .registered: - break - case .alreadyCancelled: - continuation.finish() - case .failed(let error): - continuation.finish(throwing: error) - } - case .failure(let setupError): - continuation.finish(throwing: setupError) - return + switch outcome { + case .registered: + break + case .alreadyCancelled: + continuation.finish() + case .failed(let error): + continuation.finish(throwing: error) } + return (stream, outcome) } } - internal func removeRegistration(for fileDescriptor: FileDescriptor) throws(SubprocessError) { - switch self.state { - case .success(let state): - let c = _registration.withLock { store -> SignalStream.Continuation? in - guard - let continuation = store.removeRegistration( - for: fileDescriptor.rawValue - ) - else { - return nil - } + internal func removeRegistration(for fileDescriptor: FileDescriptor) { + let continuation = _registration.withLock { store -> SignalStream.Continuation? in + guard + let continuation = store.removeRegistration( + for: fileDescriptor.rawValue + ) + else { + return nil + } + if case .success(let state) = self.state { for filter in [EVFILT_READ, EVFILT_WRITE] { var kev = _makeKevent( ident: UInt(fileDescriptor.rawValue), @@ -349,13 +361,10 @@ final class AsyncIO: Sendable { nil ) } - - return continuation } - c?.finish() - case .failure(let setupFailure): - throw setupFailure + return continuation } + continuation?.finish() } internal func cancelAsyncIO(for processIdentifier: ProcessIdentifier) throws(SubprocessError) { diff --git a/Sources/Subprocess/IO/AsyncIO+Linux.swift b/Sources/Subprocess/IO/AsyncIO+Linux.swift index 8f1da378..08a47a46 100644 --- a/Sources/Subprocess/IO/AsyncIO+Linux.swift +++ b/Sources/Subprocess/IO/AsyncIO+Linux.swift @@ -31,6 +31,10 @@ import Musl import _SubprocessCShims import Synchronization +// `EPOLLET` (edge-triggered) is the bit `1 << 31` in the Linux epoll ABI, but +// the C symbol imports inconsistently across libraries.Define the bit directly +// so it's uniform everywhere. +private let _EPOLLET: UInt32 = 1 << 31 private let _epollEventSize = 256 private let _registration: Mutex = Mutex(Registration()) @@ -231,46 +235,61 @@ final class AsyncIO: Sendable { _ fileDescriptor: FileDescriptor, processIdentifier: ProcessIdentifier, for event: Event - ) -> SignalStream { - return SignalStream { (continuation: SignalStream.Continuation) -> () in - // Nothing to do if setup failed. - switch self.state { - case .success(let state): - // Set the file descriptor to non-blocking. - if let nonBlockingFdError = self.setNonblocking(for: fileDescriptor) { - continuation.finish(throwing: nonBlockingFdError) - return - } - // Pick the event to register. - let targetEvent: EPOLL_EVENTS - switch event { - case .read: - targetEvent = EPOLL_EVENTS(EPOLLIN) - case .write: - targetEvent = EPOLL_EVENTS(EPOLLOUT) - } + ) -> (stream: SignalStream, outcome: RegistrationOutcome) { + // `.bufferingNewest(1)` latches a readiness edge that fires while no + // consumer is awaiting, so a wakeup is never dropped between a read's + // `EAGAIN` and its `await`. + let (stream, continuation) = SignalStream.makeStream( + bufferingPolicy: .bufferingNewest(1) + ) - // Hold the lock across both the map insert and `epoll_ctl` so - // a concurrent `cancelAsyncIO` either runs entirely before - // (in which case `register` returns `false`) or entirely - // after (in which case it sees the descriptor in the map - // and in epoll). Without this, a cancellation could observe - // the map entry between `storage.register` and - // `epoll_ctl(EPOLL_CTL_ADD)` and try to delete a descriptor - // that isn't yet in the epoll set. - let outcome: RegistrationOutcome = _registration.withLock { storage in - guard - storage.register( - fileDescriptor: fileDescriptor.rawValue, - continuation: continuation, - processIdentifier: processIdentifier - ) - else { - return .alreadyCancelled - } + // Nothing to do if setup failed. + switch self.state { + case .failure(let setupError): + continuation.finish(throwing: setupError) + return (stream, .failed(setupError)) + case .success(let state): + // Set the file descriptor to non-blocking. + if let nonBlockingFdError = self.setNonblocking(for: fileDescriptor) { + continuation.finish(throwing: nonBlockingFdError) + return (stream, .failed(nonBlockingFdError)) + } + // Pick the event to register. + let targetEvent: EPOLL_EVENTS + switch event { + case .read: + targetEvent = EPOLL_EVENTS(EPOLLIN) + case .write: + targetEvent = EPOLL_EVENTS(EPOLLOUT) + } + // Hold the lock across both the map insert and `epoll_ctl` so + // a concurrent `cancelAsyncIO` either runs entirely before + // (in which case `register` returns `.alreadyCancelled`) or + // entirely after (in which case it sees the descriptor in the + // map and in epoll). Without this, a cancellation could observe + // the map entry between `storage.register` and + // `epoll_ctl(EPOLL_CTL_ADD)` and try to delete a descriptor + // that isn't yet in the epoll set. + let outcome: RegistrationOutcome = _registration.withLock { storage in + switch storage.register( + fileDescriptor: fileDescriptor.rawValue, + continuation: continuation, + processIdentifier: processIdentifier + ) { + case .alreadyCancelled: + return .alreadyCancelled + case .updated: + // Already attached to epoll by a previous read or write; + // reuse the existing registration and skip the syscall. + return .registered + case .registered: + // Register edge-triggered (`EPOLLET`): with a persistent + // registration, `epoll_wait` must report the descriptor + // only when new data arrives, otherwise the monitor thread + // would spin while buffered bytes sit unread between reads. var event = epoll_event( - events: targetEvent.rawValue, + events: targetEvent.rawValue | _EPOLLET, data: epoll_data(fd: fileDescriptor.rawValue) ) let rc = epoll_ctl( @@ -291,59 +310,47 @@ final class AsyncIO: Sendable { } return .registered } + } - switch outcome { - case .registered: - break - case .alreadyCancelled: - continuation.finish() - case .failed(let error): - continuation.finish(throwing: error) - } - case .failure(let setupError): - continuation.finish(throwing: setupError) - return + switch outcome { + case .registered: + break + case .alreadyCancelled: + continuation.finish() + case .failed(let error): + continuation.finish(throwing: error) } + return (stream, outcome) } } - internal func removeRegistration(for fileDescriptor: FileDescriptor) throws(SubprocessError) { - switch self.state { - case .success(let state): - let c = try _registration.withLock { store throws(SubprocessError) -> SignalStream.Continuation? in - guard - let continuation = store.removeRegistration( - for: fileDescriptor.rawValue - ) - else { - return nil - } + internal func removeRegistration(for fileDescriptor: FileDescriptor) { + let continuation = _registration.withLock { store -> SignalStream.Continuation? in + guard + let continuation = store.removeRegistration( + for: fileDescriptor.rawValue + ) + else { + return nil + } - let rc = epoll_ctl( + if case .success(let state) = self.state { + _ = epoll_ctl( state.epollFileDescriptor, EPOLL_CTL_DEL, fileDescriptor.rawValue, nil ) - - if rc != 0 { - throw SubprocessError.asyncIOFailed( - reason: "failed to remove \(fileDescriptor.rawValue) from epoll list", - underlyingError: Errno(rawValue: errno) - ) - } - return continuation } - c?.finish() - case .failure(let error): - throw error + return continuation } + continuation?.finish() } internal func cancelAsyncIO(for processIdentifier: ProcessIdentifier) throws(SubprocessError) { switch self.state { case .success(let state): - let cancelledContinuations = try _registration.withLock { storage throws(SubprocessError) -> [SignalStream.Continuation] in + let cancelledContinuations = _registration.withLock { storage -> [SignalStream.Continuation] in let previousRegistrations = storage.cancel(processIdentifier: processIdentifier) guard !previousRegistrations.isEmpty else { return [] @@ -352,18 +359,13 @@ final class AsyncIO: Sendable { for registration in previousRegistrations { toBeCancelled.append(registration.continuation) - let rc = epoll_ctl( + // Best-effort detach; ignore `ENOENT` (already gone). + _ = epoll_ctl( state.epollFileDescriptor, EPOLL_CTL_DEL, registration.fileDescriptor, nil ) - if rc != 0 && errno != ENOENT { - throw SubprocessError.asyncIOFailed( - reason: "failed to remove \(registration.fileDescriptor) from epoll list", - underlyingError: Errno(rawValue: errno) - ) - } } return toBeCancelled } diff --git a/Sources/Subprocess/IO/AsyncIO+Unix.swift b/Sources/Subprocess/IO/AsyncIO+Unix.swift index ee3a3fd7..d380a4e5 100644 --- a/Sources/Subprocess/IO/AsyncIO+Unix.swift +++ b/Sources/Subprocess/IO/AsyncIO+Unix.swift @@ -101,107 +101,110 @@ extension AsyncIO { var resultBuffer: [UInt8] = Array( repeating: 0, count: bufferLength ) - let signalStream = self.registerFileDescriptor( + // Register the descriptor with the kqueue/epoll. It stays registered across reads + // and is only detached when the caller closes it (or on EOF/error/cancellation), + // so sustained streaming costs one `epoll_ctl`/`kevent` registration per descriptor + // instead of one per buffer. + let (signalStream, outcome) = self.registerFileDescriptor( fileDescriptor, processIdentifier: processIdentifier, for: .read ) - - do { - /// Outer loop: every iteration signals that the descriptor is ready - /// for more data. - for try await _ in signalStream { - /// Inner loop: repeatedly call `read()` and read more data until: - /// 1. We reach EOF (read length is 0), in which case return the result. - /// 2. We read `maxLength` bytes, in which case return the result. - /// 3. `read()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. - /// In this case `break` out of the inner loop and `await` the next - /// signal in the outer loop. - while true { - let bytesRead: Int - do { - bytesRead = try resultBuffer.withUnsafeMutableBytes { bufferPointer in - // Read directly into the buffer at the offset. - return try fileDescriptor.read( - into: bufferPointer, - retryOnInterrupt: true - ) - } - } catch { - // FileDescriptor.read only throws Errno - let _errno = error as! Errno - - if self.shouldWaitForNextSignal(with: _errno.rawValue) { - // No data available right now; wait for the next signal. - break - } else { - // Throw all other errors. - try self.removeRegistration(for: fileDescriptor) - throw SubprocessError.failedToReadFromProcess( - withUnderlyingError: _errno - ) - } - } - - if bytesRead > 0 { - // Read some data. Return immediately so the caller can - // process it without waiting for the buffer to fill. - try self.removeRegistration(for: fileDescriptor) - resultBuffer.removeLast(resultBuffer.count - bytesRead) - return resultBuffer - } else if bytesRead == 0 { - // We reached EOF. - try self.removeRegistration(for: fileDescriptor) - return nil - } else { - // This branch is unreachable in practice. - throw SubprocessError.failedToReadFromProcess( - withUnderlyingError: nil - ) - } - } - } - - // The signal stream finished without delivering more data. This - // happens when `cancelAsyncIO` runs because the child has exited - // and we want to stop waiting for an EOF that may never come (the - // pipe could be held open by an inherited grandchild). Try a final - // non-blocking read so we don't drop bytes that were already in - // the kernel buffer when cancellation arrived; subsequent calls - // keep draining one chunk at a time and then return EOF. - try self.removeRegistration(for: fileDescriptor) - let drainedLength = resultBuffer.withUnsafeMutableBytes { bufferPtr in - // The descriptor was set to non-blocking in `setNonblocking`, - // so this `read` returns immediately even if no data is ready. - do { + switch outcome { + case .registered, .alreadyCancelled: + // `.alreadyCancelled` still drains: the loop below reads whatever + // is already buffered without awaiting, since the signal stream is + // already finished. + break + case .failed(let error): + throw SubprocessError.failedToReadFromProcess( + withUnderlyingError: error.underlyingError + ) + } + var iterator = signalStream.makeAsyncIterator() + + // Each iteration first attempts a non-blocking read, draining bytes + // that are already buffered, and only awaits a readiness signal when + // the read reports `EAGAIN`. Because the descriptor is registered edge-triggered, + // this "try first" step is what keeps a persistent registration correct: + // we never park while data is buffered, so a missed edge can't strand bytes. + readLoop: while true { + do { + let bytesRead = try resultBuffer.withUnsafeMutableBytes { bufferPointer in + // Read directly into the buffer. return try fileDescriptor.read( - into: bufferPtr, + into: bufferPointer, retryOnInterrupt: true ) - } catch { - // EAGAIN, EWOULDBLOCK, or another error: no more data - // is available right now. - return 0 } - } - if drainedLength > 0 { - resultBuffer.removeLast(resultBuffer.count - drainedLength) - return resultBuffer + if bytesRead > 0 { + // Read some data. Return immediately so the caller can + // process it without waiting for the buffer to fill. The + // descriptor stays registered for the next read. + resultBuffer.removeLast(resultBuffer.count - bytesRead) + return resultBuffer + } else { + // EOF: no more data will ever arrive. Detach now. + self.removeRegistration(for: fileDescriptor) + return nil + } + } catch { + // FileDescriptor.read only throws Errno + let _errno = error as! Errno + guard self.shouldWaitForNextSignal(with: _errno.rawValue) else { + // Throw all other errors. + self.removeRegistration(for: fileDescriptor) + throw SubprocessError.failedToReadFromProcess( + withUnderlyingError: _errno + ) + } + // No data available right now. Fall through to await the next signal } - return nil - } catch { - try self.removeRegistration(for: fileDescriptor) - // Reset the error code to `.failedToRead` to match other platforms. - guard let originalError = error as? SubprocessError else { + do { + if try await iterator.next() == nil { + // The signal stream finished without delivering more data. + // This happens when `cancelAsyncIO` runs because the child + // has exited and we want to stop waiting for an EOF that may + // never come (the pipe could be held open by an inherited + // grandchild). Fall through to a final drain. + break readLoop + } + } catch { + self.removeRegistration(for: fileDescriptor) + // Reset the error code to `.failedToRead` to match other platforms. + guard let originalError = error as? SubprocessError else { + throw SubprocessError.failedToReadFromProcess( + withUnderlyingError: nil + ) + } throw SubprocessError.failedToReadFromProcess( - withUnderlyingError: nil + withUnderlyingError: originalError.underlyingError ) } - throw SubprocessError.failedToReadFromProcess( - withUnderlyingError: originalError.underlyingError - ) } + + // Cancellation drain: `cancelAsyncIO` already removed the registration, + // so try one last non-blocking read to surface bytes that were in the + // kernel buffer when cancellation arrived. + let drainedLength = resultBuffer.withUnsafeMutableBytes { bufferPtr -> Int in + // The descriptor was set to non-blocking in `setNonblocking`, so + // this `read` returns immediately even if no data is ready. + do { + return try fileDescriptor.read( + into: bufferPtr, + retryOnInterrupt: true + ) + } catch { + // EAGAIN, EWOULDBLOCK, or another error: no more data right now. + return 0 + } + } + if drainedLength > 0 { + resultBuffer.removeLast(resultBuffer.count - drainedLength) + return resultBuffer + } + return nil } func write( @@ -221,74 +224,84 @@ extension AsyncIO { return 0 } let fileDescriptor = diskIO.descriptor() - let signalStream = self.registerFileDescriptor( + // Register the descriptor once. It stays registered for reuse across + // writes and is detached when stdin is closed, or on error/cancellation. + let (signalStream, outcome) = self.registerFileDescriptor( fileDescriptor, processIdentifier: processIdentifier, for: .write ) + switch outcome { + case .registered: + break + case .alreadyCancelled: + // The subprocess has exited and its stdin no longer has a reader. + // Report a zero-length write rather than pushing bytes that nothing will consume. + return 0 + case .failed(let error): + throw SubprocessError.failedToWriteToProcess( + withUnderlyingError: error.underlyingError + ) + } + var iterator = signalStream.makeAsyncIterator() + var writtenLength: Int = 0 - do { - /// Outer loop: every iteration signals that the descriptor is ready - /// for more data. - for try await _ in signalStream { - /// Inner loop: repeatedly call `write()` and write more data until: - /// 1. We've written `span.byteCount` bytes. - /// 2. `FileDescriptor.write()` throws `Errno` with `EAGAIN` or - /// `EWOULDBLOCK`. In this case `break` out of the inner loop and - /// `await` the next signal in the outer loop. - while true { - let written: Int - do { - written = try span.extracting( - last: span.byteCount - writtenLength - ).withUnsafeBytes { - return try fileDescriptor.write( - $0, - retryOnInterrupt: true - ) - } - } catch { - // FileDescriptor.write only throws Errno - let _errno = error as! Errno - - if self.shouldWaitForNextSignal(with: _errno.rawValue) { - // The pipe is full right now; wait for the next signal. - break - } else { - // Throw all other errors. - try self.removeRegistration(for: fileDescriptor) - throw SubprocessError.failedToWriteToProcess( - withUnderlyingError: _errno - ) - } - } - - writtenLength += written - if writtenLength >= span.byteCount { - // Wrote all data. - try self.removeRegistration(for: fileDescriptor) - return writtenLength - } + // Each iteration first attempts a non-blocking write and only awaits a + // writability signal when the pipe is full (`EAGAIN`). The descriptor + // is registered edge-triggered, so we must never park while the pipe + // still has space. + writeLoop: while true { + do { + let written = try span.extracting( + last: span.byteCount - writtenLength + ).withUnsafeBytes { + return try fileDescriptor.write( + $0, + retryOnInterrupt: true + ) + } + writtenLength += written + if writtenLength >= span.byteCount { + // Wrote all data. The descriptor stays registered for reuse. + return writtenLength } + // Wrote a partial chunk: the pipe is full now. Retry, which + // either writes more or reports `EAGAIN` and waits below. + continue writeLoop + } catch { + // FileDescriptor.write only throws Errno + let _errno = error as! Errno + guard self.shouldWaitForNextSignal(with: _errno.rawValue) else { + // Throw all other errors. + self.removeRegistration(for: fileDescriptor) + throw SubprocessError.failedToWriteToProcess( + withUnderlyingError: _errno + ) + } + // The pipe is full right now. Fall through to await the next ready signal. } - // The signal stream finished while data remained to write - // (typically because `cancelAsyncIO` ran after the child exited). - // Return whatever bytes the call already pushed so the caller - // observes a partial write rather than misreading silence as a - // successful zero-byte write. - try self.removeRegistration(for: fileDescriptor) - return writtenLength - } catch { - // Reset the error code to `.failedToWrite` to match other platforms. - guard let originalError = error as? SubprocessError else { + do { + if try await iterator.next() == nil { + // The signal stream finished while data remained to write + // (typically because `cancelAsyncIO` ran after the child + // exited). Return whatever bytes the call already pushed so + // the caller observes a partial write rather than misreading + // silence as a successful zero-byte write. + return writtenLength + } + } catch { + self.removeRegistration(for: fileDescriptor) + // Reset the error code to `.failedToWrite` to match other platforms. + guard let originalError = error as? SubprocessError else { + throw SubprocessError.failedToWriteToProcess( + withUnderlyingError: error as? SubprocessError.UnderlyingError + ) + } throw SubprocessError.failedToWriteToProcess( - withUnderlyingError: error as? SubprocessError.UnderlyingError + withUnderlyingError: originalError.underlyingError ) } - throw SubprocessError.failedToWriteToProcess( - withUnderlyingError: originalError.underlyingError - ) } } @@ -338,6 +351,20 @@ internal struct Registration { typealias Record = (continuation: SignalStream.Continuation, processIdentifier: ProcessIdentifier) typealias CancelledContinuation = (continuation: SignalStream.Continuation, fileDescriptor: PlatformFileDescriptor) + /// The result of a single attempt to register a file descriptor. + enum RegisterResult { + /// The first time this descriptor was seen. The caller must still + /// attach it to the epoll / kqueue (`epoll_ctl(ADD)` / `kevent(EV_ADD)`). + case registered + /// The descriptor was already attached by a previous read or write; + /// the existing attachment is reused and only the continuation is + /// replaced, so the caller skips the registration syscall. + case updated + /// The owning process is already cancelled. The caller finishes the + /// supplied continuation immediately and skips the setup. + case alreadyCancelled + } + private var fileDescriptorMap: [PlatformFileDescriptor: Record] private var processIdentifierMap: [ProcessIdentifier: Set] @@ -363,24 +390,33 @@ internal struct Registration { /// - continuation: The continuation to resume when the descriptor /// becomes ready or the I/O is cancelled. /// - processIdentifier: The process that owns the descriptor. - /// - Returns: `true` if the registration was added; `false` if the - /// process has already been cancelled. When the function returns - /// `false`, the caller must finish the supplied continuation - /// immediately and skip any further setup such as attaching the - /// descriptor to `epoll` or `kqueue`. + /// - Returns: A ``RegisterResult`` describing what the caller must do next. + /// When the result is ``RegisterResult/registered`` the caller attaches + /// the descriptor to the multiplexer; for ``RegisterResult/updated`` the + /// attachment already exists and is reused. For + /// ``RegisterResult/alreadyCancelled`` the caller finishes the supplied + /// continuation immediately and performs no further setup. mutating func register( fileDescriptor: PlatformFileDescriptor, continuation: SignalStream.Continuation, processIdentifier: ProcessIdentifier - ) -> Bool { + ) -> RegisterResult { if self.cancelledProcesses.contains(processIdentifier) { - return false + return .alreadyCancelled + } + if let existing = self.fileDescriptorMap[fileDescriptor] { + // The descriptor is already attached to the multiplexer. Finish the + // previous continuation so any stale awaiter unblocks, then replace + // it and reuse the existing attachment. + existing.continuation.finish() + self.fileDescriptorMap[fileDescriptor] = (continuation, processIdentifier) + return .updated } self.fileDescriptorMap[fileDescriptor] = (continuation, processIdentifier) var fileDescriptorSet = self.processIdentifierMap[processIdentifier] ?? Set() fileDescriptorSet.insert(fileDescriptor) self.processIdentifierMap[processIdentifier] = fileDescriptorSet - return true + return .registered } /// Removes the registration for `fileDescriptor`. diff --git a/Tests/SubprocessTests/AsyncIOTests.swift b/Tests/SubprocessTests/AsyncIOTests.swift index 282be343..eb2ca2dc 100644 --- a/Tests/SubprocessTests/AsyncIOTests.swift +++ b/Tests/SubprocessTests/AsyncIOTests.swift @@ -342,7 +342,7 @@ extension SubprocessAsyncIOTests { // byte is written. The reader drains in small, delayed chunks to keep the // pipe full and force many wait cycles. Asserts the write reports the full // count (proving the partial-write loop) and the bytes survive intact. - func testWriteCompletesUnderBackpressure() async throws { + @Test func testWriteCompletesUnderBackpressure() async throws { let payload = randomData(count: 512 * 1024) try await runReadWriteTest { readIO, readTestBed in var received: [UInt8] = [] @@ -435,7 +435,7 @@ extension SubprocessAsyncIOTests { // Ported intent: swift-nio concurrent read/write tests. Drives many // independent pipe round-trips through the shared `AsyncIO` at once to // stress its registration/signal machinery for races. - func testConcurrentReadWriteAcrossManyPipes() async throws { + @Test func testConcurrentReadWriteAcrossManyPipes() async throws { let pipeCount = 32 try await withThrowingTaskGroup(of: Void.self) { group in for _ in 0..