Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions Sources/Subprocess/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,68 @@ extension Configuration {
return self._errorReadEnd.take()
}
}

#if !os(Windows)
/// The maximum number of `fork`/`exec` attempts, including the first one,
/// before a transient, retryable spawn failure is surfaced to the caller.
///
/// A value of 6 therefore allows at most 5 retries (and 5 backoff sleeps).
private static let maxSpawnAttempts = 6
/// The base spawn-retry backoff in nanoseconds (1 ms). The pre-jitter delay
/// doubles each attempt up to ``spawnRetryBackoffCapNanoseconds``, after
/// which jitter is applied.
private static let spawnRetryBackoffBaseNanoseconds: UInt64 = 1_000_000
/// The maximum pre-jitter spawn-retry backoff in nanoseconds (100 ms).
///
/// With the current ``maxSpawnAttempts`` the retry driver only requests a
/// backoff for attempts 1 through 5, so the largest ceiling actually used is
/// 16 ms; the cap only takes effect if the attempt limit is raised.
private static let spawnRetryBackoffCapNanoseconds: UInt64 = 100_000_000

/// Returns a jittered, capped exponential backoff (in nanoseconds) to wait
/// after the given spawn attempt has failed.
///
/// The pre-jitter ceiling is `spawnRetryBackoffBaseNanoseconds` doubled once
/// per attempt after the first, limited to `spawnRetryBackoffCapNanoseconds`.
/// The returned delay lies in `ceiling / 2 ... ceiling` (up to integer
/// rounding of the half).
///
/// - Parameter attempt: The 1-based number of the attempt that just failed.
///   Values below 1 are treated as 1 instead of trapping on the
///   signed-to-unsigned conversion; large values are clamped so the shift
///   stays bounded.
/// - Returns: The number of nanoseconds to sleep before the next attempt.
internal static func spawnRetryBackoffNanoseconds(forAttempt attempt: Int) -> UInt64 {
    // Clamp before subtracting: this keeps the exponent in 0...16, avoids a
    // trap in `UInt64(_:)` for non-positive attempts, and avoids arithmetic
    // overflow of `attempt - 1` for `Int.min`.
    let clampedAttempt = min(max(attempt, 1), 17)
    let exponent = UInt64(clampedAttempt - 1)
    let ceiling = min(
        spawnRetryBackoffCapNanoseconds,
        spawnRetryBackoffBaseNanoseconds << exponent
    )
    // Equal jitter: half fixed, half random, to de-synchronize concurrent
    // retriers without collapsing the delay toward zero.
    let fixedPart = ceiling / 2
    return fixedPart + UInt64.random(in: 0...fixedPart)
}

/// Invokes `attempt` repeatedly until its outcome is no longer considered a
/// transient failure, or until ``maxSpawnAttempts`` invocations have been
/// made, sleeping for a bounded, jittered backoff between invocations.
///
/// The motivating case is `EAGAIN` from `clone3`/`fork`/`posix_spawn`: the
/// kernel could not create the task because a per-uid or system task-count
/// limit was momentarily reached. That condition is transient, so a bounded
/// number of retries is made before the failure is handed back unchanged.
/// Which outcomes count as the clean, retryable fork-side case is decided by
/// each platform through `shouldRetry`.
///
/// The backoff deliberately happens here, in the async context between
/// attempts, and never inside the spawn helper: that helper holds a
/// process-wide fork lock with signals blocked and runs on a single shared
/// worker thread, so sleeping there would stall every other spawn.
///
/// - Parameters:
///   - attempt: Performs one spawn attempt and returns its outcome. Errors it
///     throws are propagated immediately and are never retried.
///   - shouldRetry: Returns `true` when the outcome is a transient failure
///     that is safe to retry.
/// - Returns: The first non-retryable outcome, or the most recent outcome if
///   the attempt limit was reached or the task was cancelled during a backoff
///   (cancellation is not rethrown into the spawn path).
internal func runSpawnAttemptsRetryingTransientFailure<Outcome: Sendable>(
    _ attempt: () async throws(SubprocessError) -> Outcome,
    shouldRetryTransientFailure shouldRetry: (Outcome) -> Bool
) async throws(SubprocessError) -> Outcome {
    var attemptsMade = 0
    while true {
        let result = try await attempt()
        attemptsMade += 1

        // Anything other than a transient failure is final.
        if !shouldRetry(result) {
            return result
        }
        // The retry budget is exhausted: surface the failure as-is.
        if attemptsMade >= Self.maxSpawnAttempts {
            return result
        }

        let delay = Self.spawnRetryBackoffNanoseconds(forAttempt: attemptsMade)
        do {
            try await Task.sleep(nanoseconds: delay)
        } catch {
            // The sleep only fails on cancellation; stop retrying and hand
            // back the pending outcome.
            return result
        }
    }
}
#endif
}

