Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Sources/Subprocess/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,10 @@ internal func _safelyClose(_ target: _CloseTarget) throws(SubprocessError) {
}
#else
case .fileDescriptor(let fileDescriptor):
// Detach from epoll/queue before closing. A descriptor stays registered
// with epoll/kqueue across reads and writes so a stream of buffers costs one registration,
// not one per buffer.
AsyncIO.shared.removeRegistration(for: fileDescriptor)
do {
try fileDescriptor.close()
} catch {
Expand Down
121 changes: 65 additions & 56 deletions Sources/Subprocess/IO/AsyncIO+KQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -251,40 +251,55 @@ final class AsyncIO: Sendable {
_ fileDescriptor: FileDescriptor,
processIdentifier: ProcessIdentifier,
for event: Event
) -> SignalStream {
return SignalStream { (continuation: SignalStream.Continuation) -> () in
switch self.state {
case .success(let state):
if let nonBlockingFdError = self.setNonblocking(for: fileDescriptor) {
continuation.finish(throwing: nonBlockingFdError)
return
}
let filter: Int16
switch event {
case .read:
filter = Int16(EVFILT_READ)
case .write:
filter = Int16(EVFILT_WRITE)
}
) -> (stream: SignalStream, outcome: RegistrationOutcome) {
// `.bufferingNewest(1)` latches a readiness edge that fires while no
// consumer is awaiting, so a wakeup is never dropped between a read's
// `EAGAIN` and its `await`.
let (stream, continuation) = SignalStream.makeStream(
bufferingPolicy: .bufferingNewest(1)
)

// Hold the lock across the map insert and `kevent` so a
// concurrent `cancelAsyncIO` cannot slip in between the
// two steps and observe a half-registered descriptor.
let outcome: RegistrationOutcome = _registration.withLock { storage in
guard
storage.register(
fileDescriptor: fileDescriptor.rawValue,
continuation: continuation,
processIdentifier: processIdentifier
)
else {
return .alreadyCancelled
}
switch self.state {
case .failure(let setupError):
continuation.finish(throwing: setupError)
return (stream, .failed(setupError))
case .success(let state):
if let nonBlockingFdError = self.setNonblocking(for: fileDescriptor) {
continuation.finish(throwing: nonBlockingFdError)
return (stream, .failed(nonBlockingFdError))
}
let filter: Int16
switch event {
case .read:
filter = Int16(EVFILT_READ)
case .write:
filter = Int16(EVFILT_WRITE)
}

// Hold the lock across the map insert and `kevent` so a
// concurrent `cancelAsyncIO` cannot slip in between the
// two steps and observe a half-registered descriptor.
let outcome: RegistrationOutcome = _registration.withLock { storage in
switch storage.register(
fileDescriptor: fileDescriptor.rawValue,
continuation: continuation,
processIdentifier: processIdentifier
) {
case .alreadyCancelled:
return .alreadyCancelled
case .updated:
// Already attached to kqueue by a previous read or write;
// reuse the existing registration and skip the syscall.
return .registered
case .registered:
// `EV_CLEAR` makes the filter edge-triggered: an event is
// delivered only when new data arrives, so a persistent
// registration doesn't spin the monitor thread while
// buffered bytes sit unread between reads.
var kev = _makeKevent(
ident: UInt(fileDescriptor.rawValue),
filter: filter,
flags: UInt16(EV_ADD | EV_ENABLE)
flags: UInt16(EV_ADD | EV_CLEAR)
)
let rc = _kevent(
state.kqueueFileDescriptor,
Expand All @@ -306,34 +321,31 @@ final class AsyncIO: Sendable {
}
return .registered
}
}

switch outcome {
case .registered:
break
case .alreadyCancelled:
continuation.finish()
case .failed(let error):
continuation.finish(throwing: error)
}
case .failure(let setupError):
continuation.finish(throwing: setupError)
return
switch outcome {
case .registered:
break
case .alreadyCancelled:
continuation.finish()
case .failed(let error):
continuation.finish(throwing: error)
}
return (stream, outcome)
}
}

internal func removeRegistration(for fileDescriptor: FileDescriptor) throws(SubprocessError) {
switch self.state {
case .success(let state):
let c = _registration.withLock { store -> SignalStream.Continuation? in
guard
let continuation = store.removeRegistration(
for: fileDescriptor.rawValue
)
else {
return nil
}
internal func removeRegistration(for fileDescriptor: FileDescriptor) {
let continuation = _registration.withLock { store -> SignalStream.Continuation? in
guard
let continuation = store.removeRegistration(
for: fileDescriptor.rawValue
)
else {
return nil
}

if case .success(let state) = self.state {
for filter in [EVFILT_READ, EVFILT_WRITE] {
var kev = _makeKevent(
ident: UInt(fileDescriptor.rawValue),
Expand All @@ -349,13 +361,10 @@ final class AsyncIO: Sendable {
nil
)
}

return continuation
}
c?.finish()
case .failure(let setupFailure):
throw setupFailure
return continuation
}
continuation?.finish()
}

internal func cancelAsyncIO(for processIdentifier: ProcessIdentifier) throws(SubprocessError) {
Expand Down
160 changes: 81 additions & 79 deletions Sources/Subprocess/IO/AsyncIO+Linux.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import Musl
import _SubprocessCShims
import Synchronization

// `EPOLLET` (edge-triggered) is the bit `1 << 31` in the Linux epoll ABI, but
// the C symbol imports inconsistently across libraries.Define the bit directly
// so it's uniform everywhere.
private let _EPOLLET: UInt32 = 1 << 31
private let _epollEventSize = 256
private let _registration: Mutex<Registration> = Mutex(Registration())

Expand Down Expand Up @@ -231,46 +235,61 @@ final class AsyncIO: Sendable {
_ fileDescriptor: FileDescriptor,
processIdentifier: ProcessIdentifier,
for event: Event
) -> SignalStream {
return SignalStream { (continuation: SignalStream.Continuation) -> () in
// Nothing to do if setup failed.
switch self.state {
case .success(let state):
// Set the file descriptor to non-blocking.
if let nonBlockingFdError = self.setNonblocking(for: fileDescriptor) {
continuation.finish(throwing: nonBlockingFdError)
return
}
// Pick the event to register.
let targetEvent: EPOLL_EVENTS
switch event {
case .read:
targetEvent = EPOLL_EVENTS(EPOLLIN)
case .write:
targetEvent = EPOLL_EVENTS(EPOLLOUT)
}
) -> (stream: SignalStream, outcome: RegistrationOutcome) {
// `.bufferingNewest(1)` latches a readiness edge that fires while no
// consumer is awaiting, so a wakeup is never dropped between a read's
// `EAGAIN` and its `await`.
let (stream, continuation) = SignalStream.makeStream(
bufferingPolicy: .bufferingNewest(1)
)

// Hold the lock across both the map insert and `epoll_ctl` so
// a concurrent `cancelAsyncIO` either runs entirely before
// (in which case `register` returns `false`) or entirely
// after (in which case it sees the descriptor in the map
// and in epoll). Without this, a cancellation could observe
// the map entry between `storage.register` and
// `epoll_ctl(EPOLL_CTL_ADD)` and try to delete a descriptor
// that isn't yet in the epoll set.
let outcome: RegistrationOutcome = _registration.withLock { storage in
guard
storage.register(
fileDescriptor: fileDescriptor.rawValue,
continuation: continuation,
processIdentifier: processIdentifier
)
else {
return .alreadyCancelled
}
// Nothing to do if setup failed.
switch self.state {
case .failure(let setupError):
continuation.finish(throwing: setupError)
return (stream, .failed(setupError))
case .success(let state):
// Set the file descriptor to non-blocking.
if let nonBlockingFdError = self.setNonblocking(for: fileDescriptor) {
continuation.finish(throwing: nonBlockingFdError)
return (stream, .failed(nonBlockingFdError))
}
// Pick the event to register.
let targetEvent: EPOLL_EVENTS
switch event {
case .read:
targetEvent = EPOLL_EVENTS(EPOLLIN)
case .write:
targetEvent = EPOLL_EVENTS(EPOLLOUT)
}

// Hold the lock across both the map insert and `epoll_ctl` so
// a concurrent `cancelAsyncIO` either runs entirely before
// (in which case `register` returns `.alreadyCancelled`) or
// entirely after (in which case it sees the descriptor in the
// map and in epoll). Without this, a cancellation could observe
// the map entry between `storage.register` and
// `epoll_ctl(EPOLL_CTL_ADD)` and try to delete a descriptor
// that isn't yet in the epoll set.
let outcome: RegistrationOutcome = _registration.withLock { storage in
switch storage.register(
fileDescriptor: fileDescriptor.rawValue,
continuation: continuation,
processIdentifier: processIdentifier
) {
case .alreadyCancelled:
return .alreadyCancelled
case .updated:
// Already attached to epoll by a previous read or write;
// reuse the existing registration and skip the syscall.
return .registered
case .registered:
// Register edge-triggered (`EPOLLET`): with a persistent
// registration, `epoll_wait` must report the descriptor
// only when new data arrives, otherwise the monitor thread
// would spin while buffered bytes sit unread between reads.
var event = epoll_event(
events: targetEvent.rawValue,
events: targetEvent.rawValue | _EPOLLET,
data: epoll_data(fd: fileDescriptor.rawValue)
)
let rc = epoll_ctl(
Expand All @@ -291,59 +310,47 @@ final class AsyncIO: Sendable {
}
return .registered
}
}

switch outcome {
case .registered:
break
case .alreadyCancelled:
continuation.finish()
case .failed(let error):
continuation.finish(throwing: error)
}
case .failure(let setupError):
continuation.finish(throwing: setupError)
return
switch outcome {
case .registered:
break
case .alreadyCancelled:
continuation.finish()
case .failed(let error):
continuation.finish(throwing: error)
}
return (stream, outcome)
}
}

internal func removeRegistration(for fileDescriptor: FileDescriptor) throws(SubprocessError) {
switch self.state {
case .success(let state):
let c = try _registration.withLock { store throws(SubprocessError) -> SignalStream.Continuation? in
guard
let continuation = store.removeRegistration(
for: fileDescriptor.rawValue
)
else {
return nil
}
internal func removeRegistration(for fileDescriptor: FileDescriptor) {
let continuation = _registration.withLock { store -> SignalStream.Continuation? in
guard
let continuation = store.removeRegistration(
for: fileDescriptor.rawValue
)
else {
return nil
}

let rc = epoll_ctl(
if case .success(let state) = self.state {
_ = epoll_ctl(
state.epollFileDescriptor,
EPOLL_CTL_DEL,
fileDescriptor.rawValue,
nil
)

if rc != 0 {
throw SubprocessError.asyncIOFailed(
reason: "failed to remove \(fileDescriptor.rawValue) from epoll list",
underlyingError: Errno(rawValue: errno)
)
}
return continuation
}
c?.finish()
case .failure(let error):
throw error
return continuation
}
continuation?.finish()
}

internal func cancelAsyncIO(for processIdentifier: ProcessIdentifier) throws(SubprocessError) {
switch self.state {
case .success(let state):
let cancelledContinuations = try _registration.withLock { storage throws(SubprocessError) -> [SignalStream.Continuation] in
let cancelledContinuations = _registration.withLock { storage -> [SignalStream.Continuation] in
let previousRegistrations = storage.cancel(processIdentifier: processIdentifier)
guard !previousRegistrations.isEmpty else {
return []
Expand All @@ -352,18 +359,13 @@ final class AsyncIO: Sendable {
for registration in previousRegistrations {
toBeCancelled.append(registration.continuation)

let rc = epoll_ctl(
// Best-effort detach; ignore `ENOENT` (already gone).
_ = epoll_ctl(
state.epollFileDescriptor,
EPOLL_CTL_DEL,
registration.fileDescriptor,
nil
)
if rc != 0 && errno != ENOENT {
throw SubprocessError.asyncIOFailed(
reason: "failed to remove \(registration.fileDescriptor) from epoll list",
underlyingError: Errno(rawValue: errno)
)
}
}
return toBeCancelled
}
Expand Down
Loading
Loading