Skip to content

Commit 9e5cc2d

Browse files
committed
Added consumer rails
1 parent 30975fd commit 9e5cc2d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+430
-111
lines changed

.DS_Store

4 KB
Binary file not shown.

AsyncMonolith.Ef/.DS_Store

6 KB
Binary file not shown.

AsyncMonolith.Ef/AsyncMonolith.Ef.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<LangVersion>preview</LangVersion>
99
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
1010
<PackageId>AsyncMonolith.Ef</PackageId>
11-
<PackageVersion>9.0.0</PackageVersion>
11+
<PackageVersion>9.0.2</PackageVersion>
1212
<Authors>Tim Jones</Authors>
1313
<Company>Aptacode</Company>
1414
<Description>Entity Framework interface for AsyncMonolith</Description>

AsyncMonolith.Ef/EfConsumerMessageFetcher.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@ public EfConsumerMessageFetcher(IOptions<AsyncMonolithSettings> options)
2626
/// </summary>
2727
/// <param name="consumerSet">The DbSet of consumer messages.</param>
2828
/// <param name="currentTime">The current time.</param>
29+
/// <param name="railId">The message rail to fetch messages from.</param>
2930
/// <param name="cancellationToken">The cancellation token.</param>
3031
/// <returns>A task that represents the asynchronous operation. The task result contains a list of consumer messages.</returns>
31-
public Task<List<ConsumerMessage>> Fetch(DbSet<ConsumerMessage> consumerSet, long currentTime,
32+
public Task<List<ConsumerMessage>> Fetch(DbSet<ConsumerMessage> consumerSet, long currentTime, int railId,
3233
CancellationToken cancellationToken = default)
3334
{
3435
return consumerSet
35-
.Where(m => m.AvailableAfter <= currentTime)
36+
.Where(m => m.AvailableAfter <= currentTime && m.RailId == railId)
3637
.OrderBy(m => m.CreatedAt)
3738
.Take(_options.Value.ProcessorBatchSize)
3839
.ToListAsync(cancellationToken);

AsyncMonolith.Ef/EfProducerService.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,20 +57,21 @@ public Task Produce<TK>(TK message, long? availableAfter = null, string? insertI
5757
var traceId = Activity.Current?.TraceId.ToString();
5858
var spanId = Activity.Current?.SpanId.ToString();
5959

60-
foreach (var consumerId in _consumerRegistry.ResolvePayloadConsumerTypes(payloadType))
60+
foreach (var consumerType in _consumerRegistry.ResolvePayloadConsumerTypes(payloadType))
6161
{
6262
set.Add(new ConsumerMessage
6363
{
6464
Id = _idGenerator.GenerateId(),
6565
CreatedAt = currentTime,
6666
AvailableAfter = availableAfter.Value,
67-
ConsumerType = consumerId,
67+
ConsumerType = consumerType,
6868
PayloadType = payloadType,
6969
Payload = payload,
7070
Attempts = 0,
7171
InsertId = insertId,
7272
TraceId = traceId,
73-
SpanId = spanId
73+
SpanId = spanId,
74+
RailId = _consumerRegistry.ResolveConsumerRailId(consumerType)
7475
});
7576
}
7677

@@ -115,7 +116,8 @@ public Task ProduceList<TK>(List<TK> messages, long? availableAfter = null,
115116
Attempts = 0,
116117
InsertId = insertId,
117118
TraceId = traceId,
118-
SpanId = spanId
119+
SpanId = spanId,
120+
RailId = _consumerRegistry.ResolveConsumerRailId(consumerId)
119121
});
120122
}
121123
}
@@ -145,7 +147,8 @@ public void Produce(ScheduledMessage message)
145147
Attempts = 0,
146148
InsertId = insertId,
147149
TraceId = null,
148-
SpanId = null
150+
SpanId = null,
151+
RailId = _consumerRegistry.ResolveConsumerRailId(consumerId)
149152
});
150153
}
151154
}

