Skip to content

Commit 429eb25

Browse files
Mpdreamzclaude
andauthored
Fix flaky tests by removing Task.Run race condition (#24)
* Fix flaky tests by removing Task.Run race condition - Remove Task.Run wrapping from CreateConsoleOutObservable() in both EventBasedObservableProcess and BufferedObservableProcess. The wrapping caused a race between Subscribe() returning and KickOff() starting. - Add parameterless WaitForExit() call in EventBasedObservableProcess to flush async events before OnExit (documented .NET pattern). - Handle AggregateException in WaitForEndOfStreams when tasks are cancelled during process exit, preventing test host crashes. - Guard TestConsoleOutWriter against InvalidOperationException when xUnit's TestOutputHelper is accessed after test ends. - Mark NoWrapInThread as obsolete since Task.Run wrapping is removed. - Disable xUnit test parallelism to avoid timing issues with process tests. - Add parallel execution tests verifying ObservableProcess, EventBasedObservableProcess, Proc.Start, and Proc.StartRedirected all work correctly when running multiple processes concurrently. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * cleanup --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent ab32d2b commit 429eb25

11 files changed

Lines changed: 303 additions & 39 deletions

src/Proc.Fs/Bindings.fs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ let private startArgs (opts: ExecOptions) =
3939
opts.LineOutFilter |> Option.iter(fun f -> startArguments.LineOutFilter <- f)
4040
opts.Environment |> Option.iter(fun e -> startArguments.Environment <- e)
4141
opts.WorkingDirectory |> Option.iter(fun d -> startArguments.WorkingDirectory <- d)
42-
opts.NoWrapInThread |> Option.iter(fun b -> startArguments.NoWrapInThread <- b)
4342
opts.SendControlCFirst |> Option.iter(fun b -> startArguments.SendControlCFirst <- b)
4443
opts.WaitForStreamReadersTimeout |> Option.iter(fun t -> startArguments.WaitForStreamReadersTimeout <- t)
4544
startArguments.Timeout <- opts.Timeout
@@ -60,7 +59,6 @@ let private longRunningArguments (opts: ExecOptions) =
6059
opts.LineOutFilter |> Option.iter(fun f -> longRunningArguments.LineOutFilter <- f)
6160
opts.Environment |> Option.iter(fun e -> longRunningArguments.Environment <- e)
6261
opts.WorkingDirectory |> Option.iter(fun d -> longRunningArguments.WorkingDirectory <- d)
63-
opts.NoWrapInThread |> Option.iter(fun b -> longRunningArguments.NoWrapInThread <- b)
6462
opts.SendControlCFirst |> Option.iter(fun b -> longRunningArguments.SendControlCFirst <- b)
6563
opts.WaitForStreamReadersTimeout |> Option.iter(fun t -> longRunningArguments.WaitForStreamReadersTimeout <- t)
6664

src/Proc/BufferedObservableProcess.cs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -44,21 +44,8 @@ public BufferedObservableProcess(StartArguments startArguments) : base(startArgu
4444
private Task _stdErrSubscription;
4545
private IObserver<CharactersOut> _observer;
4646

47-
protected override IObservable<CharactersOut> CreateConsoleOutObservable()
48-
{
49-
if (NoWrapInThread)
50-
return Observable.Create<CharactersOut>(observer =>
51-
{
52-
var disposable = KickOff(observer);
53-
return disposable;
54-
});
55-
56-
return Observable.Create<CharactersOut>(async observer =>
57-
{
58-
var disposable = await Task.Run(() => KickOff(observer));
59-
return disposable;
60-
});
61-
}
47+
protected override IObservable<CharactersOut> CreateConsoleOutObservable() =>
48+
Observable.Create<CharactersOut>(observer => KickOff(observer));
6249

6350
/// <summary>
6451
/// Expert setting, subclasses can return true if a certain condition is met to break out of the async readers on StandardOut and StandardError
@@ -156,11 +143,23 @@ private void WaitForEndOfStreams(IObserver<CharactersOut> observer, Task stdOutS
156143
{
157144
CancelAsyncReads();
158145
}
159-
else if (!Task.WaitAll(new[] {stdOutSubscription, stdErrSubscription}, WaitForStreamReadersTimeout.Value))
146+
else
160147
{
161-
CancelAsyncReads();
162-
OnBeforeWaitForEndOfStreamsError(WaitForStreamReadersTimeout.Value);
163-
OnError(observer, new WaitForEndOfStreamsTimeoutException(WaitForStreamReadersTimeout.Value));
148+
try
149+
{
150+
if (!Task.WaitAll([stdOutSubscription, stdErrSubscription], WaitForStreamReadersTimeout.Value))
151+
{
152+
CancelAsyncReads();
153+
OnBeforeWaitForEndOfStreamsError(WaitForStreamReadersTimeout.Value);
154+
OnError(observer, new WaitForEndOfStreamsTimeoutException(WaitForStreamReadersTimeout.Value));
155+
}
156+
}
157+
catch (AggregateException)
158+
{
159+
// Tasks may be cancelled or faulted when the process exits abruptly
160+
// or when the test framework disposes resources. This is expected.
161+
CancelAsyncReads();
162+
}
164163
}
165164
}
166165
}

src/Proc/EventBasedObservableProcess.cs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
using System.Reactive;
44
using System.Reactive.Disposables;
55
using System.Reactive.Linq;
6-
using System.Threading.Tasks;
76
using ProcNet.Extensions;
87
using ProcNet.Std;
98

@@ -20,17 +19,8 @@ public EventBasedObservableProcess(string binary, params string[] arguments) : b
2019

2120
public EventBasedObservableProcess(StartArguments startArguments) : base(startArguments) { }
2221

23-
protected override IObservable<LineOut> CreateConsoleOutObservable()
24-
{
25-
if (NoWrapInThread)
26-
return Observable.Create<LineOut>(observer => KickOff(observer));
27-
28-
return Observable.Create<LineOut>(async observer =>
29-
{
30-
var disposable = await Task.Run(() => KickOff(observer));
31-
return disposable;
32-
});
33-
}
22+
protected override IObservable<LineOut> CreateConsoleOutObservable() =>
23+
Observable.Create<LineOut>(observer => KickOff(observer));
3424

3525
private CompositeDisposable KickOff(IObserver<LineOut> observer)
3626
{
@@ -54,6 +44,20 @@ private CompositeDisposable KickOff(IObserver<LineOut> observer)
5444
}
5545

5646
private IDisposable CreateProcessExitSubscription(IObservable<EventPattern<object>> processExited, IObserver<LineOut> observer) =>
57-
processExited.Subscribe(args => { OnExit(observer); }, e => OnError(observer, e), ()=> OnCompleted(observer));
47+
processExited.Subscribe(args =>
48+
{
49+
// Second WaitForExit() call (parameterless) ensures all async events are flushed
50+
// before we proceed with the exit handling. This is the documented .NET pattern
51+
// for Process event handling - must be called BEFORE the process is disposed.
52+
try
53+
{
54+
Process?.WaitForExit();
55+
}
56+
catch (InvalidOperationException)
57+
{
58+
// Process already disposed
59+
}
60+
OnExit(observer);
61+
}, e => OnError(observer, e), ()=> OnCompleted(observer));
5862
}
5963
}

