Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public enum CheckpointType

private const int NumEventsToFallBehind = 3 * 32;

// Delay to wait for max age expiration in tests. Set to 2500ms (2.5 seconds) to account for
// timing variations, especially on Windows where expiration checks may not be perfectly synchronized.
// This reduces test flakiness by ensuring events are fully expired before verification.
private const int ExpirationDelayMs = 2500;

public static object[] TestCases = {
// NON-EXISTENT STREAM
CreateTestData(
Expand Down Expand Up @@ -687,8 +692,15 @@ private Task WriteMetadata(string metadata)
private Task MaxCount(long maxCount) => WriteMetadata(@$"{{""$maxCount"":{maxCount}}}");
private async Task ExpiredMaxAge()
{
// Set max age to 1 second - events older than this will be expired/truncated.
// Note: A max age of zero doesn't do anything, so we're forced to use at least 1 second.
await WriteMetadata(@"{""$maxAge"": 1 }"); // seconds
await Task.Delay(TimeSpan.FromMilliseconds(2000));

// Wait for events to expire. Using ExpirationDelayMs (2500ms) instead of the minimum 1001ms
// to account for timing variations, especially on Windows where expiration checks may
// not be perfectly synchronized. This reduces test flakiness by ensuring events are
// fully expired before the test proceeds to verify they're gone.
await Task.Delay(TimeSpan.FromMilliseconds(ExpirationDelayMs));
}

private Task ApplyTruncation()
Expand Down Expand Up @@ -890,6 +902,8 @@ public async Task enumeration_is_correct()
return;
}

// Verify subscription confirmation message. Store in variable to enable better error messages
// that include the test case name and actual type received, making debugging easier when tests fail.
var current = await sub.GetNext();
Assert.True(
current is SubscriptionConfirmation,
Expand All @@ -899,6 +913,8 @@ public async Task enumeration_is_correct()

_nextEventNumber = await ReadExpectedEvents(sub, _nextEventNumber, LastEventNumber);

// Verify caught-up message with descriptive error message for easier debugging.
// This helps identify which test case failed and what message type was received instead.
current = await sub.GetNext();
Assert.True(
current is CaughtUp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ public void throws_for_incorrectly_formatted_filename()
}

[Test]
public void returns_correct_prefix_with_get_prefix_for() {
public void returns_correct_prefix_with_get_prefix_for()
{
var strategy = new VersionedPatternFileNamingStrategy(PathName, "chunk-");
Assert.AreEqual("chunk-", strategy.GetPrefixFor(null, null));
Assert.AreEqual("chunk-000000.", strategy.GetPrefixFor(0, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ public static class LogRecordExtensions
public static void WriteWithLengthPrefixAndSuffixTo(this ILogRecord record, BinaryWriter writer)
{
var localWriter = new BufferWriterSlim<byte>();
try {
try
{
record.WriteTo(ref localWriter);

writer.Write(localWriter.WrittenCount);
writer.Write(localWriter.WrittenSpan);
writer.Write(localWriter.WrittenCount);
} finally {
}
finally
{
localWriter.Dispose();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,39 +38,39 @@ public async Task a_read_on_new_file_can_be_performed()
chunk.MarkForDeletion();
chunk.WaitForDestroy(5000);
}
/*
[Test]
public void a_read_on_scavenged_chunk_includes_map()
{
var chunk = TFChunk.CreateNew(GetFilePathFor("afile"), 200, 0, 0, isScavenged: true, inMem: false, unbuffered: false, writethrough: false);
chunk.CompleteScavenge(new [] {new PosMap(0, 0), new PosMap(1,1) }, false);
using (var reader = chunk.AcquireRawReader())
{
var buffer = new byte[1024];
var result = reader.ReadNextBytes(1024, buffer);
Assert.IsFalse(result.IsEOF);
Assert.AreEqual(ChunkHeader.Size + ChunkHeader.Size + 2 * PosMap.FullSize, result.BytesRead);
}
chunk.MarkForDeletion();
chunk.WaitForDestroy(5000);
}
/*
[Test]
public void a_read_on_scavenged_chunk_includes_map()
{
var chunk = TFChunk.CreateNew(GetFilePathFor("afile"), 200, 0, 0, isScavenged: true, inMem: false, unbuffered: false, writethrough: false);
chunk.CompleteScavenge(new [] {new PosMap(0, 0), new PosMap(1,1) }, false);
using (var reader = chunk.AcquireRawReader())
{
var buffer = new byte[1024];
var result = reader.ReadNextBytes(1024, buffer);
Assert.IsFalse(result.IsEOF);
Assert.AreEqual(ChunkHeader.Size + ChunkHeader.Size + 2 * PosMap.FullSize, result.BytesRead);
}
chunk.MarkForDeletion();
chunk.WaitForDestroy(5000);
}

[Test]
public void a_read_past_end_of_completed_chunk_does_include_header_or_footer()
{
var chunk = TFChunk.CreateNew(GetFilePathFor("File1"), 300, 0, 0, isScavenged: false, inMem: false, unbuffered: false, writethrough: false);
chunk.Complete();
using (var reader = chunk.AcquireRawReader())
{
var buffer = new byte[1024];
var result = reader.ReadNextBytes(1024, buffer);
Assert.IsTrue(result.IsEOF);
Assert.AreEqual(ChunkHeader.Size + ChunkFooter.Size, result.BytesRead); //just header + footer = 256
}
chunk.MarkForDeletion();
chunk.WaitForDestroy(5000);
}
*/
[Test]
public void a_read_past_end_of_completed_chunk_does_include_header_or_footer()
{
var chunk = TFChunk.CreateNew(GetFilePathFor("File1"), 300, 0, 0, isScavenged: false, inMem: false, unbuffered: false, writethrough: false);
chunk.Complete();
using (var reader = chunk.AcquireRawReader())
{
var buffer = new byte[1024];
var result = reader.ReadNextBytes(1024, buffer);
Assert.IsTrue(result.IsEOF);
Assert.AreEqual(ChunkHeader.Size + ChunkFooter.Size, result.BytesRead); //just header + footer = 256
}
chunk.MarkForDeletion();
chunk.WaitForDestroy(5000);
}
*/

[Test]
public async Task if_asked_for_more_than_buffer_size_will_only_read_buffer_size()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public async Task a_record_can_be_written()
await using var filestream = File.Open(filename,
new FileStreamOptions
{
Mode = FileMode.Open, Access = FileAccess.Read, Options = FileOptions.Asynchronous
Mode = FileMode.Open,
Access = FileAccess.Read,
Options = FileOptions.Asynchronous
});
filestream.Seek(ChunkHeader.Size + 137 + sizeof(int), SeekOrigin.Begin);
var recordLength = filestream.Length - filestream.Position;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ public async Task a_record_can_be_written()
await using var filestream = File.Open(filename,
new FileStreamOptions
{
Mode = FileMode.Open, Access = FileAccess.Read, Options = FileOptions.Asynchronous
Mode = FileMode.Open,
Access = FileAccess.Read,
Options = FileOptions.Asynchronous
});
filestream.Seek(ChunkHeader.Size + 137 + sizeof(int), SeekOrigin.Begin);
var recordLength = filestream.Length - filestream.Position;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ public async Task a_record_is_not_written_at_first_but_written_on_second_try()
await using var filestream = File.Open(filename2,
new FileStreamOptions
{
Mode = FileMode.Open, Access = FileAccess.Read, Options = FileOptions.Asynchronous
Mode = FileMode.Open,
Access = FileAccess.Read,
Options = FileOptions.Asynchronous
});
filestream.Seek(ChunkHeader.Size + sizeof(int), SeekOrigin.Begin);
var recordLength = filestream.Length - filestream.Position;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ public async Task iterates_chunks_with_correct_callback_order()
var strategy = new VersionedPatternFileNamingStrategy(PathName, "chunk-");
var chunkEnumerator = new TFChunkEnumerator(strategy);
var result = new List<string>();
ValueTask<int> GetNextFileNumber(string chunk, int chunkNumber, int chunkVersion, CancellationToken token) {
return Path.GetFileName(chunk) switch {
ValueTask<int> GetNextFileNumber(string chunk, int chunkNumber, int chunkVersion, CancellationToken token)
{
return Path.GetFileName(chunk) switch
{
"chunk-000001.000000" => new(2),
"chunk-000002.000001" => new(3),
"chunk-000005.000000" => new(6),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public async ValueTask<int> WriteFooter(ReadOnlyMemory<byte> footer, Cancellatio

private static int GetAlignedSize(int size, int alignmentSize)
{
if (size % alignmentSize == 0) return size;
if (size % alignmentSize == 0)
return size;
return (size / alignmentSize + 1) * alignmentSize;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using EventStore.Plugins.Transforms;

namespace EventStore.Core.Tests.Transforms.ByteDup;

public class ByteDupChunkReadTransform : IChunkReadTransform
{
public ChunkDataReadStream TransformData(ChunkDataReadStream dataStream) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using EventStore.Plugins.Transforms;

namespace EventStore.Core.Tests.Transforms.ByteDup;

public class ByteDupChunkTransform : IChunkTransform
{
public IChunkReadTransform Read { get; } = new ByteDupChunkReadTransform();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public async ValueTask<int> WriteFooter(ReadOnlyMemory<byte> footer, Cancellatio

private static int GetAlignedSize(int size, int alignmentSize)
{
if (size % alignmentSize == 0) return size;
if (size % alignmentSize == 0)
return size;
return (size / alignmentSize + 1) * alignmentSize;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using EventStore.Plugins.Transforms;

namespace EventStore.Core.Tests.Transforms.ByteDup;

public class ByteDupDbTransform : IDbTransform
{
public string Name => "bytedup";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using EventStore.Plugins.Transforms;

namespace EventStore.Core.Tests.Transforms.WithHeader;

public class WithHeaderChunkReadTransform(int transformHeaderSize) : IChunkReadTransform
{
public ChunkDataReadStream TransformData(ChunkDataReadStream dataStream) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public async ValueTask<int> WriteFooter(ReadOnlyMemory<byte> footer, Cancellatio

private static int GetAlignedSize(int size, int alignmentSize)
{
if (size % alignmentSize == 0) return size;
if (size % alignmentSize == 0)
return size;
return (size / alignmentSize + 1) * alignmentSize;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using EventStore.Plugins.Transforms;

namespace EventStore.Core.Tests.Transforms.WithHeader;

public class WithHeaderDbTransform : IDbTransform
{
public string Name => "withheader";
Expand Down