Skip to content

Migrate from lock-based synchronization to Observable/Rx patterns#655

Open
juliomenendez wants to merge 13 commits into
mainfrom
feature/observable-migration
Open

Migrate from lock-based synchronization to Observable/Rx patterns#655
juliomenendez wants to merge 13 commits into
mainfrom
feature/observable-migration

Conversation

@juliomenendez
Copy link
Copy Markdown

Summary

This PR migrates the Agents-for-net codebase from lock-based synchronization to Observable/Rx patterns (System.Reactive 6.0.0). The migration is designed as a series of atomic commits that maintain project stability.

Changes by Component

  • System.Reactive Infrastructure: Add System.Reactive 6.0.0 package and ObservableBridge utility for IAsyncEnumerable/IObservable interop
  • RouteList.cs: Replace obsolete ReaderWriterLock with BehaviorSubject<ImmutableList<T>> for lock-free route management
  • MemoryStorage.cs: Replace lock(_syncroot) with ConcurrentDictionary and fix _eTag++ race condition using Interlocked.Increment
  • MemoryTranscriptStore.cs: Replace nested Dictionary with ConcurrentDictionary<string, ConcurrentDictionary<string, ImmutableList<IActivity>>>
  • BackgroundTaskQueue.cs: Convert to Observable pattern using Subject<Unit> for signaling while maintaining queue semantics
  • ActivityTaskQueue.cs: Fix thread-safety bug (bool _stopped race condition) and convert to full Observable pattern with drain completion tracking
  • HostedTaskService + HostedActivityService: Replace ReaderWriterLockSlim with BehaviorSubject<bool> for graceful shutdown
  • StreamingResponse.cs: Complete rewrite replacing 7 lock(this) statements with BehaviorSubject<StreamState>, Timer with Observable.Interval, and proper Rx pipeline

Issues Fixed

  • ActivityTaskQueue.cs:24 - _stopped boolean race condition (now uses Interlocked)
  • RouteList.cs:12 - Obsolete ReaderWriterLock (now uses BehaviorSubject)
  • StreamingResponse.cs - Timer callbacks within lock blocks (potential deadlock)
  • MemoryStorage.cs:152 - ETag increment not thread-safe (now uses Interlocked)

Key Benefits

  • Eliminates lock contention with lock-free data structures
  • Clear state management via Rx BehaviorSubject
  • Built-in backpressure handling via Rx operators
  • Proper cancellation support via TakeUntil
  • No more lock(this) anti-pattern

Test plan

  • All existing unit tests pass (477+ tests)
  • Added concurrency tests for ActivityTaskQueue.Stop()
  • Verified builds for both net8.0 and netstandard2.0

🤖 Generated with Claude Code

juliomenendez and others added 7 commits January 21, 2026 11:25
- Add System.Reactive 6.0.0 to Directory.Packages.props
- Add System.Reactive package reference to Core, Builder, Hosting,
  Storage, and Transcript projects
- Add System.Reactive to test projects
- Create ObservableBridge.cs with IAsyncEnumerable/IObservable interop
  utilities for migration purposes

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Replace ReaderWriterLock with BehaviorSubject<ImmutableList<RouteEntry>>
- Use atomic updates via immutable collections (no locks needed)
- Add System.Collections.Immutable package dependency
- Implement IDisposable pattern for proper resource cleanup

This eliminates the obsolete ReaderWriterLock synchronization primitive
in favor of a reactive approach using immutable collections.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Replace Dictionary<string, JsonObject> with ConcurrentDictionary
- Replace _eTag++ with Interlocked.Increment(ref _eTag)
- Remove lock statements using AddOrUpdate for atomic operations
- Fix thread-safety for ETag increment operation

This eliminates lock contention in MemoryStorage by using lock-free
concurrent data structures.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Replace nested Dictionary with ConcurrentDictionary<string,
  ConcurrentDictionary<string, ImmutableList<IActivity>>>
- Use GetOrAdd and AddOrUpdate for atomic operations
- Remove all lock statements
- Add System.Collections.Immutable for ImmutableList

