From 1f43b0156a553423ef7c8e65427373d14af7389b Mon Sep 17 00:00:00 2001 From: Matthias Howell Date: Thu, 11 Jun 2026 09:52:01 -0400 Subject: [PATCH] Add ValkeyStreamBuffer for resumable streaming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds ValkeyStreamBuffer — a resumable streaming buffer backed by Valkey Streams (XADD/XRANGE). Enables clients to disconnect and reconnect mid-stream without data loss using entry ID-based continuation tokens. - ValkeyStreamBuffer with AppendAsync, ReadAsync, GetEntryCountAsync, DeleteStreamAsync - ValkeyStreamBufferOptions (KeyPrefix, MaxLength, JsonSerializerOptions) - Uses Valkey.Glide with type-safe JSON serialization - Unit tests with mocked client - Sample demonstrating append, disconnect, and resume workflow Signed-off-by: Matthias Howell --- dotnet/agent-framework-dotnet.slnx | 1 + ...Step06_ResumableStreamingWithValkey.csproj | 15 + .../Program.cs | 116 +++++++ .../README.md | 64 ++++ .../ValkeyStreamBuffer.cs | 210 ++++++++++++ .../ValkeyStreamBufferOptions.cs | 28 ++ .../ValkeyStreamBufferTests.cs | 308 ++++++++++++++++++ 7 files changed, 742 insertions(+) create mode 100644 dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/AgentWithMemory_Step06_ResumableStreamingWithValkey.csproj create mode 100644 dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/Program.cs create mode 100644 dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/README.md create mode 100644 dotnet/src/Microsoft.Agents.AI.Valkey/ValkeyStreamBuffer.cs create mode 100644 dotnet/src/Microsoft.Agents.AI.Valkey/ValkeyStreamBufferOptions.cs create mode 100644 dotnet/tests/Microsoft.Agents.AI.Valkey.UnitTests/ValkeyStreamBufferTests.cs diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index f5bcc57b733..1f2f92132ca 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -196,6 +196,7 @@ + diff --git a/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/AgentWithMemory_Step06_ResumableStreamingWithValkey.csproj b/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/AgentWithMemory_Step06_ResumableStreamingWithValkey.csproj new file mode 100644 index 00000000000..efb1985d547 --- /dev/null +++ b/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/AgentWithMemory_Step06_ResumableStreamingWithValkey.csproj @@ -0,0 +1,15 @@ + + + + Exe + net10.0 + + enable + enable + + + + + + + diff --git a/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/Program.cs b/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/Program.cs new file mode 100644 index 00000000000..718f34dd75f --- /dev/null +++ b/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/Program.cs @@ -0,0 +1,116 @@ +// Copyright (c) Microsoft. All rights reserved. + +// This sample demonstrates resumable streaming using ValkeyStreamBuffer. +// No LLM is required — it simulates agent response chunks to show the +// append, disconnect, and resume workflow backed by Valkey Streams. +// +// Prerequisites: +// - Any Valkey or Redis OSS server (no modules required): +// docker run -d --name valkey -p 6379:6379 valkey/valkey:latest + +using Microsoft.Agents.AI; +using Microsoft.Agents.AI.Valkey; +using Microsoft.Extensions.AI; +using Valkey.Glide; + +var valkeyConnection = Environment.GetEnvironmentVariable("VALKEY_CONNECTION") ?? "localhost:6379"; + +// Simulate an agent response as a sequence of chunks +string[] simulatedChunks = +[ + "Valkey Streams provide a ", + "powerful append-only log ", + "data structure. ", + "They support consumer groups ", + "for distributed processing, ", + "and each entry gets a unique ", + "auto-generated ID that serves ", + "as a natural continuation token. ", + "This makes them ideal for ", + "resumable streaming scenarios ", + "in AI agent frameworks." +]; + +await using var connection = await ConnectionMultiplexer.ConnectAsync(valkeyConnection); + +var streamBuffer = new ValkeyStreamBuffer( + connection, + new ValkeyStreamBufferOptions + { + KeyPrefix = "sample_stream", + MaxLength = 1000, + }); + +var responseId = $"response-{Guid.NewGuid():N}"; + +// ============================================================ +// Part 1: Stream chunks, simulating a disconnect after 4 +// ============================================================ +Console.WriteLine("=== Part 1: Streaming with simulated disconnect ===\n"); + +string? lastEntryId = null; +int disconnectAfter = 4; + +for (int i = 0; i < simulatedChunks.Length; i++) +{ + var update = new AgentResponseUpdate(ChatRole.Assistant, simulatedChunks[i]); + lastEntryId = await streamBuffer.AppendAsync(responseId, update); + + Console.Write(simulatedChunks[i]); + + if (i + 1 >= disconnectAfter) + { + Console.ForegroundColor = ConsoleColor.Red; + Console.WriteLine($"\n\n ⚡ CLIENT DISCONNECTED after {disconnectAfter} chunks!"); + Console.ResetColor(); + break; + } +} + +// Meanwhile, the "server" keeps writing the remaining chunks +for (int i = disconnectAfter; i < simulatedChunks.Length; i++) +{ + var update = new AgentResponseUpdate(ChatRole.Assistant, simulatedChunks[i]); + await streamBuffer.AppendAsync(responseId, update); +} + +var totalStored = await streamBuffer.GetEntryCountAsync(responseId); +Console.WriteLine($" 📦 {totalStored} total chunks persisted in Valkey Stream."); +Console.WriteLine($" 🔖 Last seen entry ID: {lastEntryId}\n"); + +// ============================================================ +// Part 2: Resume from the last-seen entry ID +// ============================================================ +Console.WriteLine("=== Part 2: Resuming from last-seen entry ===\n"); + +Console.ForegroundColor = ConsoleColor.Green; +Console.WriteLine(" 🔄 Replaying missed chunks from Valkey...\n"); +Console.ResetColor(); + +int resumedCount = 0; +await foreach (var (entryId, update) in streamBuffer.ReadAsync(responseId, afterEntryId: lastEntryId!)) +{ + Console.Write(update.Text); + resumedCount++; +} + +Console.WriteLine($"\n\n ✅ Resumed {resumedCount} missed chunks."); + +// ============================================================ +// Part 3: Full replay from the beginning +// ============================================================ +Console.WriteLine("\n=== Part 3: Full replay from beginning ===\n"); + +int fullCount = 0; +await foreach (var (entryId, update) in streamBuffer.ReadAsync(responseId)) +{ + Console.Write(update.Text); + fullCount++; +} + +Console.WriteLine($"\n\n 📊 Full replay: {fullCount} total chunks."); + +// Cleanup +await streamBuffer.DeleteStreamAsync(responseId); +Console.WriteLine(" 🗑️ Stream deleted.\n"); +Console.WriteLine("Done!"); diff --git a/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/README.md b/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/README.md new file mode 100644 index 00000000000..071cabb49c1 --- /dev/null +++ b/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/README.md @@ -0,0 +1,64 @@ +# Resumable Streaming with Valkey + +This sample demonstrates using `ValkeyStreamBuffer` for resumable agent streaming backed by Valkey Streams. + +## What it shows + +1. **Stream + persist** — Streams an agent response while writing each chunk to a Valkey Stream via `XADD` +2. **Simulated disconnect** — Breaks the stream after 4 chunks, simulating a client disconnection +3. **Resume from last-seen** — Replays only the missed chunks using `ReadAsync(afterEntryId)`, demonstrating zero data loss +4. **Full replay** — Replays the entire stream from the beginning + +## How it works + +Each `AgentResponseUpdate` is serialized to JSON and appended to a Valkey Stream keyed by response ID. The `XADD` return value (the stream entry ID) serves as a continuation token. On reconnection, `ReadAsync` uses `XRANGE` starting after the last-seen entry ID to fetch only the missed chunks. + +## Prerequisites + +- Any Valkey or Redis OSS server (no modules required): + +```bash +docker run -d --name valkey -p 6379:6379 valkey/valkey:latest +``` + +No LLM or cloud credentials needed — the sample simulates agent response chunks directly. + +## Environment Variables + +| Variable | Description | Default | +|---|---|---| +| `VALKEY_CONNECTION` | Valkey connection string | `localhost:6379` | + +## Running + +```bash +dotnet run +``` + +## Expected Output + +``` +=== Part 1: Streaming with simulated disconnect === + +Valkey Streams provide a powerful append-only log data structure. + ⚡ CLIENT DISCONNECTED after 4 chunks! + 📦 11 total chunks persisted in Valkey Stream. + 🔖 Last seen entry ID: 1234567890-3 + +=== Part 2: Resuming from last-seen entry === + + 🔄 Replaying missed chunks from Valkey... + +They support consumer groups for distributed processing, and each entry gets a unique auto-generated ID that serves as a natural continuation token. This makes them ideal for resumable streaming scenarios in AI agent frameworks. + ✅ Resumed 7 missed chunks. + +=== Part 3: Full replay from beginning === + +Valkey Streams provide a powerful append-only log data structure. They support consumer groups ... + 📊 Full replay: 11 total chunks. + 🗑️ Stream deleted. + +Done! +``` + +The key takeaway: the server continued writing all 11 chunks to the stream even after the client disconnected at chunk 4. On resume, only the 7 missed chunks were returned — zero data loss. diff --git a/dotnet/src/Microsoft.Agents.AI.Valkey/ValkeyStreamBuffer.cs b/dotnet/src/Microsoft.Agents.AI.Valkey/ValkeyStreamBuffer.cs new file mode 100644 index 00000000000..3dc008e99a4 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Valkey/ValkeyStreamBuffer.cs @@ -0,0 +1,210 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Runtime.CompilerServices; +using System.Text.Json; +using System.Text.Json.Serialization.Metadata; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Shared.Diagnostics; +using Valkey.Glide; + +namespace Microsoft.Agents.AI.Valkey; + +/// +/// Provides a resumable streaming buffer backed by Valkey Streams. +/// +/// +/// +/// Writes chunks to a Valkey Stream via XADD and supports +/// replaying missed entries via XRANGE from a given entry ID, enabling clients to reconnect +/// mid-stream without data loss. +/// +/// +/// Each stream is keyed by a caller-provided response ID (typically session + response scoped). +/// The Valkey Stream entry ID serves as the continuation token for resumption. +/// +/// +/// Server requirements: Any Valkey (or Redis OSS) server. Uses core Stream +/// commands only (XADD, XRANGE, XLEN, XTRIM, DEL) — no modules required. +/// +/// +/// Data retention: Streams are bounded by via XTRIM +/// approximate trimming. Callers can also explicitly delete streams via . +/// +/// +/// Security considerations: +/// +/// PII and sensitive data: Streamed response chunks may contain +/// PII. Ensure the Valkey server is configured with appropriate access controls and TLS. +/// +/// +/// +public sealed class ValkeyStreamBuffer +{ + private const string ContentField = "content"; + + private readonly IConnectionMultiplexer _connection; + private readonly string _keyPrefix; + private readonly int? _maxLength; + private readonly JsonSerializerOptions _jsonSerializerOptions; + private readonly ILogger? _logger; + + /// + /// Initializes a new instance of the class. + /// + /// An existing instance. + /// Optional configuration options. + /// Optional logger factory. + public ValkeyStreamBuffer( + IConnectionMultiplexer connection, + ValkeyStreamBufferOptions? options = null, + ILoggerFactory? loggerFactory = null) + { + this._connection = Throw.IfNull(connection); + this._keyPrefix = options?.KeyPrefix ?? "agent_stream"; + this._maxLength = options?.MaxLength; + this._jsonSerializerOptions = options?.JsonSerializerOptions ?? AgentAbstractionsJsonUtilities.DefaultOptions; + this._logger = loggerFactory?.CreateLogger(); + } + + /// + /// Appends an to the stream for the given response ID. + /// + /// The unique identifier for this streaming response. + /// The response update to append. + /// Cancellation token. + /// The Valkey Stream entry ID, usable as a continuation token for resumption. + /// or is null. + public async Task AppendAsync(string responseId, AgentResponseUpdate update, CancellationToken cancellationToken = default) + { + Throw.IfNullOrWhitespace(responseId); + Throw.IfNull(update); + cancellationToken.ThrowIfCancellationRequested(); + + var db = this._connection.GetDatabase(); + var key = this.BuildKey(responseId); + var serialized = JsonSerializer.Serialize(update, (JsonTypeInfo)this._jsonSerializerOptions.GetTypeInfo(typeof(AgentResponseUpdate))); + + var entries = new NameValueEntry[] { new(ContentField, serialized) }; + var entryId = await db.StreamAddAsync( + key, + entries, + maxLength: this._maxLength, + useApproximateMaxLength: this._maxLength.HasValue).ConfigureAwait(false); + + this._logger?.LogDebug( + "ValkeyStreamBuffer: Appended entry {EntryId} to stream for response.", + entryId); + + return entryId.ToString(); + } + + /// + /// Reads all entries from the stream for the given response ID, starting after the specified entry ID. + /// + /// The unique identifier for the streaming response. + /// + /// The entry ID to start reading after (exclusive). Use "0-0" to read from the beginning, + /// or a previously received entry ID to resume from that point. + /// + /// Cancellation token. + /// An async enumerable of tuples containing the entry ID and the deserialized . + public async IAsyncEnumerable<(string EntryId, AgentResponseUpdate Update)> ReadAsync( + string responseId, + string afterEntryId = "0-0", + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + Throw.IfNullOrWhitespace(responseId); + cancellationToken.ThrowIfCancellationRequested(); + + var db = this._connection.GetDatabase(); + var key = this.BuildKey(responseId); + + var entries = await db.StreamRangeAsync( + key, + minId: afterEntryId == "0-0" ? "-" : $"({afterEntryId}", + maxId: "+").ConfigureAwait(false); + + if (entries is null || entries.Length == 0) + { + yield break; + } + + foreach (var entry in entries) + { + cancellationToken.ThrowIfCancellationRequested(); + + var contentValue = GetContentValue(entry); + if (contentValue is null) + { + continue; + } + + AgentResponseUpdate? update; + try + { + update = JsonSerializer.Deserialize(contentValue, (JsonTypeInfo)this._jsonSerializerOptions.GetTypeInfo(typeof(AgentResponseUpdate))); + } + catch (JsonException ex) + { + this._logger?.LogWarning(ex, "ValkeyStreamBuffer: Skipping malformed entry {EntryId}.", entry.Id); + continue; + } + + if (update is not null) + { + yield return (entry.Id.ToString(), update); + } + } + } + + /// + /// Gets the number of entries in the stream for the given response ID. + /// + /// The unique identifier for the streaming response. + /// Cancellation token. + /// The number of entries in the stream, or 0 if the stream does not exist. + public async Task GetEntryCountAsync(string responseId, CancellationToken cancellationToken = default) + { + Throw.IfNullOrWhitespace(responseId); + cancellationToken.ThrowIfCancellationRequested(); + + var db = this._connection.GetDatabase(); + var key = this.BuildKey(responseId); + return await db.StreamLengthAsync(key).ConfigureAwait(false); + } + + /// + /// Deletes the stream for the given response ID. + /// + /// The unique identifier for the streaming response. + /// Cancellation token. + /// True if the stream was deleted, false if it did not exist. + public async Task DeleteStreamAsync(string responseId, CancellationToken cancellationToken = default) + { + Throw.IfNullOrWhitespace(responseId); + cancellationToken.ThrowIfCancellationRequested(); + + var db = this._connection.GetDatabase(); + var key = this.BuildKey(responseId); + return await db.KeyDeleteAsync(key).ConfigureAwait(false); + } + + private string BuildKey(string responseId) => $"{this._keyPrefix}:{responseId}"; + + private static string? GetContentValue(StreamEntry entry) + { + foreach (var nv in entry.Values) + { + if (nv.Name == ContentField && !nv.Value.IsNullOrEmpty) + { + return nv.Value.ToString(); + } + } + + return null; + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Valkey/ValkeyStreamBufferOptions.cs b/dotnet/src/Microsoft.Agents.AI.Valkey/ValkeyStreamBufferOptions.cs new file mode 100644 index 00000000000..2f5e3032412 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Valkey/ValkeyStreamBufferOptions.cs @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Text.Json; + +namespace Microsoft.Agents.AI.Valkey; + +/// +/// Options for configuring . +/// +public sealed class ValkeyStreamBufferOptions +{ + /// + /// Gets or sets the prefix for Valkey stream keys. Defaults to "agent_stream". + /// + public string KeyPrefix { get; set; } = "agent_stream"; + + /// + /// Gets or sets the maximum number of entries to retain per stream. + /// When set, XTRIM with approximate trimming is applied after each XADD. + /// Null means no trimming. + /// + public int? MaxLength { get; set; } + + /// + /// Gets or sets optional JSON serializer options for serializing AgentResponseUpdate. + /// + public JsonSerializerOptions? JsonSerializerOptions { get; set; } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Valkey.UnitTests/ValkeyStreamBufferTests.cs b/dotnet/tests/Microsoft.Agents.AI.Valkey.UnitTests/ValkeyStreamBufferTests.cs new file mode 100644 index 00000000000..019f27554f8 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Valkey.UnitTests/ValkeyStreamBufferTests.cs @@ -0,0 +1,308 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Moq; +using Valkey.Glide; + +namespace Microsoft.Agents.AI.Valkey.UnitTests; + +/// +/// Unit tests for . +/// +public sealed class ValkeyStreamBufferTests +{ + private static Mock CreateMockConnection(Mock? dbMock = null) + { + var mockConnection = new Mock(); + dbMock ??= new Mock(); + mockConnection.Setup(c => c.GetDatabase()).Returns(dbMock.Object); + return mockConnection; + } + + // --- Constructor tests --- + + [Fact] + public void Constructor_WithConnection_SetsProperties() + { + // Arrange & Act + var buffer = new ValkeyStreamBuffer( + CreateMockConnection().Object, + new ValkeyStreamBufferOptions { KeyPrefix = "test_stream" }); + + // Assert + Assert.NotNull(buffer); + } + + [Fact] + public void Constructor_NullConnection_Throws() + { + Assert.Throws(() => + new ValkeyStreamBuffer(null!)); + } + + // --- AppendAsync tests --- + + [Fact] + public async Task AppendAsync_ReturnsEntryIdAsync() + { + // Arrange + var dbMock = new Mock(); + dbMock.Setup(d => d.StreamAddAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(Task.FromResult("1234567890-0")); + + var buffer = new ValkeyStreamBuffer( + CreateMockConnection(dbMock).Object, + new ValkeyStreamBufferOptions { KeyPrefix = "test" }); + + var update = new AgentResponseUpdate(Extensions.AI.ChatRole.Assistant, "hello"); + + // Act + var entryId = await buffer.AppendAsync("resp-1", update); + + // Assert + Assert.Equal("1234567890-0", entryId); + } + + [Fact] + public async Task AppendAsync_NullResponseId_ThrowsAsync() + { + var buffer = new ValkeyStreamBuffer(CreateMockConnection().Object); + var update = new AgentResponseUpdate(Extensions.AI.ChatRole.Assistant, "hello"); + + await Assert.ThrowsAsync(() => + buffer.AppendAsync(null!, update)); + } + + [Fact] + public async Task AppendAsync_NullUpdate_ThrowsAsync() + { + var buffer = new ValkeyStreamBuffer(CreateMockConnection().Object); + + await Assert.ThrowsAsync(() => + buffer.AppendAsync("resp-1", null!)); + } + + [Fact] + public async Task AppendAsync_CancellationToken_ThrowsAsync() + { + var buffer = new ValkeyStreamBuffer(CreateMockConnection().Object); + var cts = new CancellationTokenSource(); + cts.Cancel(); + + var update = new AgentResponseUpdate(Extensions.AI.ChatRole.Assistant, "hello"); + + await Assert.ThrowsAsync(() => + buffer.AppendAsync("resp-1", update, cts.Token)); + } + + // --- ReadAsync tests --- + + [Fact] + public async Task ReadAsync_ReturnsDeserializedEntriesAsync() + { + // Arrange + var dbMock = new Mock(); + var update1 = new AgentResponseUpdate(Extensions.AI.ChatRole.Assistant, "chunk1"); + var update2 = new AgentResponseUpdate(Extensions.AI.ChatRole.Assistant, "chunk2"); + + var entries = new StreamEntry[] + { + new("1-0", [new NameValueEntry("content", System.Text.Json.JsonSerializer.Serialize(update1))]), + new("2-0", [new NameValueEntry("content", System.Text.Json.JsonSerializer.Serialize(update2))]), + }; + + dbMock.Setup(d => d.StreamRangeAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(entries); + + var buffer = new ValkeyStreamBuffer( + CreateMockConnection(dbMock).Object, + new ValkeyStreamBufferOptions { KeyPrefix = "test" }); + + // Act + var results = await buffer.ReadAsync("resp-1").ToListAsync(); + + // Assert + Assert.Equal(2, results.Count); + Assert.Equal("1-0", results[0].EntryId); + Assert.Equal("chunk1", results[0].Update.Text); + Assert.Equal("2-0", results[1].EntryId); + Assert.Equal("chunk2", results[1].Update.Text); + } + + [Fact] + public async Task ReadAsync_WithAfterEntryId_UsesExclusivePrefixAsync() + { + // Arrange + var dbMock = new Mock(); + dbMock.Setup(d => d.StreamRangeAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync([]); + + var buffer = new ValkeyStreamBuffer( + CreateMockConnection(dbMock).Object); + + // Act — read after entry "5-3" should use "(5-3" as exclusive minId + await buffer.ReadAsync("resp-1", afterEntryId: "5-3").ToListAsync(); + + // Assert — Valkey's ( prefix means exclusive range start + dbMock.Verify(d => d.StreamRangeAsync( + It.IsAny(), + (ValkeyValue)"(5-3", + (ValkeyValue)"+", + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Once); + } + + [Fact] + public async Task ReadAsync_EmptyStream_ReturnsEmptyAsync() + { + // Arrange + var dbMock = new Mock(); + dbMock.Setup(d => d.StreamRangeAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync([]); + + var buffer = new ValkeyStreamBuffer(CreateMockConnection(dbMock).Object); + + // Act + var results = await buffer.ReadAsync("resp-1").ToListAsync(); + + // Assert + Assert.Empty(results); + } + + [Fact] + public async Task ReadAsync_MalformedEntry_SkipsItAsync() + { + // Arrange + var dbMock = new Mock(); + var entries = new StreamEntry[] + { + new("1-0", [new NameValueEntry("content", "not valid json")]), + new("2-0", [new NameValueEntry("content", System.Text.Json.JsonSerializer.Serialize( + new AgentResponseUpdate(Extensions.AI.ChatRole.Assistant, "valid")))]), + }; + + dbMock.Setup(d => d.StreamRangeAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(entries); + + var buffer = new ValkeyStreamBuffer(CreateMockConnection(dbMock).Object); + + // Act + var results = await buffer.ReadAsync("resp-1").ToListAsync(); + + // Assert — only the valid entry + Assert.Single(results); + Assert.Equal("valid", results[0].Update.Text); + } + + // --- AppendAsync with MaxLength tests --- + + [Fact] + public async Task AppendAsync_WithMaxLength_PassesThroughToStreamAddAsync() + { + // Arrange + var dbMock = new Mock(); + dbMock.Setup(d => d.StreamAddAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + 100, + true, + It.IsAny())) + .Returns(Task.FromResult("1-0")); + + var buffer = new ValkeyStreamBuffer( + CreateMockConnection(dbMock).Object, + new ValkeyStreamBufferOptions { MaxLength = 100 }); + + var update = new AgentResponseUpdate(Extensions.AI.ChatRole.Assistant, "hello"); + + // Act + await buffer.AppendAsync("resp-1", update); + + // Assert — verify maxLength=100 and useApproximateMaxLength=true were passed + dbMock.Verify(d => d.StreamAddAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + 100, + true, + It.IsAny()), Times.Once); + } + + // --- GetEntryCountAsync tests --- + + [Fact] + public async Task GetEntryCountAsync_ReturnsStreamLengthAsync() + { + // Arrange + var dbMock = new Mock(); + dbMock.Setup(d => d.StreamLengthAsync(It.IsAny())) + .ReturnsAsync(42); + + var buffer = new ValkeyStreamBuffer( + CreateMockConnection(dbMock).Object, + new ValkeyStreamBufferOptions { KeyPrefix = "test" }); + + // Act + var count = await buffer.GetEntryCountAsync("resp-1"); + + // Assert + Assert.Equal(42, count); + } + + // --- DeleteStreamAsync tests --- + + [Fact] + public async Task DeleteStreamAsync_DeletesKeyAsync() + { + // Arrange + var dbMock = new Mock(); + dbMock.Setup(d => d.KeyDeleteAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(true); + + var buffer = new ValkeyStreamBuffer( + CreateMockConnection(dbMock).Object, + new ValkeyStreamBufferOptions { KeyPrefix = "test" }); + + // Act + var deleted = await buffer.DeleteStreamAsync("resp-1"); + + // Assert + Assert.True(deleted); + dbMock.Verify(d => d.KeyDeleteAsync((ValkeyKey)"test:resp-1", It.IsAny()), Times.Once); + } +}