Skip to content

Commit 0c9a15b

Browse files
committed
Added locking when watching subscriptions.
1 parent f35ab87 commit 0c9a15b

6 files changed

Lines changed: 94 additions & 32 deletions

File tree

src/AppCoreNet.EventStore.SqlServer/DependencyInjection/SqlServerEventStoreBuilderExtensions.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,14 @@
1212
using Microsoft.EntityFrameworkCore;
1313
using Microsoft.Extensions.DependencyInjection;
1414
using Microsoft.Extensions.DependencyInjection.Extensions;
15+
using Microsoft.Extensions.Options;
1516

1617
// ReSharper disable once CheckNamespace
1718
namespace AppCoreNet.Extensions.DependencyInjection;
1819

20+
/// <summary>
21+
/// Provides extension methods to register SQL Server event store with the DI container.
22+
/// </summary>
1923
public static class SqlServerEventStoreBuilderExtensions
2024
{
2125
private static void AddSqlServer<TDbContext>(
@@ -47,6 +51,7 @@ private static void AddSqlServer<TDbContext>(
4751

4852
services.TryAddEnumerable(
4953
[
54+
ServiceDescriptor.Transient<IPostConfigureOptions<SqlServerEventStoreOptions>, SqlServerConfigureEventStoreOptions>(),
5055
ServiceDescriptor.Transient<IEventStore, SqlServerEventStore<TDbContext>>(),
5156
ServiceDescriptor.Transient<ISubscriptionStore, SqlServerSubscriptionStore<TDbContext>>(),
5257
]);
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Licensed under the MIT license.
2+
// Copyright (c) The AppCore .NET project.
3+
4+
using Microsoft.Extensions.Hosting;
5+
using Microsoft.Extensions.Options;
6+
7+
namespace AppCoreNet.EventStore.SqlServer;
8+
9+
internal sealed class SqlServerConfigureEventStoreOptions : IPostConfigureOptions<SqlServerEventStoreOptions>
10+
{
11+
private readonly IHostEnvironment _environment;
12+
13+
public SqlServerConfigureEventStoreOptions(IHostEnvironment environment)
14+
{
15+
_environment = environment;
16+
}
17+
18+
public void PostConfigure(string name, SqlServerEventStoreOptions options)
19+
{
20+
if (string.IsNullOrEmpty(options.ApplicationName))
21+
{
22+
options.ApplicationName = _environment.ApplicationName;
23+
}
24+
}
25+
}

src/AppCoreNet.EventStore.SqlServer/SqlServerEventStoreOptions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,13 @@ public class SqlServerEventStoreOptions
1818
/// <summary>
1919
/// Gets or sets the name of the event store database schema.
2020
/// </summary>
21+
/// <remarks>
22+
/// If <c>null</c> the schema defaults to <c>dbo</c>.
23+
/// </remarks>
2124
public string? SchemaName { get; set; }
25+
26+
/// <summary>
27+
/// Gets or sets the application name.
28+
/// </summary>
29+
public string? ApplicationName { get; set; }
2230
}

src/AppCoreNet.EventStore.SqlServer/Subscriptions/SqlServerSubscriptionStore.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,9 @@ public async IAsyncEnumerable<Subscription> GetAllAsync(
5151
.AsNoTracking()
5252
.AsAsyncEnumerable();
5353

54-
await foreach (Model.EventSubscription subscription in enumerable.WithCancellation(cancellationToken)
55-
.ConfigureAwait(false))
54+
await foreach (Model.EventSubscription subscription in
55+
enumerable.WithCancellation(cancellationToken)
56+
.ConfigureAwait(false))
5657
{
5758
yield return new Subscription(
5859
subscription.SubscriptionId,
@@ -122,6 +123,7 @@ public async Task DeleteAsync(SubscriptionId subscriptionId, CancellationToken c
122123
{
123124
PollInterval = _options.PollInterval,
124125
Timeout = timeout,
126+
LockResource = _options.ApplicationName + "-WatchSubscriptions",
125127
};
126128

127129
Model.WatchSubscriptionsResult result =

src/AppCoreNet.EventStore.SqlServer/Subscriptions/WatchSubscriptionsStoredProcedure.cs

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ internal sealed class WatchSubscriptionsStoredProcedure : SqlStoredProcedure<Mod
1717

1818
required public TimeSpan Timeout { get; init; }
1919

20+
required public string LockResource { get; init; }
21+
2022
public WatchSubscriptionsStoredProcedure(DbContext dbContext, string? schema)
2123
: base(dbContext, $"[{SchemaUtils.GetEventStoreSchema(schema)}].{ProcedureName}")
2224
{
@@ -43,7 +45,8 @@ public static string GetCreateScript(string? schema)
4345
return $"""
4446
CREATE PROCEDURE [{schema}].{ProcedureName} (
4547
@{nameof(PollInterval)} INT,
46-
@{nameof(Timeout)} INT
48+
@{nameof(Timeout)} INT,
49+
@{nameof(LockResource)} NVARCHAR(max)
4750
)
4851
AS
4952
BEGIN
@@ -52,41 +55,52 @@ public static string GetCreateScript(string? schema)
5255
DECLARE @StreamId AS NVARCHAR({Constants.StreamIdMaxLength});
5356
DECLARE @Position AS BIGINT;
5457
DECLARE @WaitTime AS VARCHAR(12) = CONVERT(VARCHAR(12), DATEADD(ms, @PollInterval, 0), 114);
58+
DECLARE @LockTime AS DATETIME;
59+
DECLARE @LockResult AS INT;
60+
61+
SELECT @LockTime = CURRENT_TIMESTAMP;
62+
EXEC @LockResult = sp_getapplock @Resource = @{nameof(LockResource)}, @LockMode = 'Exclusive', @LockTimeout = @Timeout;
63+
SET @Timeout = @Timeout - DATEDIFF(MILLISECOND, @LockTime, CURRENT_TIMESTAMP);
5564
56-
WHILE @Id IS NULL
65+
IF @LockResult >= 0
5766
BEGIN
58-
SELECT TOP 1
59-
@Id = SU.[{nameof(Model.EventSubscription.Id)}],
60-
@SubscriptionId = SU.[{nameof(Model.EventSubscription.SubscriptionId)}],
61-
@Position = SU.[{nameof(Model.EventSubscription.Position)}],
62-
@StreamId = SU.[{nameof(Model.EventSubscription.StreamId)}]
63-
FROM
64-
[{schema}].[{nameof(Model.EventSubscription)}] AS SU WITH (UPDLOCK, ROWLOCK, READPAST),
65-
[{schema}].[{nameof(Model.EventStream)}] AS ST
66-
WHERE
67-
(ST.[{nameof(Model.EventStream.StreamId)}] = SU.[{nameof(Model.EventSubscription.StreamId)}]
68-
AND SU.[{nameof(Model.EventSubscription.Position)}] < ST.[{nameof(Model.EventStream.Index)}])
69-
OR (SU.[{nameof(Model.EventSubscription.StreamId)}] = '{Constants.StreamIdAll}'
70-
AND SU.[{nameof(Model.EventSubscription.Position)}] < ST.[{nameof(Model.EventStream.Sequence)}])
71-
OR (LEFT(SU.[{nameof(Model.EventSubscription.StreamId)}], 1) = '*'
72-
AND ST.[{nameof(Model.EventStream.StreamId)}] LIKE '%' + RIGHT(SU.[{nameof(Model.EventSubscription.StreamId)}], LEN(SU.[{nameof(Model.EventSubscription.StreamId)}])-1)
73-
AND SU.[{nameof(Model.EventSubscription.Position)}] < ST.[{nameof(Model.EventStream.Sequence)}])
74-
OR (RIGHT(SU.[{nameof(Model.EventSubscription.StreamId)}], 1) = '*'
75-
AND ST.[{nameof(Model.EventStream.StreamId)}] LIKE LEFT(SU.[{nameof(Model.EventSubscription.StreamId)}], LEN(SU.[{nameof(Model.EventSubscription.StreamId)}])-1) + '%'
76-
AND SU.[{nameof(Model.EventSubscription.Position)}] < ST.[{nameof(Model.EventStream.Sequence)}])
77-
ORDER BY
78-
SU.[{nameof(Model.EventSubscription.ProcessedAt)}],
79-
ST.[{nameof(Model.EventStream.Sequence)}]
80-
DESC;
81-
82-
IF @Id IS NULL
67+
WHILE @Id IS NULL
8368
BEGIN
84-
WAITFOR DELAY @WaitTime;
85-
SET @{nameof(Timeout)} = @{nameof(Timeout)} - @{nameof(PollInterval)};
86-
IF @{nameof(Timeout)} <= 0 BREAK;
69+
SELECT TOP 1
70+
@Id = SU.[{nameof(Model.EventSubscription.Id)}],
71+
@SubscriptionId = SU.[{nameof(Model.EventSubscription.SubscriptionId)}],
72+
@Position = SU.[{nameof(Model.EventSubscription.Position)}],
73+
@StreamId = SU.[{nameof(Model.EventSubscription.StreamId)}]
74+
FROM
75+
[{schema}].[{nameof(Model.EventSubscription)}] AS SU WITH (UPDLOCK, ROWLOCK, READPAST),
76+
[{schema}].[{nameof(Model.EventStream)}] AS ST
77+
WHERE
78+
(ST.[{nameof(Model.EventStream.StreamId)}] = SU.[{nameof(Model.EventSubscription.StreamId)}]
79+
AND SU.[{nameof(Model.EventSubscription.Position)}] < ST.[{nameof(Model.EventStream.Index)}])
80+
OR (SU.[{nameof(Model.EventSubscription.StreamId)}] = '{Constants.StreamIdAll}'
81+
AND SU.[{nameof(Model.EventSubscription.Position)}] < ST.[{nameof(Model.EventStream.Sequence)}])
82+
OR (LEFT(SU.[{nameof(Model.EventSubscription.StreamId)}], 1) = '*'
83+
AND ST.[{nameof(Model.EventStream.StreamId)}] LIKE '%' + RIGHT(SU.[{nameof(Model.EventSubscription.StreamId)}], LEN(SU.[{nameof(Model.EventSubscription.StreamId)}])-1)
84+
AND SU.[{nameof(Model.EventSubscription.Position)}] < ST.[{nameof(Model.EventStream.Sequence)}])
85+
OR (RIGHT(SU.[{nameof(Model.EventSubscription.StreamId)}], 1) = '*'
86+
AND ST.[{nameof(Model.EventStream.StreamId)}] LIKE LEFT(SU.[{nameof(Model.EventSubscription.StreamId)}], LEN(SU.[{nameof(Model.EventSubscription.StreamId)}])-1) + '%'
87+
AND SU.[{nameof(Model.EventSubscription.Position)}] < ST.[{nameof(Model.EventStream.Sequence)}])
88+
ORDER BY
89+
SU.[{nameof(Model.EventSubscription.ProcessedAt)}],
90+
ST.[{nameof(Model.EventStream.Sequence)}]
91+
DESC;
92+
93+
IF @Id IS NULL
94+
BEGIN
95+
WAITFOR DELAY @WaitTime;
96+
SET @{nameof(Timeout)} = @{nameof(Timeout)} - @{nameof(PollInterval)};
97+
IF @{nameof(Timeout)} <= 0 BREAK;
98+
END
8799
END
88100
END
89101
102+
EXEC sp_releaseapplock @Resource = @{nameof(LockResource)}
103+
90104
SELECT
91105
@Id AS [{nameof(Model.WatchSubscriptionsResult.Id)}],
92106
@SubscriptionId AS [{nameof(Model.WatchSubscriptionsResult.SubscriptionId)}],
@@ -107,6 +121,7 @@ protected override SqlParameter[] GetParameters()
107121
[
108122
new SqlParameter($"@{nameof(PollInterval)}", (int)PollInterval.TotalMilliseconds),
109123
new SqlParameter($"@{nameof(Timeout)}", (int)Timeout.TotalMilliseconds),
124+
new SqlParameter($"@{nameof(LockResource)}", LockResource),
110125
];
111126
}
112127
}

test/AppCoreNet.EventStore.SpecificationTests/Subscriptions/SubscriptionStoreTests.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
using AppCoreNet.Extensions.DependencyInjection;
99
using FluentAssertions;
1010
using Microsoft.Extensions.DependencyInjection;
11+
using Microsoft.Extensions.Hosting;
12+
using NSubstitute;
1113
using Xunit;
1214

1315
namespace AppCoreNet.EventStore.Subscriptions;
@@ -23,6 +25,11 @@ protected ServiceProvider CreateServiceProvider()
2325

2426
protected virtual void ConfigureServices(IServiceCollection services)
2527
{
28+
var hostEnvironment = Substitute.For<IHostEnvironment>();
29+
hostEnvironment.ApplicationName.Returns(typeof(SubscriptionStoreTests).Assembly.FullName);
30+
31+
services.AddTransient<IHostEnvironment>(_ => hostEnvironment);
32+
2633
services.AddEventStore()
2734
.AddJsonSerializer(
2835
o =>

0 commit comments

Comments
 (0)