diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx
index f5bcc57b733..1f2f92132ca 100644
--- a/dotnet/agent-framework-dotnet.slnx
+++ b/dotnet/agent-framework-dotnet.slnx
@@ -196,6 +196,7 @@
+
diff --git a/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/AgentWithMemory_Step06_ResumableStreamingWithValkey.csproj b/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/AgentWithMemory_Step06_ResumableStreamingWithValkey.csproj
new file mode 100644
index 00000000000..efb1985d547
--- /dev/null
+++ b/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/AgentWithMemory_Step06_ResumableStreamingWithValkey.csproj
@@ -0,0 +1,15 @@
+
+
+
+ Exe
+ net10.0
+
+ enable
+ enable
+
+
+
+
+
+
+
diff --git a/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/Program.cs b/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/Program.cs
new file mode 100644
index 00000000000..718f34dd75f
--- /dev/null
+++ b/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/Program.cs
@@ -0,0 +1,116 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+// This sample demonstrates resumable streaming using ValkeyStreamBuffer.
+// No LLM is required — it simulates agent response chunks to show the
+// append, disconnect, and resume workflow backed by Valkey Streams.
+//
+// Prerequisites:
+// - Any Valkey or Redis OSS server (no modules required):
+// docker run -d --name valkey -p 6379:6379 valkey/valkey:latest
+
+using Microsoft.Agents.AI;
+using Microsoft.Agents.AI.Valkey;
+using Microsoft.Extensions.AI;
+using Valkey.Glide;
+
+var valkeyConnection = Environment.GetEnvironmentVariable("VALKEY_CONNECTION") ?? "localhost:6379";
+
+// Simulate an agent response as a sequence of chunks
+string[] simulatedChunks =
+[
+ "Valkey Streams provide a ",
+ "powerful append-only log ",
+ "data structure. ",
+ "They support consumer groups ",
+ "for distributed processing, ",
+ "and each entry gets a unique ",
+ "auto-generated ID that serves ",
+ "as a natural continuation token. ",
+ "This makes them ideal for ",
+ "resumable streaming scenarios ",
+ "in AI agent frameworks."
+];
+
+await using var connection = await ConnectionMultiplexer.ConnectAsync(valkeyConnection);
+
+var streamBuffer = new ValkeyStreamBuffer(
+ connection,
+ new ValkeyStreamBufferOptions
+ {
+ KeyPrefix = "sample_stream",
+ MaxLength = 1000,
+ });
+
+var responseId = $"response-{Guid.NewGuid():N}";
+
+// ============================================================
+// Part 1: Stream chunks, simulating a disconnect after 4
+// ============================================================
+Console.WriteLine("=== Part 1: Streaming with simulated disconnect ===\n");
+
+string? lastEntryId = null;
+int disconnectAfter = 4;
+
+for (int i = 0; i < simulatedChunks.Length; i++)
+{
+ var update = new AgentResponseUpdate(ChatRole.Assistant, simulatedChunks[i]);
+ lastEntryId = await streamBuffer.AppendAsync(responseId, update);
+
+ Console.Write(simulatedChunks[i]);
+
+ if (i + 1 >= disconnectAfter)
+ {
+ Console.ForegroundColor = ConsoleColor.Red;
+ Console.WriteLine($"\n\n ⚡ CLIENT DISCONNECTED after {disconnectAfter} chunks!");
+ Console.ResetColor();
+ break;
+ }
+}
+
+// Meanwhile, the "server" keeps writing the remaining chunks
+for (int i = disconnectAfter; i < simulatedChunks.Length; i++)
+{
+ var update = new AgentResponseUpdate(ChatRole.Assistant, simulatedChunks[i]);
+ await streamBuffer.AppendAsync(responseId, update);
+}
+
+var totalStored = await streamBuffer.GetEntryCountAsync(responseId);
+Console.WriteLine($" 📦 {totalStored} total chunks persisted in Valkey Stream.");
+Console.WriteLine($" 🔖 Last seen entry ID: {lastEntryId}\n");
+
+// ============================================================
+// Part 2: Resume from the last-seen entry ID
+// ============================================================
+Console.WriteLine("=== Part 2: Resuming from last-seen entry ===\n");
+
+Console.ForegroundColor = ConsoleColor.Green;
+Console.WriteLine(" 🔄 Replaying missed chunks from Valkey...\n");
+Console.ResetColor();
+
+int resumedCount = 0;
+await foreach (var (entryId, update) in streamBuffer.ReadAsync(responseId, afterEntryId: lastEntryId!))
+{
+ Console.Write(update.Text);
+ resumedCount++;
+}
+
+Console.WriteLine($"\n\n ✅ Resumed {resumedCount} missed chunks.");
+
+// ============================================================
+// Part 3: Full replay from the beginning
+// ============================================================
+Console.WriteLine("\n=== Part 3: Full replay from beginning ===\n");
+
+int fullCount = 0;
+await foreach (var (entryId, update) in streamBuffer.ReadAsync(responseId))
+{
+ Console.Write(update.Text);
+ fullCount++;
+}
+
+Console.WriteLine($"\n\n 📊 Full replay: {fullCount} total chunks.");
+
+// Cleanup
+await streamBuffer.DeleteStreamAsync(responseId);
+Console.WriteLine(" 🗑️ Stream deleted.\n");
+Console.WriteLine("Done!");
diff --git a/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/README.md b/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/README.md
new file mode 100644
index 00000000000..071cabb49c1
--- /dev/null
+++ b/dotnet/samples/02-agents/AgentWithMemory/AgentWithMemory_Step06_ResumableStreamingWithValkey/README.md
@@ -0,0 +1,64 @@
+# Resumable Streaming with Valkey
+
+This sample demonstrates using `ValkeyStreamBuffer` for resumable agent streaming backed by Valkey Streams.
+
+## What it shows
+
+1. **Stream + persist** — Streams an agent response while writing each chunk to a Valkey Stream via `XADD`
+2. **Simulated disconnect** — Breaks the stream after 4 chunks, simulating a client disconnection
+3. **Resume from last-seen** — Replays only the missed chunks using `ReadAsync(afterEntryId)`, demonstrating zero data loss
+4. **Full replay** — Replays the entire stream from the beginning
+
+## How it works
+
+Each `AgentResponseUpdate` is serialized to JSON and appended to a Valkey Stream keyed by response ID. The `XADD` return value (the stream entry ID) serves as a continuation token. On reconnection, `ReadAsync` uses `XRANGE` starting after the last-seen entry ID to fetch only the missed chunks.
+
+## Prerequisites
+
+- Any Valkey or Redis OSS server (no modules required):
+
+```bash
+docker run -d --name valkey -p 6379:6379 valkey/valkey:latest
+```
+
+No LLM or cloud credentials needed — the sample simulates agent response chunks directly.
+
+## Environment Variables
+
+| Variable | Description | Default |
+|---|---|---|
+| `VALKEY_CONNECTION` | Valkey connection string | `localhost:6379` |
+
+## Running
+
+```bash
+dotnet run
+```
+
+## Expected Output
+
+```
+=== Part 1: Streaming with simulated disconnect ===
+
+Valkey Streams provide a powerful append-only log data structure.
+ ⚡ CLIENT DISCONNECTED after 4 chunks!
+ 📦 11 total chunks persisted in Valkey Stream.
+ 🔖 Last seen entry ID: 1234567890-3
+
+=== Part 2: Resuming from last-seen entry ===
+
+ 🔄 Replaying missed chunks from Valkey...
+
+They support consumer groups for distributed processing, and each entry gets a unique auto-generated ID that serves as a natural continuation token. This makes them ideal for resumable streaming scenarios in AI agent frameworks.
+ ✅ Resumed 7 missed chunks.
+
+=== Part 3: Full replay from beginning ===
+
+Valkey Streams provide a powerful append-only log data structure. They support consumer groups ...
+ 📊 Full replay: 11 total chunks.
+ 🗑️ Stream deleted.
+
+Done!
+```
+
+The key takeaway: the server continued writing all 11 chunks to the stream even after the client disconnected at chunk 4. On resume, only the 7 missed chunks were returned — zero data loss.
diff --git a/dotnet/src/Microsoft.Agents.AI.Valkey/ValkeyStreamBuffer.cs b/dotnet/src/Microsoft.Agents.AI.Valkey/ValkeyStreamBuffer.cs
new file mode 100644
index 00000000000..3dc008e99a4
--- /dev/null
+++ b/dotnet/src/Microsoft.Agents.AI.Valkey/ValkeyStreamBuffer.cs
@@ -0,0 +1,210 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+using System.Text.Json;
+using System.Text.Json.Serialization.Metadata;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Microsoft.Shared.Diagnostics;
+using Valkey.Glide;
+
+namespace Microsoft.Agents.AI.Valkey;
+
+///
+/// Provides a resumable streaming buffer backed by Valkey Streams.
+///
+///
+///
+/// Writes chunks to a Valkey Stream via XADD and supports
+/// replaying missed entries via XRANGE from a given entry ID, enabling clients to reconnect
+/// mid-stream without data loss.
+///
+///
+/// Each stream is keyed by a caller-provided response ID (typically session + response scoped).
+/// The Valkey Stream entry ID serves as the continuation token for resumption.
+///
+///
+/// Server requirements: Any Valkey (or Redis OSS) server. Uses core Stream
+/// commands only (XADD, XRANGE, XLEN, XTRIM, DEL) — no modules required.
+///
+///
+/// Data retention: Streams are bounded by via XTRIM
+/// approximate trimming. Callers can also explicitly delete streams via .
+///
+///
+/// Security considerations:
+///
+/// - PII and sensitive data: Streamed response chunks may contain
+/// PII. Ensure the Valkey server is configured with appropriate access controls and TLS.
+///
+///
+///
+public sealed class ValkeyStreamBuffer
+{
+ private const string ContentField = "content";
+
+ private readonly IConnectionMultiplexer _connection;
+ private readonly string _keyPrefix;
+ private readonly int? _maxLength;
+ private readonly JsonSerializerOptions _jsonSerializerOptions;
+ private readonly ILogger? _logger;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// An existing instance.
+ /// Optional configuration options.
+ /// Optional logger factory.
+ public ValkeyStreamBuffer(
+ IConnectionMultiplexer connection,
+ ValkeyStreamBufferOptions? options = null,
+ ILoggerFactory? loggerFactory = null)
+ {
+ this._connection = Throw.IfNull(connection);
+ this._keyPrefix = options?.KeyPrefix ?? "agent_stream";
+ this._maxLength = options?.MaxLength;
+ this._jsonSerializerOptions = options?.JsonSerializerOptions ?? AgentAbstractionsJsonUtilities.DefaultOptions;
+ this._logger = loggerFactory?.CreateLogger();
+ }
+
+ ///
+ /// Appends an to the stream for the given response ID.
+ ///
+ /// The unique identifier for this streaming response.
+ /// The response update to append.
+ /// Cancellation token.
+ /// The Valkey Stream entry ID, usable as a continuation token for resumption.
+ /// or is null.
+ public async Task AppendAsync(string responseId, AgentResponseUpdate update, CancellationToken cancellationToken = default)
+ {
+ Throw.IfNullOrWhitespace(responseId);
+ Throw.IfNull(update);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var db = this._connection.GetDatabase();
+ var key = this.BuildKey(responseId);
+ var serialized = JsonSerializer.Serialize(update, (JsonTypeInfo)this._jsonSerializerOptions.GetTypeInfo(typeof(AgentResponseUpdate)));
+
+ var entries = new NameValueEntry[] { new(ContentField, serialized) };
+ var entryId = await db.StreamAddAsync(
+ key,
+ entries,
+ maxLength: this._maxLength,
+ useApproximateMaxLength: this._maxLength.HasValue).ConfigureAwait(false);
+
+ this._logger?.LogDebug(
+ "ValkeyStreamBuffer: Appended entry {EntryId} to stream for response.",
+ entryId);
+
+ return entryId.ToString();
+ }
+
+ ///
+ /// Reads all entries from the stream for the given response ID, starting after the specified entry ID.
+ ///
+ /// The unique identifier for the streaming response.
+ ///
+ /// The entry ID to start reading after (exclusive). Use "0-0" to read from the beginning,
+ /// or a previously received entry ID to resume from that point.
+ ///
+ /// Cancellation token.
+ /// An async enumerable of tuples containing the entry ID and the deserialized .
+ public async IAsyncEnumerable<(string EntryId, AgentResponseUpdate Update)> ReadAsync(
+ string responseId,
+ string afterEntryId = "0-0",
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ Throw.IfNullOrWhitespace(responseId);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var db = this._connection.GetDatabase();
+ var key = this.BuildKey(responseId);
+
+ var entries = await db.StreamRangeAsync(
+ key,
+ minId: afterEntryId == "0-0" ? "-" : $"({afterEntryId}",
+ maxId: "+").ConfigureAwait(false);
+
+ if (entries is null || entries.Length == 0)
+ {
+ yield break;
+ }
+
+ foreach (var entry in entries)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var contentValue = GetContentValue(entry);
+ if (contentValue is null)
+ {
+ continue;
+ }
+
+ AgentResponseUpdate? update;
+ try
+ {
+ update = JsonSerializer.Deserialize(contentValue, (JsonTypeInfo)this._jsonSerializerOptions.GetTypeInfo(typeof(AgentResponseUpdate)));
+ }
+ catch (JsonException ex)
+ {
+ this._logger?.LogWarning(ex, "ValkeyStreamBuffer: Skipping malformed entry {EntryId}.", entry.Id);
+ continue;
+ }
+
+ if (update is not null)
+ {
+ yield return (entry.Id.ToString(), update);
+ }
+ }
+ }
+
+ ///
+ /// Gets the number of entries in the stream for the given response ID.
+ ///
+ /// The unique identifier for the streaming response.
+ /// Cancellation token.
+ /// The number of entries in the stream, or 0 if the stream does not exist.
+ public async Task GetEntryCountAsync(string responseId, CancellationToken cancellationToken = default)
+ {
+ Throw.IfNullOrWhitespace(responseId);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var db = this._connection.GetDatabase();
+ var key = this.BuildKey(responseId);
+ return await db.StreamLengthAsync(key).ConfigureAwait(false);
+ }
+
+ ///
+ /// Deletes the stream for the given response ID.
+ ///
+ /// The unique identifier for the streaming response.
+ /// Cancellation token.
+ /// True if the stream was deleted, false if it did not exist.
+ public async Task DeleteStreamAsync(string responseId, CancellationToken cancellationToken = default)
+ {
+ Throw.IfNullOrWhitespace(responseId);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var db = this._connection.GetDatabase();
+ var key = this.BuildKey(responseId);
+ return await db.KeyDeleteAsync(key).ConfigureAwait(false);
+ }
+
+ private string BuildKey(string responseId) => $"{this._keyPrefix}:{responseId}";
+
+ private static string? GetContentValue(StreamEntry entry)
+ {
+ foreach (var nv in entry.Values)
+ {
+ if (nv.Name == ContentField && !nv.Value.IsNullOrEmpty)
+ {
+ return nv.Value.ToString();
+ }
+ }
+
+ return null;
+ }
+}
diff --git a/dotnet/src/Microsoft.Agents.AI.Valkey/ValkeyStreamBufferOptions.cs b/dotnet/src/Microsoft.Agents.AI.Valkey/ValkeyStreamBufferOptions.cs
new file mode 100644
index 00000000000..2f5e3032412
--- /dev/null
+++ b/dotnet/src/Microsoft.Agents.AI.Valkey/ValkeyStreamBufferOptions.cs
@@ -0,0 +1,28 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using System.Text.Json;
+
+namespace Microsoft.Agents.AI.Valkey;
+
+///
+/// Options for configuring .
+///
+public sealed class ValkeyStreamBufferOptions
+{
+ ///
+ /// Gets or sets the prefix for Valkey stream keys. Defaults to "agent_stream".
+ ///
+ public string KeyPrefix { get; set; } = "agent_stream";
+
+ ///
+ /// Gets or sets the maximum number of entries to retain per stream.
+ /// When set, XTRIM with approximate trimming is applied after each XADD.
+ /// Null means no trimming.
+ ///
+ public int? MaxLength { get; set; }
+
+ ///
+ /// Gets or sets optional JSON serializer options for serializing AgentResponseUpdate.
+ ///
+ public JsonSerializerOptions? JsonSerializerOptions { get; set; }
+}
diff --git a/dotnet/tests/Microsoft.Agents.AI.Valkey.UnitTests/ValkeyStreamBufferTests.cs b/dotnet/tests/Microsoft.Agents.AI.Valkey.UnitTests/ValkeyStreamBufferTests.cs
new file mode 100644
index 00000000000..019f27554f8
--- /dev/null
+++ b/dotnet/tests/Microsoft.Agents.AI.Valkey.UnitTests/ValkeyStreamBufferTests.cs
@@ -0,0 +1,308 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Moq;
+using Valkey.Glide;
+
+namespace Microsoft.Agents.AI.Valkey.UnitTests;
+
+///
+/// Unit tests for .
+///
+public sealed class ValkeyStreamBufferTests
+{
+ private static Mock CreateMockConnection(Mock? dbMock = null)
+ {
+ var mockConnection = new Mock();
+ dbMock ??= new Mock();
+ mockConnection.Setup(c => c.GetDatabase()).Returns(dbMock.Object);
+ return mockConnection;
+ }
+
+ // --- Constructor tests ---
+
+ [Fact]
+ public void Constructor_WithConnection_SetsProperties()
+ {
+ // Arrange & Act
+ var buffer = new ValkeyStreamBuffer(
+ CreateMockConnection().Object,
+ new ValkeyStreamBufferOptions { KeyPrefix = "test_stream" });
+
+ // Assert
+ Assert.NotNull(buffer);
+ }
+
+ [Fact]
+ public void Constructor_NullConnection_Throws()
+ {
+ Assert.Throws(() =>
+ new ValkeyStreamBuffer(null!));
+ }
+
+ // --- AppendAsync tests ---
+
+ [Fact]
+ public async Task AppendAsync_ReturnsEntryIdAsync()
+ {
+ // Arrange
+ var dbMock = new Mock();
+ dbMock.Setup(d => d.StreamAddAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .Returns(Task.FromResult("1234567890-0"));
+
+ var buffer = new ValkeyStreamBuffer(
+ CreateMockConnection(dbMock).Object,
+ new ValkeyStreamBufferOptions { KeyPrefix = "test" });
+
+ var update = new AgentResponseUpdate(Extensions.AI.ChatRole.Assistant, "hello");
+
+ // Act
+ var entryId = await buffer.AppendAsync("resp-1", update);
+
+ // Assert
+ Assert.Equal("1234567890-0", entryId);
+ }
+
+ [Fact]
+ public async Task AppendAsync_NullResponseId_ThrowsAsync()
+ {
+ var buffer = new ValkeyStreamBuffer(CreateMockConnection().Object);
+ var update = new AgentResponseUpdate(Extensions.AI.ChatRole.Assistant, "hello");
+
+ await Assert.ThrowsAsync(() =>
+ buffer.AppendAsync(null!, update));
+ }
+
+ [Fact]
+ public async Task AppendAsync_NullUpdate_ThrowsAsync()
+ {
+ var buffer = new ValkeyStreamBuffer(CreateMockConnection().Object);
+
+ await Assert.ThrowsAsync(() =>
+ buffer.AppendAsync("resp-1", null!));
+ }
+
+ [Fact]
+ public async Task AppendAsync_CancellationToken_ThrowsAsync()
+ {
+ var buffer = new ValkeyStreamBuffer(CreateMockConnection().Object);
+ var cts = new CancellationTokenSource();
+ cts.Cancel();
+
+ var update = new AgentResponseUpdate(Extensions.AI.ChatRole.Assistant, "hello");
+
+ await Assert.ThrowsAsync(() =>
+ buffer.AppendAsync("resp-1", update, cts.Token));
+ }
+
+ // --- ReadAsync tests ---
+
+ [Fact]
+ public async Task ReadAsync_ReturnsDeserializedEntriesAsync()
+ {
+ // Arrange
+ var dbMock = new Mock();
+ var update1 = new AgentResponseUpdate(Extensions.AI.ChatRole.Assistant, "chunk1");
+ var update2 = new AgentResponseUpdate(Extensions.AI.ChatRole.Assistant, "chunk2");
+
+ var entries = new StreamEntry[]
+ {
+ new("1-0", [new NameValueEntry("content", System.Text.Json.JsonSerializer.Serialize(update1))]),
+ new("2-0", [new NameValueEntry("content", System.Text.Json.JsonSerializer.Serialize(update2))]),
+ };
+
+ dbMock.Setup(d => d.StreamRangeAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .ReturnsAsync(entries);
+
+ var buffer = new ValkeyStreamBuffer(
+ CreateMockConnection(dbMock).Object,
+ new ValkeyStreamBufferOptions { KeyPrefix = "test" });
+
+ // Act
+ var results = await buffer.ReadAsync("resp-1").ToListAsync();
+
+ // Assert
+ Assert.Equal(2, results.Count);
+ Assert.Equal("1-0", results[0].EntryId);
+ Assert.Equal("chunk1", results[0].Update.Text);
+ Assert.Equal("2-0", results[1].EntryId);
+ Assert.Equal("chunk2", results[1].Update.Text);
+ }
+
+ [Fact]
+ public async Task ReadAsync_WithAfterEntryId_UsesExclusivePrefixAsync()
+ {
+ // Arrange
+ var dbMock = new Mock();
+ dbMock.Setup(d => d.StreamRangeAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .ReturnsAsync([]);
+
+ var buffer = new ValkeyStreamBuffer(
+ CreateMockConnection(dbMock).Object);
+
+ // Act — read after entry "5-3" should use "(5-3" as exclusive minId
+ await buffer.ReadAsync("resp-1", afterEntryId: "5-3").ToListAsync();
+
+ // Assert — Valkey's ( prefix means exclusive range start
+ dbMock.Verify(d => d.StreamRangeAsync(
+ It.IsAny(),
+ (ValkeyValue)"(5-3",
+ (ValkeyValue)"+",
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()), Times.Once);
+ }
+
+ [Fact]
+ public async Task ReadAsync_EmptyStream_ReturnsEmptyAsync()
+ {
+ // Arrange
+ var dbMock = new Mock();
+ dbMock.Setup(d => d.StreamRangeAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .ReturnsAsync([]);
+
+ var buffer = new ValkeyStreamBuffer(CreateMockConnection(dbMock).Object);
+
+ // Act
+ var results = await buffer.ReadAsync("resp-1").ToListAsync();
+
+ // Assert
+ Assert.Empty(results);
+ }
+
+ [Fact]
+ public async Task ReadAsync_MalformedEntry_SkipsItAsync()
+ {
+ // Arrange
+ var dbMock = new Mock();
+ var entries = new StreamEntry[]
+ {
+ new("1-0", [new NameValueEntry("content", "not valid json")]),
+ new("2-0", [new NameValueEntry("content", System.Text.Json.JsonSerializer.Serialize(
+ new AgentResponseUpdate(Extensions.AI.ChatRole.Assistant, "valid")))]),
+ };
+
+ dbMock.Setup(d => d.StreamRangeAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny()))
+ .ReturnsAsync(entries);
+
+ var buffer = new ValkeyStreamBuffer(CreateMockConnection(dbMock).Object);
+
+ // Act
+ var results = await buffer.ReadAsync("resp-1").ToListAsync();
+
+ // Assert — only the valid entry
+ Assert.Single(results);
+ Assert.Equal("valid", results[0].Update.Text);
+ }
+
+ // --- AppendAsync with MaxLength tests ---
+
+ [Fact]
+ public async Task AppendAsync_WithMaxLength_PassesThroughToStreamAddAsync()
+ {
+ // Arrange
+ var dbMock = new Mock();
+ dbMock.Setup(d => d.StreamAddAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ 100,
+ true,
+ It.IsAny()))
+ .Returns(Task.FromResult("1-0"));
+
+ var buffer = new ValkeyStreamBuffer(
+ CreateMockConnection(dbMock).Object,
+ new ValkeyStreamBufferOptions { MaxLength = 100 });
+
+ var update = new AgentResponseUpdate(Extensions.AI.ChatRole.Assistant, "hello");
+
+ // Act
+ await buffer.AppendAsync("resp-1", update);
+
+ // Assert — verify maxLength=100 and useApproximateMaxLength=true were passed
+ dbMock.Verify(d => d.StreamAddAsync(
+ It.IsAny(),
+ It.IsAny(),
+ It.IsAny(),
+ 100,
+ true,
+ It.IsAny()), Times.Once);
+ }
+
+ // --- GetEntryCountAsync tests ---
+
+ [Fact]
+ public async Task GetEntryCountAsync_ReturnsStreamLengthAsync()
+ {
+ // Arrange
+ var dbMock = new Mock();
+ dbMock.Setup(d => d.StreamLengthAsync(It.IsAny()))
+ .ReturnsAsync(42);
+
+ var buffer = new ValkeyStreamBuffer(
+ CreateMockConnection(dbMock).Object,
+ new ValkeyStreamBufferOptions { KeyPrefix = "test" });
+
+ // Act
+ var count = await buffer.GetEntryCountAsync("resp-1");
+
+ // Assert
+ Assert.Equal(42, count);
+ }
+
+ // --- DeleteStreamAsync tests ---
+
+ [Fact]
+ public async Task DeleteStreamAsync_DeletesKeyAsync()
+ {
+ // Arrange
+ var dbMock = new Mock();
+ dbMock.Setup(d => d.KeyDeleteAsync(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(true);
+
+ var buffer = new ValkeyStreamBuffer(
+ CreateMockConnection(dbMock).Object,
+ new ValkeyStreamBufferOptions { KeyPrefix = "test" });
+
+ // Act
+ var deleted = await buffer.DeleteStreamAsync("resp-1");
+
+ // Assert
+ Assert.True(deleted);
+ dbMock.Verify(d => d.KeyDeleteAsync((ValkeyKey)"test:resp-1", It.IsAny()), Times.Once);
+ }
+}