AsyncMonolith.Ef/StartupExtensions.cs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,6 @@ private static IServiceCollection AddEfAsyncMonolith<T>(
5252
{
5353
configuration(settings);
5454

55-
if (settings.ConsumerMessageProcessorCount > 1)
56-
{
57-
throw new ArgumentException(
58-
"AsyncMonolithSettings.ConsumerMessageProcessorCount can only be set to 1 when using 'DbType.Ef'.");
59-
}
60-
61-
if (settings.ScheduledMessageProcessorCount > 1)
62-
{
63-
throw new ArgumentException(
64-
"AsyncMonolithSettings.ScheduledMessageProcessorCount can only be set to 1 when using 'DbType.Ef'.");
65-
}
66-
6755
services.InternalAddAsyncMonolith<T>(settings);
6856
services.AddScoped<IProducerService, EfProducerService<T>>();
6957
services.AddSingleton<IConsumerMessageFetcher, EfConsumerMessageFetcher>();

AsyncMonolith.MariaDb/.DS_Store

6 KB
Binary file not shown.

AsyncMonolith.MariaDb/AsyncMonolith.MariaDb.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<LangVersion>preview</LangVersion>
1010
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
1111
<PackageId>AsyncMonolith.MariaDb</PackageId>
12-
<PackageVersion>9.0.0</PackageVersion>
12+
<PackageVersion>9.0.2</PackageVersion>
1313
<Authors>Tim Jones</Authors>
1414
<Company>Aptacode</Company>
1515
<Description>MariaDb interface for AsyncMonolith</Description>

AsyncMonolith.MariaDb/MariaDbConsumerMessageFetcher.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public sealed class MariaDbConsumerMessageFetcher : IConsumerMessageFetcher
1414
private const string MariaDb = @"
1515
SELECT *
1616
FROM consumer_messages
17-
WHERE available_after <= @currentTime
17+
WHERE available_after <= @currentTime AND rail_id = @railId
1818
ORDER BY created_at
1919
LIMIT @batchSize
2020
FOR UPDATE SKIP LOCKED";
@@ -35,13 +35,15 @@ public MariaDbConsumerMessageFetcher(IOptions<AsyncMonolithSettings> options)
3535
/// </summary>
3636
/// <param name="consumerSet">The DbSet of consumer messages.</param>
3737
/// <param name="currentTime">The current time.</param>
38+
/// <param name="railId">The message rail to fetch messages from.</param>
3839
/// <param name="cancellationToken">The cancellation token.</param>
3940
/// <returns>A task that represents the asynchronous operation. The task result contains a list of consumer messages.</returns>
40-
public Task<List<ConsumerMessage>> Fetch(DbSet<ConsumerMessage> consumerSet, long currentTime,
41+
public Task<List<ConsumerMessage>> Fetch(DbSet<ConsumerMessage> consumerSet, long currentTime, int railId,
4142
CancellationToken cancellationToken = default)
4243
{
4344
return consumerSet
4445
.FromSqlRaw(MariaDb, new MySqlParameter("@currentTime", currentTime),
46+
new MySqlParameter("@railId", railId),
4547
new MySqlParameter("@batchSize", _options.Value.ProcessorBatchSize))
4648
.ToListAsync(cancellationToken);
4749
}

AsyncMonolith.MariaDb/MariaDbProducerService.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public async Task Produce<TK>(TK message, long? availableAfter = null, string? i
7171
};
7272

7373
var consumerTypes = _consumerRegistry.ResolvePayloadConsumerTypes(payloadType);
74+
var consumerRailIds = _consumerRegistry.ResolveConsumerRailIds(consumerTypes);
7475
for (var index = 0; index < consumerTypes.Count; index++)
7576
{
7677
if (sqlBuilder.Length > 0)
@@ -79,14 +80,16 @@ public async Task Produce<TK>(TK message, long? availableAfter = null, string? i
7980
}
8081

8182
sqlBuilder.Append(
82-
$@"(@id_{index}, @created_at, @available_after, 0, @consumer_type_{index}, @payload_type, @payload, @insert_id, @trace_id, @span_id)");
83+
$@"(@id_{index}, @created_at, @available_after, 0, @consumer_type_{index}, @payload_type, @payload, @insert_id, @trace_id, @span_id, @rail_{index})");
8384

8485
parameters.Add(new MySqlParameter($"@id_{index}", _idGenerator.GenerateId()));
8586
parameters.Add(new MySqlParameter($"@consumer_type_{index}", consumerTypes[index]));
87+
parameters.Add(new MySqlParameter($"@rail_{index}", consumerRailIds[index]));
88+
8689
}
8790

8891
var sql = $@"
89-
INSERT INTO consumer_messages (id, created_at, available_after, attempts, consumer_type, payload_type, payload, insert_id, trace_id, span_id)
92+
INSERT INTO consumer_messages (id, created_at, available_after, attempts, consumer_type, payload_type, payload, insert_id, trace_id, span_id, rail_id)
9093
VALUES {sqlBuilder}
9194
ON DUPLICATE KEY UPDATE id = id;";
9295

@@ -120,7 +123,7 @@ public async Task ProduceList<TK>(List<TK> messages, long? availableAfter = null
120123

121124
var payloadType = typeof(TK).Name;
122125
var consumerTypes = _consumerRegistry.ResolvePayloadConsumerTypes(payloadType);
123-
126+
var consumerRailIds = _consumerRegistry.ResolveConsumerRailIds(consumerTypes);
124127
for (var i = 0; i < messages.Count; i++)
125128
{
126129
var message = messages[i];
@@ -138,15 +141,16 @@ public async Task ProduceList<TK>(List<TK> messages, long? availableAfter = null
138141
}
139142

140143
sqlBuilder.Append(
141-
$@"(@id_{i}_{index}, @created_at, @available_after, 0, @consumer_type_{i}_{index}, @payload_type_{i}, @payload_{i}, @insert_id_{i}, @trace_id, @span_id)");
144+
$@"(@id_{i}_{index}, @created_at, @available_after, 0, @consumer_type_{i}_{index}, @payload_type_{i}, @payload_{i}, @insert_id_{i}, @trace_id, @span_id, @rail_{i}_{index})");
142145

143146
parameters.Add(new MySqlParameter($"@id_{i}_{index}", _idGenerator.GenerateId()));
144147
parameters.Add(new MySqlParameter($"@consumer_type_{i}_{index}", consumerTypes[index]));
148+
parameters.Add(new MySqlParameter($"@rail_{i}_{index}", consumerRailIds[index]));
145149
}
146150
}
147151

148152
var sql = $@"
149-
INSERT INTO consumer_messages (id, created_at, available_after, attempts, consumer_type, payload_type, payload, insert_id, trace_id, span_id)
153+
INSERT INTO consumer_messages (id, created_at, available_after, attempts, consumer_type, payload_type, payload, insert_id, trace_id, span_id, rail_id)
150154
VALUES {sqlBuilder}
151155
ON DUPLICATE KEY UPDATE id = id;";
152156

@@ -175,7 +179,8 @@ public void Produce(ScheduledMessage message)
175179
Attempts = 0,
176180
InsertId = insertId,
177181
TraceId = null,
178-
SpanId = null
182+
SpanId = null,
183+
RailId = _consumerRegistry.ResolveConsumerRailId(consumerId)
179184
});
180185
}
181186
}

0 commit comments

Comments
 (0)