src/Proc/ObservableProcessBase.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ protected ObservableProcessBase(StartArguments startArguments)
4444
protected bool Started { get; set; }
4545
protected string ProcessName { get; private set; }
4646

47+
[Obsolete("Task.Run wrapping has been removed. This property has no effect.")]
4748
protected bool NoWrapInThread => StartArguments.NoWrapInThread;
4849
private int? _processId;
4950
public virtual int? ProcessId => _processId;

src/Proc/StartArguments.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public StartArguments(string binary, params string[] args) : base(binary, args)
2323
/// stop at the same time they are all queueing for <see cref="System.Diagnostics.Process.WaitForCompletion()"> which may lead to
2424
/// unexpected behaviour
2525
/// </summary>
26+
[Obsolete("Task.Run wrapping has been removed. This property has no effect.")]
2627
public bool NoWrapInThread { get; set; }
2728

2829
/// <summary>

tests/Proc.Tests/DisposeTestCases.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public void DelayedWriterRunsToCompletion()
1414
Exception ex = null;
1515
var process = new ObservableProcess(TestCaseArguments(nameof(DelayedWriter)));
1616
var subscription = process.SubscribeLines(c=>seen.Add(c.Line), e => ex = e);
17-
process.WaitForCompletion(WaitTimeout);
17+
process.WaitForCompletion(TimeSpan.FromSeconds(10));
1818