This eliminates lock contention in MemoryTranscriptStore by using
lock-free concurrent data structures with immutable activity lists.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
BackgroundTaskQueue:
- Replace SemaphoreSlim with Subject<Unit> for signaling
- Maintain ConcurrentQueue for proper queue semantics
- Add IDisposable implementation

ActivityTaskQueue:
- Fix thread-safety bug: change bool _stopped to int with Interlocked
- Convert to full Observable pattern with Subject<Unit> signaling
- Add TaskCompletionSource for drain completion tracking
- Implement proper pending count tracking with Interlocked
- Add IDisposable implementation

Tests:
- Add Stop_ShouldPreventFurtherQueueing test
- Add Stop_ConcurrentAccess_ShouldBeThreadSafe test
- Add Stop_WithWaitForEmpty_ShouldWaitForQueueToDrain test

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
HostedTaskService:
- Replace ReaderWriterLockSlim with BehaviorSubject<bool> _isAcceptingWork
- Simplify shutdown logic using Rx state management
- Add proper disposal of BehaviorSubject

HostedActivityService:
- Replace ReaderWriterLockSlim with BehaviorSubject<bool> _isAcceptingWork
- Simplify shutdown logic using Rx state management
- Add proper disposal of BehaviorSubject

Both services now use reactive state management for graceful shutdown
instead of lock-based synchronization.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Introduce StreamState enum: Initial, Streaming, Ended, Canceled,
  UserCanceled, Error
- Replace 7 lock(this) statements with BehaviorSubject<StreamState>
- Replace Timer with Observable.Interval and TakeUntil
- Add proper Rx pipeline with SelectMany, TakeUntil for cancellation
- Implement IDisposable with CompositeDisposable
- Use Interlocked for thread-safe sequence numbering
- Add proper message locking for thread-safe message buffering

Key benefits:
- No more lock(this) anti-pattern
- Clear state enum instead of multiple boolean flags
- Built-in backpressure handling via Rx operators
- Proper cancellation via TakeUntil
- Eliminates potential deadlock from timer callbacks within locks

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings January 21, 2026 18:32
@juliomenendez juliomenendez requested a review from a team as a code owner January 21, 2026 18:32
@github-actions github-actions Bot added ML: Core Tags changes to core libraries ML: Tests Tags changes to tests ML: Packages labels Jan 21, 2026
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR migrates the Agents-for-net codebase from traditional lock-based synchronization to Observable/Rx patterns using System.Reactive 6.0.0. The migration aims to eliminate lock contention and improve concurrency handling across multiple components including storage layers, background task queues, and streaming response handling.

Changes:

  • Introduced System.Reactive 6.0.0 and ObservableBridge utility for Observable/IAsyncEnumerable interop
  • Replaced lock-based synchronization with ConcurrentDictionary, BehaviorSubject, and Interlocked operations
  • Migrated BackgroundTaskQueue and ActivityTaskQueue to use Subject-based signaling
  • Completely rewrote StreamingResponse with Rx pipeline replacing Timer and lock(this) patterns
  • Added comprehensive concurrency tests for ActivityTaskQueue

Reviewed changes

Copilot reviewed 19 out of 19 changed files in this pull request and generated 21 comments.

Show a summary per file
File Description
Directory.Packages.props Added System.Reactive 6.0.0 and System.Collections.Immutable 10.0.2 package versions
ObservableBridge.cs New utility for IAsyncEnumerable/IObservable conversion (has circular call bug)
MemoryStorage.cs Replaced lock with ConcurrentDictionary and Interlocked.Increment for thread-safe ETag generation
MemoryTranscriptStore.cs Migrated to ConcurrentDictionary with ImmutableList for lock-free transcript storage
BackgroundTaskQueue.cs Converted to Subject-based signaling (has race condition with multiple consumers)
ActivityTaskQueue.cs Fixed boolean race condition with Interlocked, converted to Observable pattern (has multiple issues)
HostedTaskService.cs Replaced ReaderWriterLockSlim with BehaviorSubject for shutdown coordination
HostedActivityService.cs Replaced ReaderWriterLockSlim with BehaviorSubject for shutdown coordination
RouteList.cs Replaced obsolete ReaderWriterLock with BehaviorSubject and ImmutableList (has compare-and-swap bug)
StreamingResponse.cs Complete rewrite with BehaviorSubject for state and Observable.Interval (has multiple race conditions)
ActivityTaskQueueTests.cs Added concurrency tests for Stop() method and thread safety
Multiple test projects Added System.Reactive package reference
Comments suppressed due to low confidence (1)