internal enum StringOrRawBytes: Sendable, Hashable {
Expand Down
78 changes: 55 additions & 23 deletions Sources/Subprocess/Platforms/Subprocess+Darwin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -424,29 +424,11 @@ extension Configuration {
uidPtr: uidPtr,
gidPtr: gidPtr
)
let (spawnError, pid) = try await runOnBackgroundThread {
return possibleExecutablePath._withCString { exePath in
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
var pid: pid_t = 0
var _fileActions = spawnContext.fileActions
var _spawnAttributes = spawnContext.spawnAttributes
let rc = _subprocess_spawn(
&pid,
exePath,
&_fileActions,
&_spawnAttributes,
spawnContext.argv,
spawnContext.env,
spawnContext.uidPtr,
spawnContext.gidPtr,
Int32(supplementaryGroups?.count ?? 0),
sgroups?.baseAddress,
self.platformOptions.createSession ? 1 : 0
)
return (rc, pid)
}
}
}
let (spawnError, pid) = try await self.spawnRetryingTransientFailure(
executablePath: possibleExecutablePath,
spawnContext: spawnContext,
supplementaryGroups: supplementaryGroups
)
// Spawn error
if spawnError != 0 {
if [ENOENT, EACCES, ENOTDIR].contains(spawnError) {
Expand Down Expand Up @@ -510,6 +492,56 @@ extension Configuration {
)
}
}

/// Calls `_subprocess_spawn` on the shared background worker thread and
/// repeats the call, with bounded jittered backoff, while it reports a
/// transient fork-side `EAGAIN`.
///
/// `posix_spawn` is used directly only when no pre-fork setup is required.
/// On that path an `EAGAIN` is the internal fork failing with no child
/// created, which is the clean, retryable case; the existing error handling
/// already treats a non-prefork failure as leaving nothing to reap. When
/// pre-fork setup is required (a user ID, group ID, supplementary groups, or
/// a new session), a failure can carry a child, so it is never retried and
/// is left for the caller to handle as before.
///
/// - Parameters:
///   - executablePath: The path of the executable to spawn.
///   - spawnContext: The file actions, spawn attributes, argv, environment,
///     and uid/gid pointers handed to the shim.
///   - supplementaryGroups: Optional supplementary group IDs for the child.
/// - Returns: The shim's return code followed by the spawned process ID.
private func spawnRetryingTransientFailure(
    executablePath: String,
    spawnContext: SpawnContext,
    supplementaryGroups: [gid_t]?
) async throws(SubprocessError) -> (CInt, pid_t) {
    let hasSupplementaryGroups = supplementaryGroups?.isEmpty == false
    let needsPreForkSetup =
        hasSupplementaryGroups
        || self.platformOptions.createSession
        || self.platformOptions.userID != nil
        || self.platformOptions.groupID != nil
    let groupCount = Int32(supplementaryGroups?.count ?? 0)

    return try await self.runSpawnAttemptsRetryingTransientFailure {
        () async throws(SubprocessError) -> (CInt, pid_t) in
        try await runOnBackgroundThread {
            executablePath._withCString { exePath in
                supplementaryGroups.withOptionalUnsafeBufferPointer { groups in
                    var childPID: pid_t = 0
                    // The shim takes these by mutable pointer, so work on
                    // local copies for each attempt.
                    var fileActions = spawnContext.fileActions
                    var attributes = spawnContext.spawnAttributes
                    let status = _subprocess_spawn(
                        &childPID,
                        exePath,
                        &fileActions,
                        &attributes,
                        spawnContext.argv,
                        spawnContext.env,
                        spawnContext.uidPtr,
                        spawnContext.gidPtr,
                        groupCount,
                        groups?.baseAddress,
                        self.platformOptions.createSession ? 1 : 0
                    )
                    return (status, childPID)
                }
            }
        }
    } shouldRetryTransientFailure: { result in
        // Only the direct `posix_spawn` path may be retried on `EAGAIN`.
        result.0 == EAGAIN && !needsPreForkSetup
    }
}
}

// MARK: - ProcessIdentifier
Expand Down
97 changes: 68 additions & 29 deletions Sources/Subprocess/Platforms/Subprocess+Unix.swift
Original file line number Diff line number Diff line change
Expand Up @@ -501,35 +501,12 @@ extension Configuration {
let spawnContext = SpawnContext(
argv: argv, env: env, uidPtr: uidPtr, gidPtr: gidPtr, processGroupIDPtr: processGroupIDPtr
)
let (pid, processDescriptor, spawnError) = try await runOnBackgroundThread { () throws(SubprocessError) in
return try possibleExecutablePath._withCString { exePath throws(SubprocessError) in
return try (self.workingDirectory?.string).withOptionalCString { workingDir in
return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in
return fileDescriptors.withUnsafeBufferPointer { fds in
var pid: pid_t = 0
var processDescriptor: PlatformFileDescriptor = .invalidDescriptor

let rc = _subprocess_fork_exec(
&pid,
&processDescriptor,
exePath,
workingDir,
fds.baseAddress!,
spawnContext.argv,
spawnContext.env,
spawnContext.uidPtr,
spawnContext.gidPtr,
spawnContext.processGroupIDPtr,
CInt(supplementaryGroups?.count ?? 0),
sgroups?.baseAddress,
self.platformOptions.createSession ? 1 : 0
)
return (pid, processDescriptor, rc)
}
}
}
}
}
let (pid, processDescriptor, spawnError) = try await self.forkExecRetryingTransientFailure(
executablePath: possibleExecutablePath,
fileDescriptors: fileDescriptors,
spawnContext: spawnContext,
supplementaryGroups: supplementaryGroups
)
// Spawn error
if spawnError != 0 {
if [ENOENT, EACCES, ENOTDIR].contains(spawnError) {
Expand Down Expand Up @@ -605,6 +582,68 @@ extension Configuration {
)
}
}

/// Calls `_subprocess_fork_exec()` on the shared background worker thread and
/// repeats the call, with bounded jittered backoff, while it reports a
/// transient fork-side `EAGAIN`.
///
/// `EAGAIN` from `clone3()`/`fork()` means the kernel could not create the
/// task because a per-uid or system task-count limit was momentarily
/// reached. Because that is transient, a bounded number of retries is made
/// before the error is surfaced unchanged.
///
/// A failure is retried only when the shim came back without a process
/// descriptor (`.invalidDescriptor`): nothing was created, so trying again is
/// clean. An exec-side failure carries a valid descriptor for a child the
/// shim has already reaped, and is left for the caller to handle as before.
///
/// The backoff runs in the async context between worker-thread invocations,
/// not inside the shim. The shim holds a process-wide fork lock with signals
/// blocked, and the worker is a single shared executor, so sleeping in
/// either would stall every other spawn.
///
/// - Parameters:
///   - executablePath: The path of the executable to run.
///   - fileDescriptors: The descriptors handed to the shim for the child.
///   - spawnContext: The argv, environment, and uid/gid/process-group
///     pointers handed to the shim.
///   - supplementaryGroups: Optional supplementary group IDs for the child.
/// - Returns: The child's process ID, its process descriptor, and the shim's
///   return code, in that order.
private func forkExecRetryingTransientFailure(
    executablePath: String,
    fileDescriptors: [CInt],
    spawnContext: SpawnContext,
    supplementaryGroups: [gid_t]?
) async throws(SubprocessError) -> (pid_t, PlatformFileDescriptor, CInt) {
    let groupCount = CInt(supplementaryGroups?.count ?? 0)

    return try await self.runSpawnAttemptsRetryingTransientFailure {
        () async throws(SubprocessError) -> (pid_t, PlatformFileDescriptor, CInt) in
        try await runOnBackgroundThread { () throws(SubprocessError) in
            try executablePath._withCString { exePath throws(SubprocessError) in
                try (self.workingDirectory?.string).withOptionalCString { workingDir in
                    supplementaryGroups.withOptionalUnsafeBufferPointer { groups in
                        fileDescriptors.withUnsafeBufferPointer { descriptors in
                            var childPID: pid_t = 0
                            // Stays `.invalidDescriptor` unless the shim
                            // fills it in.
                            var childDescriptor: PlatformFileDescriptor = .invalidDescriptor
                            let status = _subprocess_fork_exec(
                                &childPID,
                                &childDescriptor,
                                exePath,
                                workingDir,
                                descriptors.baseAddress!,
                                spawnContext.argv,
                                spawnContext.env,
                                spawnContext.uidPtr,
                                spawnContext.gidPtr,
                                spawnContext.processGroupIDPtr,
                                groupCount,
                                groups?.baseAddress,
                                self.platformOptions.createSession ? 1 : 0
                            )
                            return (childPID, childDescriptor, status)
                        }
                    }
                }
            }
        }
    } shouldRetryTransientFailure: { result in
        let (_, descriptor, status) = result
        // Anything other than `EAGAIN` is never retried.
        guard status == EAGAIN else {
            return false
        }
        // Fork-side failure only: no descriptor means no child was created.
        return descriptor == .invalidDescriptor
    }
}
}

// MARK: - ProcessIdentifier
Expand Down
108 changes: 108 additions & 0 deletions Tests/SubprocessTests/IntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2673,6 +2673,85 @@ extension SubprocessIntegrationTests {
}
#endif // !os(Windows)

// MARK: - Spawn retry tests
#if !os(Windows)
extension SubprocessIntegrationTests {
    /// A few transient failures followed by a success must yield the success,
    /// after exactly one more call than there were failures.
    @Test func retriesUntilSuccess() async throws {
        let transientFailureCount = 3
        let calls = CallCounter()
        let config = Configuration(executable: .name("true"))

        let result = try await config.runSpawnAttemptsRetryingTransientFailure {
            () async throws(SubprocessError) -> FakeSpawnOutcome in
            let callIndex = calls.increment()
            if callIndex > transientFailureCount {
                return .success
            }
            return .transientFailure
        } shouldRetryTransientFailure: { candidate in
            candidate == .transientFailure
        }

        #expect(result == .success)
        #expect(calls.count == transientFailureCount + 1)
    }

    /// An attempt that always fails transiently is retried up to the attempt
    /// cap, after which the failure itself is returned.
    @Test func surfacesFailureAtCap() async throws {
        // Mirrors the private `Configuration.maxSpawnAttempts`.
        let expectedAttemptCap = 6
        let calls = CallCounter()
        let config = Configuration(executable: .name("true"))

        let result = try await config.runSpawnAttemptsRetryingTransientFailure {
            () async throws(SubprocessError) -> FakeSpawnOutcome in
            calls.increment()
            return .transientFailure
        } shouldRetryTransientFailure: { candidate in
            candidate == .transientFailure
        }

        #expect(result == .transientFailure)
        #expect(calls.count == expectedAttemptCap)
    }

    /// A non-retryable outcome is returned after a single attempt.
    @Test func noRetryWhenNotTransient() async throws {
        let calls = CallCounter()
        let config = Configuration(executable: .name("true"))

        let result = try await config.runSpawnAttemptsRetryingTransientFailure {
            () async throws(SubprocessError) -> FakeSpawnOutcome in
            calls.increment()
            return .success
        } shouldRetryTransientFailure: { candidate in
            candidate == .transientFailure
        }

        #expect(result == .success)
        #expect(calls.count == 1)
    }

    /// Cancelling the surrounding task makes the driver stop retrying and
    /// return the pending outcome instead of throwing.
    @Test func cancellationStopsRetrying() async throws {
        // Mirrors the private `Configuration.maxSpawnAttempts`.
        let expectedAttemptCap = 6
        let calls = CallCounter()
        let config = Configuration(executable: .name("true"))

        // The task is cancelled immediately after it is created, with no
        // suspension point in between. Whether or not its body has already
        // begun running on another thread, cancellation is expected to be
        // observed at the first (or an early) backoff sleep, which then
        // throws `CancellationError`; the driver turns that into the pending
        // outcome rather than propagating it.
        let retryTask = Task {
            try await config.runSpawnAttemptsRetryingTransientFailure {
                () async throws(SubprocessError) -> FakeSpawnOutcome in
                calls.increment()
                return .transientFailure
            } shouldRetryTransientFailure: { candidate in
                candidate == .transientFailure
            }
        }
        retryTask.cancel()

        let result = try await retryTask.value
        #expect(result == .transientFailure)
        #expect(calls.count < expectedAttemptCap)
    }
}
#endif // !os(Windows)

// MARK: - Other Tests
extension SubprocessIntegrationTests {
@Test func testTerminateProcess() async throws {
Expand Down Expand Up @@ -3808,3 +3887,32 @@ extension FileDescriptor {
return result
}
}

#if !os(Windows)
/// A lock-protected invocation counter for the spawn-retry tests.
///
/// The retry driver calls `attempt` sequentially, but the cancellation test
/// runs the driver inside an unstructured `Task`, so the counter has to be
/// safe to capture in a `@Sendable` closure. All access to the stored tally
/// goes through `stateLock`, which is what justifies `@unchecked Sendable`.
private final class CallCounter: @unchecked Sendable {
    private let stateLock = NSLock()
    private var tally = 0

    /// Adds one to the tally and returns the updated total.
    @discardableResult
    func increment() -> Int {
        stateLock.lock()
        tally += 1
        let updated = tally
        stateLock.unlock()
        return updated
    }

    /// The number of times `increment()` has been called so far.
    var count: Int {
        stateLock.lock()
        let snapshot = tally
        stateLock.unlock()
        return snapshot
    }
}

/// A stand-in spawn outcome used to drive the retry loop in tests without
/// spawning a real process.
private enum FakeSpawnOutcome: Equatable, Sendable {
    /// An outcome the tests' retry predicate treats as retryable.
    case transientFailure
    /// An outcome the tests' retry predicate treats as final.
    case success
}
#endif // !os(Windows)
Loading