1919
process.ExitCode.Should().Be(20);
2020
ex.Should().BeNull();
Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Threading.Tasks;
6+
using FluentAssertions;
7+
using ProcNet.Std;
8+
using Xunit;
9+
10+
namespace ProcNet.Tests
11+
{
12+
/// <summary>
13+
/// Tests that verify processes can be started and run in parallel without race conditions.
14+
/// These tests ensure the removal of Task.Run wrapping doesn't break parallel execution.
15+
/// </summary>
16+
public class ParallelExecutionTests : TestsBase
17+
{
18+
private const int ParallelCount = 3;
19+
private const string SingleLineTestCase = "SingleLine";
20+
private const string SlowOutputTestCase = "SlowOutput";
21+
22+
// Longer timeout for parallel tests - processes compete for resources
23+
private static readonly TimeSpan ParallelTimeout = TimeSpan.FromSeconds(30);
24+
25+
[Fact]
26+
public async Task ObservableProcess_CanRunInParallel()
27+
{
28+
var results = new ConcurrentBag<(int index, List<string> lines, int? exitCode)>();
29+
30+
var tasks = Enumerable.Range(0, ParallelCount).Select(async i =>
31+
{
32+
await Task.Delay(i * 50); // Stagger starts to avoid thundering herd
33+
var seen = new List<string>();
34+
var process = new ObservableProcess(TestCaseArguments(SingleLineTestCase));
35+
process.SubscribeLines(c => seen.Add(c.Line));
36+
process.WaitForCompletion(ParallelTimeout);
37+
results.Add((i, seen, process.ExitCode));
38+
});
39+
40+
await Task.WhenAll(tasks);
41+
42+
results.Should().HaveCount(ParallelCount);
43+
foreach (var (index, lines, exitCode) in results)
44+
{
45+
exitCode.Should().Be(0, $"process {index} should exit with code 0");
46+
lines.Should().ContainSingle().Which.Should().Be(SingleLineTestCase);
47+
}
48+
}
49+
50+
[Fact]
51+
public async Task EventBasedObservableProcess_CanRunInParallel()
52+
{
53+
var results = new ConcurrentBag<(int index, List<string> lines, int? exitCode)>();
54+
55+
var tasks = Enumerable.Range(0, ParallelCount).Select(async i =>
56+
{
57+
await Task.Delay(i * 50); // Stagger starts to avoid thundering herd
58+
var seen = new List<string>();
59+
var process = new EventBasedObservableProcess(TestCaseArguments(SingleLineTestCase));
60+
process.Subscribe(c => seen.Add(c.Line));
61+
process.WaitForCompletion(ParallelTimeout);
62+
results.Add((i, seen, process.ExitCode));
63+
});
64+
65+
await Task.WhenAll(tasks);
66+
67+
results.Should().HaveCount(ParallelCount);
68+
foreach (var (index, lines, exitCode) in results)
69+
{
70+
exitCode.Should().Be(0, $"process {index} should exit with code 0");
71+
lines.Should().ContainSingle().Which.Should().Be(SingleLineTestCase);
72+
}
73+
}
74+
75+
[Fact]
76+
public async Task ProcStart_CanRunInParallel()
77+
{
78+
var results = new ConcurrentBag<ProcessCaptureResult>();
79+
80+
var tasks = Enumerable.Range(0, ParallelCount).Select(async i =>
81+
{
82+
await Task.Delay(i * 50); // Stagger starts to avoid thundering herd
83+
var args = TestCaseArguments(SingleLineTestCase);
84+
args.Timeout = ParallelTimeout;
85+
args.ConsoleOutWriter = new NoopConsoleOutWriter();
86+
var result = Proc.Start(args);
87+
results.Add(result);
88+
});
89+
90+
await Task.WhenAll(tasks);
91+
92+
results.Should().HaveCount(ParallelCount);
93+
foreach (var result in results)
94+
{
95+
result.Completed.Should().BeTrue();
96+
result.ExitCode.Should().Be(0);
97+
result.ConsoleOut.Should().ContainSingle().Which.Line.Should().Be(SingleLineTestCase);
98+
}
99+
}
100+
101+
[Fact]
102+
public async Task ProcStartRedirected_CanRunInParallel()
103+
{
104+
var results = new ConcurrentBag<(ProcessResult result, List<string> lines)>();
105+
106+
var tasks = Enumerable.Range(0, ParallelCount).Select(async i =>
107+
{
108+
await Task.Delay(i * 50); // Stagger starts to avoid thundering herd
109+
var lines = new List<string>();
110+
var handler = new CollectingLineHandler(lines);
111+
var args = TestCaseArguments(SingleLineTestCase);
112+
args.Timeout = ParallelTimeout;
113+
var result = Proc.StartRedirected(args, handler);
114+
results.Add((result, lines));
115+
});
116+
117+
await Task.WhenAll(tasks);
118+
119+
results.Should().HaveCount(ParallelCount);
120+
foreach (var (result, lines) in results)
121+
{
122+
result.Completed.Should().BeTrue();
123+
result.ExitCode.Should().Be(0);
124+
lines.Should().ContainSingle().Which.Should().Be(SingleLineTestCase);
125+
}
126+
}
127+
128+
[Fact]
129+
public async Task MixedProcessTypes_CanRunInParallel()
130+
{
131+
var observableResults = new ConcurrentBag<int?>();
132+
var eventBasedResults = new ConcurrentBag<int?>();
133+
var procStartResults = new ConcurrentBag<int?>();
134+
135+
var tasks = new List<Task>();
136+
137+
// Start ObservableProcess instances
138+
tasks.AddRange(Enumerable.Range(0, ParallelCount).Select(async i =>
139+
{
140+
await Task.Delay(i * 50); // Stagger starts to avoid thundering herd
141+
var process = new ObservableProcess(TestCaseArguments(SingleLineTestCase));
142+
process.SubscribeLines(_ => { });
143+
process.WaitForCompletion(ParallelTimeout);
144+
observableResults.Add(process.ExitCode);
145+
}));
146+
147+
// Start EventBasedObservableProcess instances
148+
tasks.AddRange(Enumerable.Range(0, ParallelCount).Select(async i =>
149+
{
150+
await Task.Delay(i * 50); // Stagger starts to avoid thundering herd
151+
var process = new EventBasedObservableProcess(TestCaseArguments(SingleLineTestCase));
152+
process.Subscribe(_ => { });
153+
process.WaitForCompletion(ParallelTimeout);
154+
eventBasedResults.Add(process.ExitCode);
155+
}));
156+
157+
// Start Proc.Start instances
158+
tasks.AddRange(Enumerable.Range(0, ParallelCount).Select(async i =>
159+
{
160+
await Task.Delay(i * 50); // Stagger starts to avoid thundering herd
161+
var args = TestCaseArguments(SingleLineTestCase);
162+
args.Timeout = ParallelTimeout;
163+
args.ConsoleOutWriter = new NoopConsoleOutWriter();
164+
var result = Proc.Start(args);
165+
procStartResults.Add(result.ExitCode);
166+
}));
167+
168+
await Task.WhenAll(tasks);
169+
170+
observableResults.Should().HaveCount(ParallelCount).And.OnlyContain(x => x == 0);
171+
eventBasedResults.Should().HaveCount(ParallelCount).And.OnlyContain(x => x == 0);
172+
procStartResults.Should().HaveCount(ParallelCount).And.OnlyContain(x => x == 0);
173+
}
174+
175+
[Fact]
176+
public async Task ObservableProcess_WithOutput_CanRunInParallel()
177+
{
178+
// SlowOutput takes ~5s (10 lines * 500ms delay). With 5 parallel processes, allow extra time.
179+
var slowOutputTimeout = TimeSpan.FromSeconds(30);
180+
var results = new ConcurrentBag<(int index, List<string> lines)>();
181+
182+
var tasks = Enumerable.Range(0, ParallelCount).Select(async i =>
183+
{
184+
await Task.Delay(i * 50); // Stagger starts to avoid thundering herd
185+
var seen = new List<string>();
186+
var args = TestCaseArguments(SlowOutputTestCase);
187+
args.Timeout = slowOutputTimeout;
188+
var process = new ObservableProcess(args);
189+
process.SubscribeLines(c => seen.Add(c.Line));
190+
process.WaitForCompletion(slowOutputTimeout);
191+
results.Add((i, seen));
192+
});
193+
194+
await Task.WhenAll(tasks);
195+
196+
results.Should().HaveCount(ParallelCount);
197+
foreach (var (index, lines) in results)
198+
{
199+
// SlowOutput produces 10 lines: "x:1" through "x:10"
200+
lines.Should().HaveCount(10, $"process {index} should produce 10 lines");
201+
lines.Should().Contain("x:1");
202+
lines.Should().Contain("x:10");
203+
}
204+
}
205+
206+
[Fact]
207+
public async Task EventBasedObservableProcess_WithOutput_CanRunInParallel()
208+
{
209+
// SlowOutput takes ~5s (10 lines * 500ms delay). With 5 parallel processes, allow extra time.
210+
var slowOutputTimeout = TimeSpan.FromSeconds(30);
211+
var results = new ConcurrentBag<(int index, List<string> lines)>();
212+
213+
var tasks = Enumerable.Range(0, ParallelCount).Select(async i =>
214+
{
215+
await Task.Delay(i * 50); // Stagger starts to avoid thundering herd
216+
var seen = new List<string>();
217+
var args = TestCaseArguments(SlowOutputTestCase);
218+
args.Timeout = slowOutputTimeout;
219+
var process = new EventBasedObservableProcess(args);
220+
process.Subscribe(c => seen.Add(c.Line));
221+
process.WaitForCompletion(slowOutputTimeout);
222+
results.Add((i, seen));
223+
});
224+
225+
await Task.WhenAll(tasks);
226+
227+
results.Should().HaveCount(ParallelCount);
228+
foreach (var (index, lines) in results)
229+
{
230+
lines.Should().HaveCount(10, $"process {index} should produce 10 lines");
231+
lines.Should().Contain("x:1");
232+
lines.Should().Contain("x:10");
233+
}
234+
}
235+
236+
private class NoopConsoleOutWriter : IConsoleOutWriter
237+
{
238+
public void Write(Exception e) { }
239+
public void Write(ConsoleOut consoleOut) { }
240+
}
241+
242+
private class CollectingLineHandler : IConsoleLineHandler
243+
{
244+
private readonly List<string> _lines;
245+
246+
public CollectingLineHandler(List<string> lines) => _lines = lines;
247+
248+
public void Handle(LineOut lineOut) => _lines.Add(lineOut.Line);
249+
public void Handle(Exception e) => throw e;
250+
}
251+
}
252+
}

0 commit comments

Comments
 (0)