src/libraries/Storage/Microsoft.Agents.Storage.Transcript/MemoryTranscriptStore.cs:145

  • These 'if' statements can be combined.
            if (_channels.TryGetValue(channelId, out var channel))
            {
                if (channel.TryGetValue(conversationId, out var transcript))
                {
                    if (continuationToken != null)
                    {
                        pagedResult.Items = transcript
                            .OrderBy(a => a.Timestamp)
                            .Where(a => a.Timestamp >= startDate)
                            .SkipWhile(a => a.Id != continuationToken)
                            .Skip(1)
                            .Take(20)
                            .ToArray();

                        if (pagedResult.Items.Count == 20)
                        {
                            pagedResult.ContinuationToken = pagedResult.Items.Last().Id;
                        }
                    }
                    else
                    {
                        pagedResult.Items = transcript
                            .OrderBy(a => a.Timestamp)
                            .Where(a => a.Timestamp >= startDate)
                            .Take(20)
                            .ToArray();

                        if (pagedResult.Items.Count == 20)
                        {
                            pagedResult.ContinuationToken = pagedResult.Items.Last().Id;
                        }
                    }
                }
            }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/libraries/Builder/Microsoft.Agents.Builder/StreamingResponse.cs Outdated
Comment thread src/libraries/Builder/Microsoft.Agents.Builder/App/RouteList.cs
Comment thread src/libraries/Builder/Microsoft.Agents.Builder/StreamingResponse.cs
Comment thread src/libraries/Builder/Microsoft.Agents.Builder/StreamingResponse.cs
Comment thread src/libraries/Builder/Microsoft.Agents.Builder/StreamingResponse.cs
Comment thread src/libraries/Storage/Microsoft.Agents.Storage/MemoryStorage.cs Outdated
Comment thread src/libraries/Storage/Microsoft.Agents.Storage/MemoryStorage.cs
Replace the lock-based text chunk buffering in StreamingResponse with a
new DynamicIntervalTextChunkBatcher class that uses Rx operators (Scan,
CombineLatest, DistinctUntilChanged) for lock-free text accumulation
and interval-based emission.

- Use immutable string concatenation instead of StringBuilder for thread safety
- Mark state flags as volatile for proper memory visibility
- Increase test delays for CI reliability

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@juliomenendez juliomenendez force-pushed the feature/observable-migration branch from 69bd0cf to 8d017ef Compare January 21, 2026 19:27
juliomenendez and others added 5 commits January 21, 2026 12:49
… semantics

- Use passed dictionary reference directly instead of copying to ConcurrentDictionary
- Fix ETag to use post-increment semantics (start at 0, not 1)
- Maintain ConcurrentDictionary for thread-safe operations when no dictionary is passed

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Version 6.0.0 is not available on Azure DevOps NuGet feed, causing
NU1603 warnings that fail the build with warnings-as-errors enabled.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- MemoryStorage: Remove unused assignment from RemoveTypeInfoProperties()
- StreamingResponse: Use Select+Concat instead of SelectMany for sequential
  activity sending (prevents concurrent SendActivityAsync calls)
- StreamingResponse: Add comment explaining why initialInterval is not used
  with the batcher approach (emits immediately via CombineLatest+StartWith)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add retry loop to handle case where signal is received but another
consumer takes the item. Use Take(1) instead of FirstAsync for better
handling of subject completion. Add proper exception handling for
cancellation and subject completion scenarios.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@juliomenendez
Copy link
Copy Markdown
Author

@tracyboehrer this is the replacement for #542

@tracyboehrer
Copy link
Copy Markdown
Member

Can we limit this to StreamingResponse? For scope reduction, and we will likely be changing the background queue up separately.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ML: Core Tags changes to core libraries ML: Packages ML: Tests Tags changes to tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants