Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ let result = try await run(
}
```

The closure-based `run` returns an `ExecutionResult`. Access the closure's return value with `result.closureOutput`, and the termination status with `result.terminationStatus`.
The closure-based `run` returns an `ExecutionResult`. Access the closure's return value with `result.closureResult`, and the termination status with `result.terminationStatus`.

Because `input`, `output`, and `error` are separate parameters, you can mix streaming and capturing in the same call. For example, stream standard output from the closure while collecting standard error as a string, and return the closure's own value through `closureOutput`:
Because `input`, `output`, and `error` are separate parameters, you can mix streaming and capturing in the same call. For example, stream standard output from the closure while collecting standard error as a string, and return the closure's own value through `closureResult`:

```swift
let result = try await run(
Expand All @@ -140,7 +140,7 @@ let result = try await run(
return lineCount
}

print(result.closureOutput) // The line count returned from the closure.
print(result.closureResult) // The line count returned from the closure.
print(result.standardError ?? "") // The captured standard error.
```

Expand Down
56 changes: 37 additions & 19 deletions Sources/Subprocess/API.swift
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public func run<
/// - Returns: An ``ExecutionResult`` that contains the closure's return value and
/// the termination status of the child process.
public func run<
Result,
Result: ~Copyable,
Input: InputProtocol,
Output: OutputProtocol,
Error: ErrorOutputProtocol
Expand Down Expand Up @@ -238,7 +238,7 @@ public func run<
/// - Returns: An ``ExecutionResult`` that contains the closure's return value and
/// the termination status of the child process.
public func run<
Result,
Result: ~Copyable,
Input: InputProtocol,
Output: OutputProtocol,
Error: ErrorOutputProtocol
Expand All @@ -251,16 +251,9 @@ public func run<
Execution<Input, Output, Error>
) async throws -> Result
) async throws -> ExecutionResult<Result, Output, Error> {
typealias RunResult = (
processIdentifier: ProcessIdentifier,
closureResult: Result,
output: Output.OutputType,
error: Error.OutputType
)

let outputPipe = try output.createPipe()
let errorPipe = try error.createPipe(from: outputPipe)
let result: ExecutionOutcome<RunResult> = try await configuration.run(
let outcome: ExecutionOutcome<_RunOutcome<Result, Output, Error>> = try await configuration.run(
input: try input.createPipe(),
as: Input.self,
output: outputPipe,
Expand All @@ -272,7 +265,14 @@ public func run<
var outputIOBox = consume outputIO
var errorIOBox = consume errorIO

return try await withThrowingTaskGroup(of: _RunGroupResult<Output, Error>.self) { group in
// The body's (possibly noncopyable) result is moved out through this
// box. The task group returns only the captured output and error,
// because a noncopyable value can't be a `GroupResult`.
var resultBox: Result? = nil
let captured: (Output.OutputType, Error.OutputType) = try await withThrowingTaskGroup(
of: _RunGroupResult<Output, Error>.self,
returning: (Output.OutputType, Error.OutputType).self
) { group in
var writer: StandardInputWriter?
if inputIOBox != nil {
let inputWriter = StandardInputWriter(
Expand Down Expand Up @@ -340,9 +340,8 @@ public func run<
outputStream: outputSequence,
errorStream: errorSequence
)
let result: Result
do {
result = try await body(execution)
resultBox = try await body(execution)
} catch {
if Input.self == CustomWriteInput.self {
try await writer?.finish()
Expand Down Expand Up @@ -371,16 +370,24 @@ public func run<
if Error.OutputType.self == Void.self {
capturedError = (() as Any) as? Error.OutputType
}
return (processIdentifier, result, capturedOutput!, capturedError!)
return (capturedOutput!, capturedError!)
}
return _RunOutcome(
processIdentifier: processIdentifier,
closureResult: resultBox.take()!,
output: captured.0,
error: captured.1
)
}

let terminationStatus = outcome.terminationStatus
let capturedResult = outcome.value
return ExecutionResult(
processIdentifier: result.value.processIdentifier,
terminationStatus: result.terminationStatus,
closureOutput: result.value.closureResult,
standardOutput: result.value.output,
standardError: result.value.error
processIdentifier: capturedResult.processIdentifier,
terminationStatus: terminationStatus,
closureResult: capturedResult.closureResult,
standardOutput: capturedResult.output,
standardError: capturedResult.error
)
}

Expand All @@ -389,3 +396,14 @@ private enum _RunGroupResult<Output: OutputProtocol, Error: OutputProtocol> {
case standardErrorCaptured(Error.OutputType)
case inputWritten
}

private struct _RunOutcome<
ClosureResult: Sendable & ~Copyable,
Output: OutputProtocol,
Error: OutputProtocol
>: ~Copyable, Sendable {
let processIdentifier: ProcessIdentifier
let closureResult: ClosureResult
let output: Output.OutputType
let error: Error.OutputType
}
39 changes: 25 additions & 14 deletions Sources/Subprocess/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ public struct Configuration: Sendable {
self.platformOptions = platformOptions
}

internal func run<Result, Input: InputProtocol, Output: OutputProtocol, Error: OutputProtocol>(
internal func run<
Result: ~Copyable & Sendable,
Input: InputProtocol,
Output: OutputProtocol,
Error: OutputProtocol
>(
input: consuming CreatedPipe,
as inputType: Input.Type,
output: consuming CreatedPipe,
Expand Down Expand Up @@ -108,9 +113,13 @@ public struct Configuration: Sendable {
// already finished cleanly and no cancellation is needed.
let taskFinishFlag = AtomicCounter()

let (result, monitorError) = try await withThrowingTaskGroup(
// The body's result is moved out through this box rather than
// returned from the task group since a noncopyable `Result` can't be a
// `GroupResult`. The group returns only the (copyable) monitor error.
var resultBox: Swift.Result<Result, any Swift.Error>? = nil
let monitorError = try await withThrowingTaskGroup(
of: SubprocessError?.self,
returning: (Swift.Result<Result, any Swift.Error>, SubprocessError?).self
returning: SubprocessError?.self
) { group in
group.addTask {
do throws(SubprocessError) {
Expand All @@ -132,12 +141,11 @@ public struct Configuration: Sendable {
let outputIO = spawnResults.outputReadEnd()
let errorIO = spawnResults.errorReadEnd()

let result: Swift.Result<Result, any Swift.Error>
do {
// Body runs in the same isolation.
let bodyResult = try await body(processIdentifier, inputIO, outputIO, errorIO)
taskFinishFlag.addOne()
result = .success(bodyResult)
resultBox = .success(bodyResult)
} catch {
let execution = Execution<Input, Output, Error>(
processIdentifier: processIdentifier,
Expand All @@ -147,12 +155,11 @@ public struct Configuration: Sendable {
)
// Attempt to terminate the child process when the body throws
await execution.teardown(using: self.platformOptions.teardownSequence)
result = .failure(error)
resultBox = .failure(error)
}

// Wait for the monitor child task to finish.
let monitorError = try await group.next() ?? nil
return (result, monitorError)
return try await group.next() ?? nil
}

// Drop the cancellation marker before reaping the zombie. After
Expand All @@ -170,7 +177,7 @@ public struct Configuration: Sendable {

return try ExecutionOutcome(
terminationStatus: terminationStatus,
value: result.get()
value: resultBox.take()!.get()
)
} onCleanup: {
let execution = Execution<Input, Output, Error>(
Expand Down Expand Up @@ -1171,14 +1178,18 @@ extension Optional where Wrapped == String {
/// In the latter case, `onCleanup` may be run concurrently with `body`.
/// The `body` closure is guaranteed to run exactly once.
/// The `onCleanup` closure is guaranteed to run only once, or not at all.
internal func withAsyncTaskCleanupHandler<Result: Sendable>(
internal func withAsyncTaskCleanupHandler<Result: ~Copyable & Sendable>(
_ body: () async throws -> Result,
onCleanup handler: @Sendable @escaping () async -> Void,
) async throws -> Result {
let (runCancellationHandlerStream, runCancellationHandlerContinuation) = AsyncThrowingStream.makeStream(of: Void.self)
return try await withThrowingTaskGroup(
// The body's result is moved out through this box rather than returned
// from the task group: a noncopyable `Result` can't be a `GroupResult`,
// which must be `Copyable`. The group itself returns `Void`.
var resultBox: Result? = nil
try await withThrowingTaskGroup(
of: Void.self,
returning: Result.self
returning: Void.self
) { group in
group.addTask {
// Keep this task sleep indefinitely until the parent task is cancelled.
Expand Down Expand Up @@ -1213,14 +1224,14 @@ internal func withAsyncTaskCleanupHandler<Result: Sendable>(
}

do {
let result = try await body()
resultBox = try await body()
runCancellationHandlerContinuation.finish()
return result
} catch {
runCancellationHandlerContinuation.finish(throwing: error)
throw error
}
}
return resultBox.take()!
}

internal struct _OrderedSet<Element: Hashable & Sendable>: Hashable, Sendable {
Expand Down
31 changes: 21 additions & 10 deletions Sources/Subprocess/Result.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ import SystemPackage
///
/// The `ClosureResult` generic parameter is `Void` when you call a `run(...)`
/// overload that doesn't take a `body` closure. It's the closure's return type
/// otherwise. You access the closure's return value with ``closureOutput``.
/// otherwise. You access the closure's return value with ``closureResult``.
///
/// The ``standardOutput`` and ``standardError`` properties are available when
/// the corresponding output type produces a non-`Void` value. They're
/// unavailable for output types such as ``DiscardedOutput``, ``SequenceOutput``,
/// and ``FileDescriptorOutput``.
public struct ExecutionResult<
ClosureResult: Sendable,
ClosureResult: Sendable & ~Copyable,
Output: OutputProtocol,
Error: OutputProtocol
>: Sendable {
>: Sendable, ~Copyable {
/// The process identifier of the subprocess.
public let processIdentifier: ProcessIdentifier
/// The termination status of the subprocess.
Expand All @@ -44,25 +44,34 @@ public struct ExecutionResult<
public let standardError: Error.OutputType

/// The value returned by the body closure passed to `run`.
public let closureOutput: ClosureResult
public let closureResult: ClosureResult

internal init(
processIdentifier: ProcessIdentifier,
terminationStatus: TerminationStatus,
closureOutput: ClosureResult,
closureResult: consuming ClosureResult,
standardOutput: Output.OutputType,
standardError: Error.OutputType
) {
self.processIdentifier = processIdentifier
self.terminationStatus = terminationStatus
self.closureOutput = closureOutput
self.closureResult = closureResult
self.standardOutput = standardOutput
self.standardError = standardError
}
}

extension ExecutionResult where ClosureResult: ~Copyable {
/// Consumes this result and returns the value produced by the `run` body closure.
public consuming func takeClosureResult() -> ClosureResult {
return self.closureResult
}
}

// MARK: - ExecutionResult Conformances

extension ExecutionResult: Copyable where ClosureResult: Copyable {}

extension ExecutionResult: Equatable where Output.OutputType: Equatable, Error.OutputType: Equatable, ClosureResult: Equatable {}

extension ExecutionResult: Hashable where Output.OutputType: Hashable, Error.OutputType: Hashable, ClosureResult: Hashable {}
Expand All @@ -74,7 +83,7 @@ extension ExecutionResult: CustomStringConvertible where Output.OutputType: Cust
ExecutionResult(
processIdentifier: \(self.processIdentifier),
terminationStatus: \(self.terminationStatus.description),
closureOutput: \(String(describing: self.closureOutput)),
closureResult: \(String(describing: self.closureResult)),
standardOutput: \(self.standardOutput.description)
standardError: \(self.standardError.description)
)
Expand All @@ -90,7 +99,7 @@ where Output.OutputType: CustomDebugStringConvertible, Error.OutputType: CustomD
ExecutionResult(
processIdentifier: \(self.processIdentifier),
terminationStatus: \(self.terminationStatus.debugDescription),
closureOutput: \(String(describing: self.closureOutput)),
closureResult: \(String(describing: self.closureResult)),
standardOutput: \(self.standardOutput.debugDescription)
standardError: \(self.standardError.debugDescription)
)
Expand All @@ -102,18 +111,20 @@ where Output.OutputType: CustomDebugStringConvertible, Error.OutputType: CustomD

/// The outcome of a subprocess execution, containing the closure's return
/// value and the termination status of the child process.
internal struct ExecutionOutcome<Result: Sendable>: Sendable {
internal struct ExecutionOutcome<Result: Sendable & ~Copyable>: Sendable, ~Copyable {
/// The termination status of the child process.
internal let terminationStatus: TerminationStatus
/// The value returned by the closure passed to the `run` method.
internal let value: Result

internal init(terminationStatus: TerminationStatus, value: Result) {
internal init(terminationStatus: TerminationStatus, value: consuming Result) {
self.terminationStatus = terminationStatus
self.value = value
}
}

extension ExecutionOutcome: Copyable where Result: Copyable {}

extension ExecutionOutcome: Equatable where Result: Equatable {}

extension ExecutionOutcome: Hashable where Result: Hashable {}
Expand Down
Loading
Loading