diff --git a/build/tasks.ps1 b/build/tasks.ps1 index e49b334..4649172 100644 --- a/build/tasks.ps1 +++ b/build/tasks.ps1 @@ -21,7 +21,6 @@ $projects = @( "Sa.Partitional.PostgreSql", - "Sa.Outbox.Support", "Sa.Outbox", "Sa.Outbox.PostgreSql", diff --git a/src/Sa.Configuration.PostgreSql/Sa.Configuration.PostgreSql.csproj b/src/Sa.Configuration.PostgreSql/Sa.Configuration.PostgreSql.csproj index be5c68c..dcb1592 100644 --- a/src/Sa.Configuration.PostgreSql/Sa.Configuration.PostgreSql.csproj +++ b/src/Sa.Configuration.PostgreSql/Sa.Configuration.PostgreSql.csproj @@ -3,7 +3,7 @@ - 0.6.0 + 0.7.0 add a PostgreSQL-based configuration source to IConfiguration diff --git a/src/Sa.Configuration/Sa.Configuration.csproj b/src/Sa.Configuration/Sa.Configuration.csproj index fde7ec7..d23aefe 100644 --- a/src/Sa.Configuration/Sa.Configuration.csproj +++ b/src/Sa.Configuration/Sa.Configuration.csproj @@ -3,7 +3,7 @@ - 0.6.0 + 0.7.0 extensions for Configuration diff --git a/src/Sa.Data.PostgreSql/Sa.Data.PostgreSql.csproj b/src/Sa.Data.PostgreSql/Sa.Data.PostgreSql.csproj index ba90fa9..f6ea1da 100644 --- a/src/Sa.Data.PostgreSql/Sa.Data.PostgreSql.csproj +++ b/src/Sa.Data.PostgreSql/Sa.Data.PostgreSql.csproj @@ -3,7 +3,7 @@ - 0.6.0 + 0.7.0 Simple client for Npqsql diff --git a/src/Sa.Data.S3/Sa.Data.S3.csproj b/src/Sa.Data.S3/Sa.Data.S3.csproj index 9acb00e..bb3f1e3 100644 --- a/src/Sa.Data.S3/Sa.Data.S3.csproj +++ b/src/Sa.Data.S3/Sa.Data.S3.csproj @@ -3,7 +3,7 @@ - 0.6.0 + 0.7.0 Sa.Data.S3 Simple client for S3 (Sa.Data.S3) s3 diff --git a/src/Sa.HybridFileStorage.FileSystem/Sa.HybridFileStorage.FileSystem.csproj b/src/Sa.HybridFileStorage.FileSystem/Sa.HybridFileStorage.FileSystem.csproj index dd44d7c..05cf062 100644 --- a/src/Sa.HybridFileStorage.FileSystem/Sa.HybridFileStorage.FileSystem.csproj +++ b/src/Sa.HybridFileStorage.FileSystem/Sa.HybridFileStorage.FileSystem.csproj @@ -3,7 +3,7 @@ - 0.6.0 + 0.7.0 File storage management diff --git a/src/Sa.HybridFileStorage.Postgres/Sa.HybridFileStorage.Postgres.csproj b/src/Sa.HybridFileStorage.Postgres/Sa.HybridFileStorage.Postgres.csproj index fa2601e..65bc17c 100644 --- a/src/Sa.HybridFileStorage.Postgres/Sa.HybridFileStorage.Postgres.csproj +++ b/src/Sa.HybridFileStorage.Postgres/Sa.HybridFileStorage.Postgres.csproj @@ -3,7 +3,7 @@ - 0.6.0 + 0.7.0 File storage management in Pg diff --git a/src/Sa.HybridFileStorage.S3/Sa.HybridFileStorage.S3.csproj b/src/Sa.HybridFileStorage.S3/Sa.HybridFileStorage.S3.csproj index e3a36ac..c372e71 100644 --- a/src/Sa.HybridFileStorage.S3/Sa.HybridFileStorage.S3.csproj +++ b/src/Sa.HybridFileStorage.S3/Sa.HybridFileStorage.S3.csproj @@ -3,7 +3,7 @@ - 0.6.0 + 0.7.0 File storage management in S3 diff --git a/src/Sa.HybridFileStorage/Sa.HybridFileStorage.csproj b/src/Sa.HybridFileStorage/Sa.HybridFileStorage.csproj index 1b4f98c..0891e73 100644 --- a/src/Sa.HybridFileStorage/Sa.HybridFileStorage.csproj +++ b/src/Sa.HybridFileStorage/Sa.HybridFileStorage.csproj @@ -3,7 +3,7 @@ - 0.6.0 + 0.7.0 File storage management diff --git a/src/Sa.Media.FFmpeg/Sa.Media.FFmpeg.csproj b/src/Sa.Media.FFmpeg/Sa.Media.FFmpeg.csproj index a1e3263..441b8fa 100644 --- a/src/Sa.Media.FFmpeg/Sa.Media.FFmpeg.csproj +++ b/src/Sa.Media.FFmpeg/Sa.Media.FFmpeg.csproj @@ -3,7 +3,7 @@ - 0.6.0 + 0.7.0 FFmpeg wrapper diff --git a/src/Sa.Media/Sa.Media.csproj b/src/Sa.Media/Sa.Media.csproj index 93eae12..024692f 100644 --- a/src/Sa.Media/Sa.Media.csproj +++ b/src/Sa.Media/Sa.Media.csproj @@ -3,7 +3,7 @@ - 0.6.0 + 0.7.0 Async and memory-efficient WAV file reader for .NET diff --git a/src/Sa.Outbox.PostgreSql/Commands/NpgsqlCommandExtension.cs b/src/Sa.Outbox.PostgreSql/Commands/NpgsqlCommandExtension.cs index c51f6c9..55c5079 100644 --- a/src/Sa.Outbox.PostgreSql/Commands/NpgsqlCommandExtension.cs +++ b/src/Sa.Outbox.PostgreSql/Commands/NpgsqlCommandExtension.cs @@ -1,5 +1,6 @@ using Npgsql; using Sa.Data.PostgreSql; +using Sa.Outbox.Delivery; using Sa.Outbox.PostgreSql.SqlBuilder; namespace Sa.Outbox.PostgreSql.Commands; diff --git a/src/Sa.Outbox.PostgreSql/Configuration/OutboxMessageSerializer.cs b/src/Sa.Outbox.PostgreSql/Configuration/OutboxMessageSerializer.cs new file mode 100644 index 0000000..d426713 --- /dev/null +++ b/src/Sa.Outbox.PostgreSql/Configuration/OutboxMessageSerializer.cs @@ -0,0 +1,18 @@ +using Sa.Outbox.PostgreSql.Serialization; +using System.Text.Json; + +namespace Sa.Outbox.PostgreSql.Configuration; + +internal sealed class OutboxMessageSerializer : IOutboxMessageSerializer +{ +#pragma warning disable IL2026 +#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. + public T? Deserialize(Stream stream) => JsonSerializer.Deserialize(stream); + + + public void Serialize(Stream stream, T value) => JsonSerializer.Serialize(stream, value); +#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. +#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code + + public static OutboxMessageSerializer Instance { get; } = new(); +} diff --git a/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxConfiguration.cs b/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxConfiguration.cs index 97438a8..3099bb3 100644 --- a/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxConfiguration.cs +++ b/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxConfiguration.cs @@ -1,5 +1,4 @@ -using System.Collections.Concurrent; -using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Sa.Data.PostgreSql; using Sa.Outbox.PostgreSql.Serialization; @@ -8,40 +7,15 @@ namespace Sa.Outbox.PostgreSql.Configuration; internal sealed class PgOutboxConfiguration(IServiceCollection services) : IPgOutboxConfiguration { - private static readonly ConcurrentDictionary< - IServiceCollection, HashSet>> s_invokers = []; - public IPgOutboxConfiguration WithOutboxSettings(Action? configure = null) { if (configure != null) { - if (s_invokers.TryGetValue(services, out var invokers)) - { - invokers.Add(configure); - } - else - { - s_invokers[services] = [configure]; - } + services.AddSingleton(configure); } + RegisterOutboxSettings(); - services.TryAddSingleton(sp => - { - PgOutboxSettings settings = new(); - - if (s_invokers.TryGetValue(services, out var invokers)) - { - foreach (Action build in invokers) - build.Invoke(sp, settings); - - s_invokers.Remove(services, out _); - } - - return settings; - }); - - AddSettings(); return this; } @@ -53,6 +27,7 @@ public IPgOutboxConfiguration WithDataSource(Action messageSerializerFactory) { + services.RemoveAll(); services.TryAddSingleton(messageSerializerFactory); return this; } @@ -60,11 +35,35 @@ public IPgOutboxConfiguration WithMessageSerializer(Func(TService instance) where TService : class, IOutboxMessageSerializer { + services.RemoveAll(); services.TryAddSingleton(instance); return this; } - private void AddSettings() + internal IPgOutboxConfiguration WithDefaultSerializer() + { + services.TryAddSingleton(OutboxMessageSerializer.Instance); + return this; + } + + private void RegisterOutboxSettings() + { + services.TryAddSingleton(sp => + { + PgOutboxSettings settings = new(); + + var configureActions = sp.GetServices>(); + + foreach (var configureAction in configureActions) + configureAction.Invoke(sp, settings); + + return settings; + }); + + RegisterComponentSettings(); + } + + private void RegisterComponentSettings() { services.TryAddSingleton(sp => sp.GetRequiredService().TableSettings); services.TryAddSingleton(sp => sp.GetRequiredService().MigrationSettings); diff --git a/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxConsumeSettings.cs b/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxConsumeSettings.cs index b987cc3..2b19f98 100644 --- a/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxConsumeSettings.cs +++ b/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxConsumeSettings.cs @@ -1,5 +1,6 @@ -using System.Diagnostics.CodeAnalysis; -using Sa.Extensions; +using Sa.Extensions; +using Sa.Outbox.Delivery; +using System.Diagnostics.CodeAnalysis; namespace Sa.Outbox.PostgreSql; diff --git a/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxTableSettings.cs b/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxTableSettings.cs index ecfd01e..6c609ce 100644 --- a/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxTableSettings.cs +++ b/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxTableSettings.cs @@ -120,7 +120,7 @@ public string[] All() => $"{MsgCreatedAt} BIGINT NOT NULL DEFAULT 0", $"{DeliveryId} BIGINT NOT NULL DEFAULT 0", $"{DeliveryAttempt} INT NOT NULL DEFAULT 0", - $"{DeliveryStatusCode} INT NOT NULL DEFAULT {(int)Outbox.DeliveryStatusCode.Pending}", + $"{DeliveryStatusCode} INT NOT NULL DEFAULT {(int)Outbox.Delivery.DeliveryStatusCode.Pending}", $"{DeliveryStatusMessage} TEXT NOT NULL DEFAULT ''", $"{DeliveryCreatedAt} BIGINT NOT NULL DEFAULT 0", $"{ErrorId} BIGINT NOT NULL DEFAULT 0", @@ -160,7 +160,7 @@ public sealed class TableFields public string[] All() => [ $"{DeliveryId} BIGSERIAL NOT NULL", - $"{DeliveryStatusCode} INT NOT NULL DEFAULT {(int)Outbox.DeliveryStatusCode.Pending}", + $"{DeliveryStatusCode} INT NOT NULL DEFAULT {(int)Outbox.Delivery.DeliveryStatusCode.Pending}", $"{DeliveryStatusMessage} TEXT NOT NULL DEFAULT ''", $"{MsgPayloadId} TEXT NOT NULL DEFAULT ''", $"{TenantId} INT NOT NULL DEFAULT 0", diff --git a/src/Sa.Outbox.PostgreSql/Configuration/Setup.cs b/src/Sa.Outbox.PostgreSql/Configuration/Setup.cs index 8ca16ec..045928a 100644 --- a/src/Sa.Outbox.PostgreSql/Configuration/Setup.cs +++ b/src/Sa.Outbox.PostgreSql/Configuration/Setup.cs @@ -8,13 +8,15 @@ public static IServiceCollection AddPgOutboxSettings( this IServiceCollection services, Action? configure = null) { - var cfg = new PgOutboxConfiguration(services); - configure?.Invoke(cfg); - - cfg + var configuration = new PgOutboxConfiguration(services) + .WithDefaultSerializer() .WithOutboxSettings() .WithDataSource() - ; + ; + + configure?.Invoke(configuration); + + return services; } diff --git a/src/Sa.Outbox.PostgreSql/Interceptors/DeliveryJobInterceptor.cs b/src/Sa.Outbox.PostgreSql/Interceptors/DeliveryJobInterceptor.cs index 1ce9272..d563950 100644 --- a/src/Sa.Outbox.PostgreSql/Interceptors/DeliveryJobInterceptor.cs +++ b/src/Sa.Outbox.PostgreSql/Interceptors/DeliveryJobInterceptor.cs @@ -1,4 +1,4 @@ -using Sa.Outbox.Job; +using Sa.Outbox.Delivery.Job; using Sa.Partitional.PostgreSql; using Sa.Schedule; diff --git a/src/Sa.Outbox.PostgreSql/Interceptors/Setup.cs b/src/Sa.Outbox.PostgreSql/Interceptors/Setup.cs index f51c5f3..67ec58b 100644 --- a/src/Sa.Outbox.PostgreSql/Interceptors/Setup.cs +++ b/src/Sa.Outbox.PostgreSql/Interceptors/Setup.cs @@ -1,6 +1,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; -using Sa.Outbox.Job; +using Sa.Outbox.Delivery.Job; namespace Sa.Outbox.PostgreSql.Interceptors; diff --git a/src/Sa.Outbox.PostgreSql/Readme.md b/src/Sa.Outbox.PostgreSql/Readme.md index 8742863..1629a33 100644 --- a/src/Sa.Outbox.PostgreSql/Readme.md +++ b/src/Sa.Outbox.PostgreSql/Readme.md @@ -15,23 +15,13 @@ dotnet add package Sa.Outbox.PostgreSql ConfigureServices(services => services // outbox .AddOutbox(builder => builder - .WithTenantSettings((_, ts) => ts.WithTenantIds(1, 2, 3)) - .WithDeliveries(builder => builder - .AddDelivery((_, settings) => - { - settings.ScheduleSettings.WithIntervalSeconds(5); - }) - ) + .WithTenants((_, ts) => ts.WithTenantIds(1, 2, 3)) + .WithDeliveries(b => b.AddDelivery()) ) // outbox pg .AddOutboxUsingPostgreSql(cfg => cfg .WithDataSource(ds => ds.WithConnectionString("Host=my_host;Database=my_db;Username=my_user;Password=my_password")) - .WithOutboxSettings((_, settings) => - { - settings.TableSettings.WithSchema("my_outbox"); - settings.ConsumeSettings.WithMinOffset(DateTimeOffset.Now); - }) - .WithMessageSerializer(...) + .WithOutboxSettings((_, settings) => settings.TableSettings.WithSchema("my_outbox")) ) ) ``` @@ -39,23 +29,16 @@ ConfigureServices(services => services ### Publishing Messages ```csharp -public sealed record MyMessage(string PayloadId, int TenantId = 0) : IOutboxPayloadMessage -{ - public static string PartName => "root"; -} +public sealed record MyMessage(string PayloadId); // Batch publishing for different tenants -await publisher.Publish([ - new MyMessage("#1", 1), - new MyMessage("#2", 2), - new MyMessage("#3", 3) -]); +await publisher.Publish([new MyMessage("#1"), new MyMessage("#2")], tenantId: 1); ``` ### Message Processing ```csharp -public class MyConsumer : IConsumer +sealed class MyConsumer : IConsumer { public async ValueTask Consume( ConsumerGroupSettings settings, @@ -72,37 +55,6 @@ public class MyConsumer : IConsumer } ``` -## Messages - -All messages for publication must implement the interface: - -```csharp -public interface IOutboxHasPart -{ - /// - /// Gets the logical identifier of the partition associated with this type. - /// - /// "orders", "notifications" - static abstract string PartName { get; } -} - -/// -/// This interface defines the properties that any Outbox payload message must implement. -/// -public interface IOutboxPayloadMessage : IOutboxHasPart -{ - /// - /// Gets the unique identifier for the payload. - /// - string PayloadId { get; } - - /// - /// Gets the identifier for the tenant associated with the payload. - /// - int TenantId { get; } -} -``` - ## Key Features ### 1. Multi-Consumer Support @@ -112,7 +64,7 @@ A single message type can be processed by multiple independent consumers. Each c The library is designed with data isolation between tenants in mind: ```csharp -.WithTenantSettings((_, ts) => ts +.WithTenants((_, ts) => ts .WithTenantIds(1, 2, 3) // Explicit tenant specification .WithAutoDetect() // Or automatic detection .WithTenantDetector() // Custom tenant detector @@ -133,20 +85,6 @@ The library is designed with data isolation between tenants in mind: - **`Aborted(string)`** - Skip with specified reason - e.t.c -### 5. Pull Model with Static and Dynamic Management -```csharp -// DI -settings.ConsumeSettings - .WithIntervalSeconds(5) - .WithMaxDeliveryAttempts(3) - .WithBatchingWindow(TimeSpan.FromMinutes(5)) - .WithLockDuration(TimeSpan.FromMinutes(10)); - - // Dynamic management during runtime - public async ValueTask Consume(ConsumerGroupSettings settings,... - settings.ConsumeSettings.WithMaxProcessingIterations(100); -``` - ## Database Architecture ### Table Structure @@ -198,4 +136,4 @@ settings.ConsumeSettings ## License -MIT \ No newline at end of file +MIT diff --git a/src/Sa.Outbox.PostgreSql/Sa.Outbox.PostgreSql.csproj b/src/Sa.Outbox.PostgreSql/Sa.Outbox.PostgreSql.csproj index 6e8009f..2396801 100644 --- a/src/Sa.Outbox.PostgreSql/Sa.Outbox.PostgreSql.csproj +++ b/src/Sa.Outbox.PostgreSql/Sa.Outbox.PostgreSql.csproj @@ -3,7 +3,7 @@ - 0.6.1 + 0.7.0 Simple Outbox for Pg (publishing and using messages) diff --git a/src/Sa.Outbox.PostgreSql/Servicers/IOutboxContextFactory.cs b/src/Sa.Outbox.PostgreSql/Servicers/IOutboxContextFactory.cs index 1a42c58..23624f9 100644 --- a/src/Sa.Outbox.PostgreSql/Servicers/IOutboxContextFactory.cs +++ b/src/Sa.Outbox.PostgreSql/Servicers/IOutboxContextFactory.cs @@ -1,4 +1,6 @@ -namespace Sa.Outbox.Repository; +using Sa.Outbox.Delivery; + +namespace Sa.Outbox.Repository; public interface IOutboxContextFactory { diff --git a/src/Sa.Outbox.PostgreSql/SqlBuilder/SqlOutboxBuilder.cs b/src/Sa.Outbox.PostgreSql/SqlBuilder/SqlOutboxBuilder.cs index 522e3df..bcf345b 100644 --- a/src/Sa.Outbox.PostgreSql/SqlBuilder/SqlOutboxBuilder.cs +++ b/src/Sa.Outbox.PostgreSql/SqlBuilder/SqlOutboxBuilder.cs @@ -1,5 +1,6 @@ using System.Text; using Microsoft.Extensions.ObjectPool; +using Sa.Outbox.Delivery; namespace Sa.Outbox.PostgreSql.SqlBuilder; diff --git a/src/Sa.Outbox.Support/IOutboxHasPart.cs b/src/Sa.Outbox.Support/IOutboxHasPart.cs deleted file mode 100644 index 26da161..0000000 --- a/src/Sa.Outbox.Support/IOutboxHasPart.cs +++ /dev/null @@ -1,39 +0,0 @@ -namespace Sa.Outbox.Support; - -/// -/// Represents an entity or message that is logically associated with a specific partition ("part"). -/// This allows routing messages to dedicated outbox tables or partitions based on type, -/// enabling sharding, tenant isolation, or workload separation in distributed systems. -/// -/// -/// -/// The property defines the logical partition name (e.g., "payments", "notifications", "eu_region") -/// and is typically used to route messages to specific database tables like outbox_{PartName}. -/// -/// -/// This interface uses static abstract members (C# 11+), so implementation must provide a concrete value at the type level. -/// It does not depend on instance data, making it efficient for generic processing without instantiating objects. -/// -/// -/// Example scenarios: -/// -/// Partitioning outbox by message type or domain. -/// Isolating tenants or regions in event queues. -/// Avoiding lock contention on a single outbox table. -/// -/// -/// -/// -public interface IOutboxHasPart -{ - /// - /// Gets the logical identifier of the partition associated with this type. - /// Used for routing messages to specific tables, shards, or storage groups (e.g., outbox_orders, outbox_eu). - /// - /// - /// A non-null string representing the partition name. Must be constant per type or derived from compile-time logic. - /// Should avoid special characters; use lowercase letters, digits, and underscores. - /// - /// "orders", "notifications", "us_west" - static abstract string PartName { get; } -} diff --git a/src/Sa.Outbox.Support/IOutboxPayloadMessage.cs b/src/Sa.Outbox.Support/IOutboxPayloadMessage.cs deleted file mode 100644 index a2515bd..0000000 --- a/src/Sa.Outbox.Support/IOutboxPayloadMessage.cs +++ /dev/null @@ -1,20 +0,0 @@ -namespace Sa.Outbox.Support; - - - -/// -/// Represents a message payload in the Outbox system. -/// This interface defines the properties that any Outbox payload message must implement. -/// -public interface IOutboxPayloadMessage : IOutboxHasPart -{ - /// - /// Gets the unique identifier for the payload. - /// - string PayloadId { get; } - - /// - /// Gets the identifier for the tenant associated with the payload. - /// - int TenantId { get; } -} diff --git a/src/Sa.Outbox.Support/PingMessage.cs b/src/Sa.Outbox.Support/PingMessage.cs deleted file mode 100644 index 24fcdda..0000000 --- a/src/Sa.Outbox.Support/PingMessage.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace Sa.Outbox.Support; - - -public sealed record PingMessage(string PayloadId, int TenantId = 0) : IOutboxPayloadMessage -{ - public static string PartName => "root"; -} diff --git a/src/Sa.Outbox.Support/Sa.Outbox.Support.csproj b/src/Sa.Outbox.Support/Sa.Outbox.Support.csproj deleted file mode 100644 index 979cdfc..0000000 --- a/src/Sa.Outbox.Support/Sa.Outbox.Support.csproj +++ /dev/null @@ -1,10 +0,0 @@ - - - - - - 0.6.0 - Intrfaces for publishing and using messages in the Outbox - - - diff --git a/src/Sa.Outbox/Delivery/ConsumeSettings.cs b/src/Sa.Outbox/Delivery/ConsumeSettings.cs new file mode 100644 index 0000000..07bc673 --- /dev/null +++ b/src/Sa.Outbox/Delivery/ConsumeSettings.cs @@ -0,0 +1,78 @@ +namespace Sa.Outbox.Delivery; + +/// +/// Represents the consumption settings for retrieving & processing messages from the Outbox +/// +/// +/// Initializes a new instance of the class. +/// +/// Group identity for consuming. If null or empty, uses default. +public sealed class ConsumeSettings +{ + /// + /// Maximum number of processing iterations when greedy mode is enabled. + /// -1 means unlimited iterations. + /// + public int MaxProcessingIterations { get; set; } = 10; + + /// + /// Delay between processing iterations when in greedy mode. + /// Helps prevent tight-looping when there are no messages. + /// + public TimeSpan IterationDelay { get; set; } = TimeSpan.Zero; + + /// + /// Gets or sets the maximum size of the Outbox message batch for each database poll. + /// Recommended values: 16, 32, 64, 128, 256, 512, 1024 ... + /// + public int MaxBatchSize { get; set; } = 16; + + /// + /// Message lock expiration time. + /// When a batch of messages for a bus instance is acquired, the messages will be locked (reserved) for that amount of time. + /// + public TimeSpan LockDuration { get; set; } = TimeSpan.FromSeconds(10); + + /// + /// How long before to request a lock renewal. + /// This should be much shorter than . + /// + public TimeSpan LockRenewal { get; set; } = TimeSpan.FromSeconds(3); + + /// + /// Select outbox messages for processing for the period + /// + public TimeSpan LookbackInterval { get; set; } = TimeSpan.FromDays(7); + + /// + /// The maximum number of delivery attempts before delivery will not be attempted again. + /// + public int MaxDeliveryAttempts { get; set; } = 3; + + /// + /// The maximum number of messages that can take in part + /// default value + /// + public int? ConsumeBatchSize { get; set; } + + /// + /// Time window to accumulate messages before processing a batch. + /// Delay to wait for "full set of messages" from input messages + /// + public TimeSpan BatchingWindow { get; set; } = TimeSpan.FromSeconds(3); + + /// + /// Maximum processing time allowed for each individual tenant. + /// If processing exceeds this timeout, it will be cancelled and tenant marked as failed. + /// + public TimeSpan PerTenantTimeout { get; set; } = TimeSpan.Zero; + + /// + /// Maximum number of tenants to process in parallel. + /// Values: + /// - 0 or 1: Sequential processing (default) + /// - > 1: Parallel processing with specified degree + /// - -1: Use Environment.ProcessorCount + /// + public int PerTenantMaxDegreeOfParallelism { get; set; } = 1; +} diff --git a/src/Sa.Outbox/Configuration/ConsumeSettings.cs b/src/Sa.Outbox/Delivery/ConsumeSettingsExtensions.cs similarity index 67% rename from src/Sa.Outbox/Configuration/ConsumeSettings.cs rename to src/Sa.Outbox/Delivery/ConsumeSettingsExtensions.cs index 3798216..56761d0 100644 --- a/src/Sa.Outbox/Configuration/ConsumeSettings.cs +++ b/src/Sa.Outbox/Delivery/ConsumeSettingsExtensions.cs @@ -1,82 +1,4 @@ -namespace Sa.Outbox; - -/// -/// Represents the consumption settings for retrieving & processing messages from the Outbox -/// -/// -/// Initializes a new instance of the class. -/// -/// Group identity for consuming. If null or empty, uses default. -public sealed class ConsumeSettings -{ - /// - /// Maximum number of processing iterations when greedy mode is enabled. - /// -1 means unlimited iterations. - /// - public int MaxProcessingIterations { get; set; } = 10; - - /// - /// Delay between processing iterations when in greedy mode. - /// Helps prevent tight-looping when there are no messages. - /// - public TimeSpan IterationDelay { get; set; } = TimeSpan.Zero; - - /// - /// Gets or sets the maximum size of the Outbox message batch for each database poll. - /// Recommended values: 16, 32, 64, 128, 256, 512, 1024 ... - /// - public int MaxBatchSize { get; set; } = 16; - - /// - /// Message lock expiration time. - /// When a batch of messages for a bus instance is acquired, the messages will be locked (reserved) for that amount of time. - /// - public TimeSpan LockDuration { get; set; } = TimeSpan.FromSeconds(10); - - /// - /// How long before to request a lock renewal. - /// This should be much shorter than . - /// - public TimeSpan LockRenewal { get; set; } = TimeSpan.FromSeconds(3); - - /// - /// Select outbox messages for processing for the period - /// - public TimeSpan LookbackInterval { get; set; } = TimeSpan.FromDays(7); - - /// - /// The maximum number of delivery attempts before delivery will not be attempted again. - /// - public int MaxDeliveryAttempts { get; set; } = 3; - - /// - /// The maximum number of messages that can take in part - /// default value - /// - public int? ConsumeBatchSize { get; set; } - - /// - /// Time window to accumulate messages before processing a batch. - /// Delay to wait for "full set of messages" from input messages - /// - public TimeSpan BatchingWindow { get; set; } = TimeSpan.FromSeconds(3); - - /// - /// Maximum processing time allowed for each individual tenant. - /// If processing exceeds this timeout, it will be cancelled and tenant marked as failed. - /// - public TimeSpan PerTenantTimeout { get; set; } = TimeSpan.Zero; - - /// - /// Maximum number of tenants to process in parallel. - /// Values: - /// - 0 or 1: Sequential processing (default) - /// - > 1: Parallel processing with specified degree - /// - -1: Use Environment.ProcessorCount - /// - public int PerTenantMaxDegreeOfParallelism { get; set; } = 1; -} - +namespace Sa.Outbox.Delivery; /// /// Extension methods for fluent configuration of . diff --git a/src/Sa.Outbox/Configuration/ConsumerGroupSettings.cs b/src/Sa.Outbox/Delivery/ConsumerGroupSettings.cs similarity index 95% rename from src/Sa.Outbox/Configuration/ConsumerGroupSettings.cs rename to src/Sa.Outbox/Delivery/ConsumerGroupSettings.cs index f6dd9dd..609c8c9 100644 --- a/src/Sa.Outbox/Configuration/ConsumerGroupSettings.cs +++ b/src/Sa.Outbox/Delivery/ConsumerGroupSettings.cs @@ -1,4 +1,4 @@ -namespace Sa.Outbox; +namespace Sa.Outbox.Delivery; /// diff --git a/src/Sa.Outbox/Delivery/DelivarySnapshot.cs b/src/Sa.Outbox/Delivery/DelivarySnapshot.cs index d8eaa4d..033dab8 100644 --- a/src/Sa.Outbox/Delivery/DelivarySnapshot.cs +++ b/src/Sa.Outbox/Delivery/DelivarySnapshot.cs @@ -1,10 +1,12 @@ -using Sa.Outbox.Job; -using Sa.Outbox.Publication; +using Sa.Outbox.Delivery.Job; +using Sa.Outbox.Metadata; using Sa.Schedule; namespace Sa.Outbox.Delivery; -internal sealed class DelivarySnapshot(IScheduleSettings scheduleSettings) : IDelivarySnapshot +internal sealed class DelivarySnapshot( + IScheduleSettings scheduleSettings, + IOutboxMessageMetadataProvider metadataProvider) : IDelivarySnapshot { private readonly Lazy _lazyJobs = new(() => [.. scheduleSettings.GetJobSettings()]); @@ -17,7 +19,7 @@ internal sealed class DelivarySnapshot(IScheduleSettings scheduleSettings) : IDe .Select(c => GetMessageTypeIfInheritsFromDeliveryJob(c.JobType, baseType)) .Where(mt => mt != null) .Cast() - .Select(mt => OutboxMessageTypeHelper.GetOutboxMessageTypeInfo(mt).PartName) + .Select(mt => metadataProvider.GetMetadata(mt).PartName) .Distinct()]; return parts; diff --git a/src/Sa.Outbox/Configuration/DeliveryBuilder.cs b/src/Sa.Outbox/Delivery/DeliveryBuilder.cs similarity index 86% rename from src/Sa.Outbox/Configuration/DeliveryBuilder.cs rename to src/Sa.Outbox/Delivery/DeliveryBuilder.cs index 0b59651..2a2a118 100644 --- a/src/Sa.Outbox/Configuration/DeliveryBuilder.cs +++ b/src/Sa.Outbox/Delivery/DeliveryBuilder.cs @@ -1,12 +1,10 @@ -using System.Diagnostics.CodeAnalysis; -using System.Text.RegularExpressions; -using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; -using Sa.Outbox.Delivery; -using Sa.Outbox.Job; -using Sa.Outbox.Support; +using Sa.Outbox.Delivery.Job; +using System.Diagnostics.CodeAnalysis; +using System.Text.RegularExpressions; -namespace Sa.Outbox.Configuration; +namespace Sa.Outbox.Delivery; internal sealed partial class DeliveryBuilder(IServiceCollection services) : IDeliveryBuilder { @@ -15,7 +13,6 @@ internal sealed partial class DeliveryBuilder(IServiceCollection services) : IDe Action? configure = null ) where TConsumer : class, IConsumer - where TMessage : IOutboxPayloadMessage { ArgumentNullException.ThrowIfNullOrEmpty(consumerGroupId); services.AddDeliveryJob(SanitizeString(consumerGroupId), false, configure); @@ -27,7 +24,6 @@ internal sealed partial class DeliveryBuilder(IServiceCollection services) : IDe Action? configure = null ) where TConsumer : class, IConsumer - where TMessage : IOutboxPayloadMessage { ArgumentNullException.ThrowIfNullOrEmpty(consumerGroupId); services.AddDeliveryJob(SanitizeString(consumerGroupId), true, configure); diff --git a/src/Sa.Outbox/Delivery/DeliveryCourier.cs b/src/Sa.Outbox/Delivery/DeliveryCourier.cs index fce5860..f126071 100644 --- a/src/Sa.Outbox/Delivery/DeliveryCourier.cs +++ b/src/Sa.Outbox/Delivery/DeliveryCourier.cs @@ -1,6 +1,5 @@ using Sa.Extensions; using Sa.Outbox.Exceptions; -using Sa.Outbox.Support; using System.Runtime.CompilerServices; namespace Sa.Outbox.Delivery; @@ -18,7 +17,6 @@ public async ValueTask Deliver( OutboxMessageFilter filter, ReadOnlyMemory> messages, CancellationToken cancellationToken) - where TMessage : IOutboxPayloadMessage { if (messages.IsEmpty) return 0; diff --git a/src/Sa.Outbox/Delivery/DeliveryLifetimeInvoker.cs b/src/Sa.Outbox/Delivery/DeliveryLifetimeInvoker.cs index f6d7cf4..d73063c 100644 --- a/src/Sa.Outbox/Delivery/DeliveryLifetimeInvoker.cs +++ b/src/Sa.Outbox/Delivery/DeliveryLifetimeInvoker.cs @@ -1,5 +1,4 @@ using Microsoft.Extensions.DependencyInjection; -using Sa.Outbox.Support; using System.Collections.Concurrent; namespace Sa.Outbox.Delivery; @@ -17,7 +16,7 @@ public Task ConsumeInScope( ConsumerGroupSettings settings, OutboxMessageFilter filter, ReadOnlyMemory> messages, - CancellationToken cancellationToken) where TMessage : IOutboxPayloadMessage + CancellationToken cancellationToken) { return settings.AsSingleton ? ProcessInSingleton(settings, filter, messages, cancellationToken) @@ -28,7 +27,7 @@ private Task ProcessInSingleton( ConsumerGroupSettings settings, OutboxMessageFilter filter, ReadOnlyMemory> messages, - CancellationToken cancellationToken) where TMessage : IOutboxPayloadMessage + CancellationToken cancellationToken) { var consumer = GetOrCreateSingletonConsumer(settings); return ProcessMessages(consumer, settings, filter, messages, cancellationToken); @@ -38,7 +37,7 @@ private async Task ProcessInNewScope( ConsumerGroupSettings settings, OutboxMessageFilter filter, ReadOnlyMemory> messages, - CancellationToken cancellationToken) where TMessage : IOutboxPayloadMessage + CancellationToken cancellationToken) { using AsyncServiceScope scope = serviceProvider.CreateAsyncScope(); IConsumer consumer = scope.ServiceProvider.GetRequiredKeyedService>(settings); @@ -50,13 +49,13 @@ private static async Task ProcessMessages( ConsumerGroupSettings settings, OutboxMessageFilter filter, ReadOnlyMemory> messages, - CancellationToken cancellationToken) where TMessage : IOutboxPayloadMessage + CancellationToken cancellationToken) { await consumer.Consume(settings, filter, messages, cancellationToken); } private IConsumer GetOrCreateSingletonConsumer( - ConsumerGroupSettings settings) where TMessage : IOutboxPayloadMessage + ConsumerGroupSettings settings) { return (IConsumer)_singletonConsumers.GetOrAdd(settings, key => serviceProvider.GetRequiredKeyedService>(key)); diff --git a/src/Sa.Outbox/Delivery/DeliveryProcessor.cs b/src/Sa.Outbox/Delivery/DeliveryProcessor.cs index 1c76c9a..080a9b6 100644 --- a/src/Sa.Outbox/Delivery/DeliveryProcessor.cs +++ b/src/Sa.Outbox/Delivery/DeliveryProcessor.cs @@ -1,5 +1,4 @@ using Sa.Outbox.Partitional; -using Sa.Outbox.Support; namespace Sa.Outbox.Delivery; @@ -11,8 +10,9 @@ internal sealed class DeliveryProcessor( IDeliveryTenant processor, ITenantProvider tenantProvider) : IDeliveryProcessor { - public async Task ProcessMessages(ConsumerGroupSettings settings, CancellationToken cancellationToken) - where TMessage : IOutboxPayloadMessage + public async Task ProcessMessages( + ConsumerGroupSettings settings, + CancellationToken cancellationToken) { var consumeSettings = settings.ConsumeSettings; @@ -70,7 +70,7 @@ private static bool ShouldContinueProcessing( private async Task ProcessForEachTenant( int[] tenantIds, ConsumerGroupSettings settings, - CancellationToken cancellationToken) where TMessage : IOutboxPayloadMessage + CancellationToken cancellationToken) { if (settings.ConsumeSettings.PerTenantMaxDegreeOfParallelism == 1) { @@ -80,8 +80,10 @@ private async Task ProcessForEachTenant( return await ProcessTenantsParallel(tenantIds, settings, cancellationToken); } - private async Task ProcessTenantsSequential(int[] tenantIds, ConsumerGroupSettings settings, CancellationToken cancellationToken) - where TMessage : IOutboxPayloadMessage + private async Task ProcessTenantsSequential( + int[] tenantIds, + ConsumerGroupSettings settings, + CancellationToken cancellationToken) { int count = 0; foreach (int tenantId in tenantIds) @@ -96,7 +98,6 @@ public async Task ProcessTenantsParallel( int[] tenantIds, ConsumerGroupSettings settings, CancellationToken cancellationToken) - where TMessage : IOutboxPayloadMessage { var parallelOptions = new ParallelOptions { @@ -125,8 +126,10 @@ await Parallel.ForEachAsync( return totalCount; } - private async Task ProcessInTenant(int tenantId, ConsumerGroupSettings settings, CancellationToken cancellationToken) - where TMessage : IOutboxPayloadMessage + private async Task ProcessInTenant( + int tenantId, + ConsumerGroupSettings settings, + CancellationToken cancellationToken) { using var tenantCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); if (settings.ConsumeSettings.PerTenantTimeout > TimeSpan.Zero) diff --git a/src/Sa.Outbox/Delivery/DeliveryStatus.cs b/src/Sa.Outbox/Delivery/DeliveryStatus.cs new file mode 100644 index 0000000..a4d3ec1 --- /dev/null +++ b/src/Sa.Outbox/Delivery/DeliveryStatus.cs @@ -0,0 +1,13 @@ +namespace Sa.Outbox.Delivery; + +/// +/// Represents the status of a delivery attempt. +/// +/// The status code representing the result of the delivery. +/// A message providing additional context about the delivery status. +/// The date and time when the status was created. +public readonly record struct DeliveryStatus( + DeliveryStatusCode Code, + string Message, + DateTimeOffset CreatedAt +); diff --git a/src/Sa.Outbox/DeliveryStatusCode.cs b/src/Sa.Outbox/Delivery/DeliveryStatusCode.cs similarity index 99% rename from src/Sa.Outbox/DeliveryStatusCode.cs rename to src/Sa.Outbox/Delivery/DeliveryStatusCode.cs index d02e38d..d642c29 100644 --- a/src/Sa.Outbox/DeliveryStatusCode.cs +++ b/src/Sa.Outbox/Delivery/DeliveryStatusCode.cs @@ -1,4 +1,4 @@ -namespace Sa.Outbox; +namespace Sa.Outbox.Delivery; public enum DeliveryStatusCode diff --git a/src/Sa.Outbox/Delivery/DeliveryTenant.cs b/src/Sa.Outbox/Delivery/DeliveryTenant.cs index d5901e5..c9566b2 100644 --- a/src/Sa.Outbox/Delivery/DeliveryTenant.cs +++ b/src/Sa.Outbox/Delivery/DeliveryTenant.cs @@ -1,6 +1,5 @@ using Sa.Classes; using Sa.Outbox.PlugServices; -using Sa.Outbox.Support; using System.Buffers; @@ -13,12 +12,13 @@ internal sealed class DeliveryTenant( IOutboxDeliveryManager deliveryMan, TimeProvider timeProvider, IDeliveryCourier deliveryCourier, - IDeliveryBatcher batcher) : IDeliveryTenant + IDeliveryBatcher batcher, + FilterFactory filterFactory) : IDeliveryTenant { public async Task ProcessInTenant( int tenantId, ConsumerGroupSettings settings, - CancellationToken cancellationToken) where TMessage : IOutboxPayloadMessage + CancellationToken cancellationToken) { var filter = CreateFilter(tenantId, settings); @@ -46,18 +46,16 @@ public async Task ProcessInTenant( } private OutboxMessageFilter CreateFilter(int tenantId, ConsumerGroupSettings settings) - where TMessage : IOutboxPayloadMessage { - return FilterFactory.CreateFilter( - tenantId, - settings.ConsumerGroupId, - timeProvider.GetUtcNow(), - settings.ConsumeSettings.LookbackInterval, - settings.ConsumeSettings.BatchingWindow); + return filterFactory.CreateFilter( + tenantId: tenantId, + consumerGroupId: settings.ConsumerGroupId, + now: timeProvider.GetUtcNow(), + lookbackInterval: settings.ConsumeSettings.LookbackInterval, + batchingWindow: settings.ConsumeSettings.BatchingWindow); } private static IMemoryOwner> RentMemory(int size) - where TMessage : IOutboxPayloadMessage { return MemoryPool>.Shared.Rent(size); } @@ -79,7 +77,7 @@ private async Task>> AcquireMe ConsumeSettings consumeSettings, OutboxMessageFilter filter, Memory> buffer, - CancellationToken cancellationToken) where TMessage : IOutboxPayloadMessage + CancellationToken cancellationToken) { var lockedCount = await deliveryMan.RentDelivery( buffer, @@ -95,7 +93,6 @@ private Task ReleaseMessagesAsync( ReadOnlyMemory> messages, OutboxMessageFilter filter, CancellationToken cancellationToken) - where TMessage : IOutboxPayloadMessage { return deliveryMan.ReturnDelivery( messages, diff --git a/src/Sa.Outbox/Delivery/FilterFactory.cs b/src/Sa.Outbox/Delivery/FilterFactory.cs index 52ffc79..de7adc8 100644 --- a/src/Sa.Outbox/Delivery/FilterFactory.cs +++ b/src/Sa.Outbox/Delivery/FilterFactory.cs @@ -1,6 +1,5 @@ using Sa.Extensions; -using Sa.Outbox.Publication; -using Sa.Outbox.Support; +using Sa.Outbox.Metadata; namespace Sa.Outbox.Delivery; @@ -9,26 +8,24 @@ namespace Sa.Outbox.Delivery; /// /// Creates filters for outbox message queries. /// -internal static class FilterFactory +internal sealed class FilterFactory(IOutboxMessageMetadataProvider metadata) { - public static OutboxMessageFilter CreateFilter( + public OutboxMessageFilter CreateFilter( int tenantId, string consumerGroupId, DateTimeOffset now, TimeSpan lookbackInterval, - TimeSpan batchingWindow) where TMessage : IOutboxPayloadMessage + TimeSpan batchingWindow) { - OutboxMessageTypeInfo ti = OutboxMessageTypeHelper.GetOutboxMessageTypeInfo(); - return new OutboxMessageFilter( TransactId: GenerateTransactId(), ConsumerGroupId: consumerGroupId, PayloadType: typeof(TMessage).Name, - tenantId, - ti.PartName, - now.StartOfDay() - lookbackInterval, - now - batchingWindow, - now + TenantId: tenantId, + Part: metadata.GetMetadata().PartName, + FromDate: now.StartOfDay() - lookbackInterval, + ToDate: now - batchingWindow, + NowDate: now ); } diff --git a/src/Sa.Outbox/Configuration/IConsumerGroupNamingStrategy.cs b/src/Sa.Outbox/Delivery/IConsumerGroupNamingStrategy.cs similarity index 74% rename from src/Sa.Outbox/Configuration/IConsumerGroupNamingStrategy.cs rename to src/Sa.Outbox/Delivery/IConsumerGroupNamingStrategy.cs index 0bc018d..e1c1179 100644 --- a/src/Sa.Outbox/Configuration/IConsumerGroupNamingStrategy.cs +++ b/src/Sa.Outbox/Delivery/IConsumerGroupNamingStrategy.cs @@ -1,4 +1,4 @@ -namespace Sa.Outbox; +namespace Sa.Outbox.Delivery; public interface IConsumerGroupNamingStrategy { diff --git a/src/Sa.Outbox/Delivery/IDeliveryBatcher.cs b/src/Sa.Outbox/Delivery/IDeliveryBatcher.cs index c6f5379..7b3100e 100644 --- a/src/Sa.Outbox/Delivery/IDeliveryBatcher.cs +++ b/src/Sa.Outbox/Delivery/IDeliveryBatcher.cs @@ -8,5 +8,8 @@ public interface IDeliveryBatcher /// /// Calculates the optimal batch size given a maximum allowed size and a specific tenant ID. /// - ValueTask CalculateBatchSize(int maxBatchSize, OutboxMessageFilter filter, CancellationToken cancellationToken); + ValueTask CalculateBatchSize( + int maxBatchSize, + OutboxMessageFilter filter, + CancellationToken cancellationToken); } diff --git a/src/Sa.Outbox/Configuration/IDeliveryBuilder.cs b/src/Sa.Outbox/Delivery/IDeliveryBuilder.cs similarity index 86% rename from src/Sa.Outbox/Configuration/IDeliveryBuilder.cs rename to src/Sa.Outbox/Delivery/IDeliveryBuilder.cs index 8852134..baa4cf2 100644 --- a/src/Sa.Outbox/Configuration/IDeliveryBuilder.cs +++ b/src/Sa.Outbox/Delivery/IDeliveryBuilder.cs @@ -1,8 +1,6 @@ using System.Diagnostics.CodeAnalysis; -using Sa.Outbox.Delivery; -using Sa.Outbox.Support; -namespace Sa.Outbox; +namespace Sa.Outbox.Delivery; /// @@ -22,8 +20,7 @@ public partial interface IDeliveryBuilder string consumerGroupId, Action? configure = null ) - where TConsumer : class, IConsumer - where TMessage : IOutboxPayloadMessage; + where TConsumer : class, IConsumer; /// /// Adds singleton delivery for the specified consumer and message type. @@ -32,8 +29,7 @@ public partial interface IDeliveryBuilder string consumerGroupId, Action? configure = null ) - where TConsumer : class, IConsumer - where TMessage : IOutboxPayloadMessage; + where TConsumer : class, IConsumer; /// diff --git a/src/Sa.Outbox/Configuration/IDeliveryBuilder.partial.cs b/src/Sa.Outbox/Delivery/IDeliveryBuilder.partial.cs similarity index 88% rename from src/Sa.Outbox/Configuration/IDeliveryBuilder.partial.cs rename to src/Sa.Outbox/Delivery/IDeliveryBuilder.partial.cs index 0a39a1a..83fa33d 100644 --- a/src/Sa.Outbox/Configuration/IDeliveryBuilder.partial.cs +++ b/src/Sa.Outbox/Delivery/IDeliveryBuilder.partial.cs @@ -1,8 +1,7 @@ -using System.Diagnostics.CodeAnalysis; -using Sa.Extensions; -using Sa.Outbox.Support; +using Sa.Extensions; +using System.Diagnostics.CodeAnalysis; -namespace Sa.Outbox; +namespace Sa.Outbox.Delivery; public partial interface IDeliveryBuilder { @@ -12,14 +11,12 @@ public partial interface IDeliveryBuilder Action? configure = null ) where TConsumer : class, IConsumer - where TMessage : IOutboxPayloadMessage => AddDeliveryScoped(GetConsumerGroupName(), configure); public IDeliveryBuilder AddDelivery<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TConsumer, TMessage>( Action? configure = null ) where TConsumer : class, IConsumer - where TMessage : IOutboxPayloadMessage => AddDelivery(GetConsumerGroupName(), configure); diff --git a/src/Sa.Outbox/Delivery/IDeliveryCourier.cs b/src/Sa.Outbox/Delivery/IDeliveryCourier.cs index 7501966..67acba9 100644 --- a/src/Sa.Outbox/Delivery/IDeliveryCourier.cs +++ b/src/Sa.Outbox/Delivery/IDeliveryCourier.cs @@ -1,6 +1,4 @@ -using Sa.Outbox.Support; - -namespace Sa.Outbox.Delivery; +namespace Sa.Outbox.Delivery; /// /// Delivers a batch of messages with error handling and retry mechanisms @@ -11,5 +9,5 @@ ValueTask Deliver( ConsumerGroupSettings settings, OutboxMessageFilter filter, ReadOnlyMemory> messages, - CancellationToken cancellationToken) where TMessage : IOutboxPayloadMessage; + CancellationToken cancellationToken); } diff --git a/src/Sa.Outbox/Delivery/IDeliveryLifetimeInvoker.cs b/src/Sa.Outbox/Delivery/IDeliveryLifetimeInvoker.cs index b94bce0..85e7512 100644 --- a/src/Sa.Outbox/Delivery/IDeliveryLifetimeInvoker.cs +++ b/src/Sa.Outbox/Delivery/IDeliveryLifetimeInvoker.cs @@ -1,6 +1,4 @@ -using Sa.Outbox.Support; - -namespace Sa.Outbox.Delivery; +namespace Sa.Outbox.Delivery; /// /// Processes messages using a consumer in scope @@ -11,5 +9,5 @@ Task ConsumeInScope( ConsumerGroupSettings settings, OutboxMessageFilter filter, ReadOnlyMemory> messages, - CancellationToken cancellationToken) where TMessage : IOutboxPayloadMessage; + CancellationToken cancellationToken); } diff --git a/src/Sa.Outbox/Delivery/IDeliveryProcessor.cs b/src/Sa.Outbox/Delivery/IDeliveryProcessor.cs index 69f0990..2c1b0b8 100644 --- a/src/Sa.Outbox/Delivery/IDeliveryProcessor.cs +++ b/src/Sa.Outbox/Delivery/IDeliveryProcessor.cs @@ -1,6 +1,4 @@ -using Sa.Outbox.Support; - -namespace Sa.Outbox.Delivery; +namespace Sa.Outbox.Delivery; /// /// Processes outbox messages in batches until all pending messages are delivered or cancellation is requested. @@ -8,6 +6,5 @@ namespace Sa.Outbox.Delivery; /// public interface IDeliveryProcessor { - Task ProcessMessages(ConsumerGroupSettings settings, CancellationToken cancellationToken) - where TMessage : IOutboxPayloadMessage; + Task ProcessMessages(ConsumerGroupSettings settings, CancellationToken cancellationToken); } diff --git a/src/Sa.Outbox/Delivery/IDeliveryTenant.cs b/src/Sa.Outbox/Delivery/IDeliveryTenant.cs index c567f55..01fc943 100644 --- a/src/Sa.Outbox/Delivery/IDeliveryTenant.cs +++ b/src/Sa.Outbox/Delivery/IDeliveryTenant.cs @@ -1,6 +1,4 @@ -using Sa.Outbox.Support; - -namespace Sa.Outbox.Delivery; +namespace Sa.Outbox.Delivery; /// /// Processes messages for a specific tenant with locking and delivery. @@ -10,5 +8,5 @@ internal interface IDeliveryTenant Task ProcessInTenant( int tenantId, ConsumerGroupSettings settings, - CancellationToken cancellationToken) where TMessage : IOutboxPayloadMessage; + CancellationToken cancellationToken); } diff --git a/src/Sa.Outbox/Job/DeliveryJob.cs b/src/Sa.Outbox/Delivery/Job/DeliveryJob.cs similarity index 62% rename from src/Sa.Outbox/Job/DeliveryJob.cs rename to src/Sa.Outbox/Delivery/Job/DeliveryJob.cs index c779e1f..76397ca 100644 --- a/src/Sa.Outbox/Job/DeliveryJob.cs +++ b/src/Sa.Outbox/Delivery/Job/DeliveryJob.cs @@ -1,23 +1,13 @@ -using Sa.Outbox.Delivery; -using Sa.Outbox.Publication; -using Sa.Outbox.Support; -using Sa.Schedule; +using Sa.Schedule; -namespace Sa.Outbox.Job; +namespace Sa.Outbox.Delivery.Job; public interface IDeliveryJob : IJob; internal sealed class DeliveryJob(IDeliveryProcessor processor) : IDeliveryJob - where TMessage : IOutboxPayloadMessage { - - static DeliveryJob() - { - OutboxMessageTypeHelper.GetOutboxMessageTypeInfo(); - } - public async Task Execute(IJobContext context, CancellationToken cancellationToken) { ConsumerGroupSettings settings = context.Settings.Properties.Tag as ConsumerGroupSettings diff --git a/src/Sa.Outbox/Job/IOutboxJobInterceptor.cs b/src/Sa.Outbox/Delivery/Job/IOutboxJobInterceptor.cs similarity index 71% rename from src/Sa.Outbox/Job/IOutboxJobInterceptor.cs rename to src/Sa.Outbox/Delivery/Job/IOutboxJobInterceptor.cs index 75e6c59..aecd5e8 100644 --- a/src/Sa.Outbox/Job/IOutboxJobInterceptor.cs +++ b/src/Sa.Outbox/Delivery/Job/IOutboxJobInterceptor.cs @@ -1,6 +1,6 @@ using Sa.Schedule; -namespace Sa.Outbox.Job; +namespace Sa.Outbox.Delivery.Job; public interface IOutboxJobInterceptor : IJobInterceptor { diff --git a/src/Sa.Outbox/Job/OutboxJobInterceptor.cs b/src/Sa.Outbox/Delivery/Job/OutboxJobInterceptor.cs similarity index 86% rename from src/Sa.Outbox/Job/OutboxJobInterceptor.cs rename to src/Sa.Outbox/Delivery/Job/OutboxJobInterceptor.cs index bbb5a39..da08f4f 100644 --- a/src/Sa.Outbox/Job/OutboxJobInterceptor.cs +++ b/src/Sa.Outbox/Delivery/Job/OutboxJobInterceptor.cs @@ -1,8 +1,9 @@ using Sa.Schedule; -namespace Sa.Outbox.Job; +namespace Sa.Outbox.Delivery.Job; -internal sealed class OutboxJobInterceptor(IEnumerable interceptors) : IJobInterceptor +internal sealed class OutboxJobInterceptor(IEnumerable interceptors) + : IJobInterceptor { public async Task OnHandle(IJobContext context, Func next, object? key, CancellationToken cancellationToken) { diff --git a/src/Sa.Outbox/Job/Setup.cs b/src/Sa.Outbox/Delivery/Job/Setup.cs similarity index 82% rename from src/Sa.Outbox/Job/Setup.cs rename to src/Sa.Outbox/Delivery/Job/Setup.cs index 2d7033d..c9a89f3 100644 --- a/src/Sa.Outbox/Job/Setup.cs +++ b/src/Sa.Outbox/Delivery/Job/Setup.cs @@ -1,19 +1,18 @@ -using System.Diagnostics.CodeAnalysis; -using Microsoft.Extensions.DependencyInjection; -using Sa.Outbox.Support; +using Microsoft.Extensions.DependencyInjection; using Sa.Schedule; +using System.Diagnostics.CodeAnalysis; -namespace Sa.Outbox.Job; +namespace Sa.Outbox.Delivery.Job; internal static class Setup { - public static IServiceCollection AddDeliveryJob<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TConsumer, TMessage>( + public static IServiceCollection AddDeliveryJob< + [DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TConsumer, TMessage>( this IServiceCollection services, string consumerGroupId, bool isSingleton, Action? сonfigure = null) where TConsumer : class, IConsumer - where TMessage : IOutboxPayloadMessage { ArgumentNullException.ThrowIfNullOrWhiteSpace(consumerGroupId); diff --git a/src/Sa.Outbox/Delivery/OutboxContext.cs b/src/Sa.Outbox/Delivery/OutboxContext.cs index 1328b3d..a7948ce 100644 --- a/src/Sa.Outbox/Delivery/OutboxContext.cs +++ b/src/Sa.Outbox/Delivery/OutboxContext.cs @@ -1,5 +1,5 @@ -using System.Diagnostics; -using Sa.Outbox.Exceptions; +using Sa.Outbox.Exceptions; +using System.Diagnostics; namespace Sa.Outbox.Delivery; diff --git a/src/Sa.Outbox/Delivery/OutboxDeliveryMessage.cs b/src/Sa.Outbox/Delivery/OutboxDeliveryMessage.cs new file mode 100644 index 0000000..9a9b82e --- /dev/null +++ b/src/Sa.Outbox/Delivery/OutboxDeliveryMessage.cs @@ -0,0 +1,14 @@ +namespace Sa.Outbox.Delivery; + +/// +/// Represents a delivery message in the Outbox with its associated payload, part information, and delivery details. +/// +/// Gets the unique identifier for the Outbox delivery. +/// Message in the Outbox. +/// Gets information about the delivery of the Outbox message. +/// The type of the message payload. +public sealed record OutboxDeliveryMessage( + Guid OutboxId, + OutboxMessage Message, + OutboxTaskDeliveryInfo DeliveryInfo +); diff --git a/src/Sa.Outbox/Delivery/OutboxTaskDeliveryInfo.cs b/src/Sa.Outbox/Delivery/OutboxTaskDeliveryInfo.cs new file mode 100644 index 0000000..2ebff26 --- /dev/null +++ b/src/Sa.Outbox/Delivery/OutboxTaskDeliveryInfo.cs @@ -0,0 +1,24 @@ +namespace Sa.Outbox.Delivery; + +/// +/// Represents information about the delivery of an Outbox message. +/// +/// The unique identifier of the processing task associated with the Outbox message. +/// Each message processing creates a separate task that can be retried in case of delivery failures. +/// The identifier of the latest delivery attempt. +/// Multiple delivery attempts can be made for the same task, each with its own DeliveryId. +/// 0 if no delivery attempts have been made yet. +/// The number of delivery attempts made for this task. +/// Incremented with each retry attempt. +/// The identifier of the last error encountered during delivery. +/// Used for error tracking and monitoring. +/// The current status of the delivery. +/// Information about the partition/segment of the Outbox. +public sealed record OutboxTaskDeliveryInfo( + long TaskId, + long DeliveryId, + int Attempt, + long LastErrorId, + DeliveryStatus Status, + OutboxPartInfo PartInfo +); diff --git a/src/Sa.Outbox/Delivery/ScheduleSettings.cs b/src/Sa.Outbox/Delivery/ScheduleSettings.cs new file mode 100644 index 0000000..9a70492 --- /dev/null +++ b/src/Sa.Outbox/Delivery/ScheduleSettings.cs @@ -0,0 +1,23 @@ +namespace Sa.Outbox.Delivery; + +/// +/// Represents the scheduling settings for the delivery job. +/// +public sealed class ScheduleSettings +{ + /// + /// Gets the unique identifier for the delivery job + /// + public Guid JobId { get; } = Guid.NewGuid(); + + public string? Name { get; internal set; } + + public TimeSpan Interval { get; internal set; } = TimeSpan.FromMinutes(1); + + /// + /// Job schedule delay before start + /// + public TimeSpan InitialDelay { get; internal set; } = TimeSpan.FromSeconds(10); + + public int RetryCountOnError { get; internal set; } = 2; +} diff --git a/src/Sa.Outbox/Configuration/ScheduleSettings.cs b/src/Sa.Outbox/Delivery/ScheduleSettingsExtensions.cs similarity index 83% rename from src/Sa.Outbox/Configuration/ScheduleSettings.cs rename to src/Sa.Outbox/Delivery/ScheduleSettingsExtensions.cs index 148e36a..50353fe 100644 --- a/src/Sa.Outbox/Configuration/ScheduleSettings.cs +++ b/src/Sa.Outbox/Delivery/ScheduleSettingsExtensions.cs @@ -1,27 +1,4 @@ -namespace Sa.Outbox; - -/// -/// Represents the scheduling settings for the delivery job. -/// -public sealed class ScheduleSettings -{ - /// - /// Gets the unique identifier for the delivery job - /// - public Guid JobId { get; } = Guid.NewGuid(); - - public string? Name { get; internal set; } - - public TimeSpan Interval { get; internal set; } = TimeSpan.FromMinutes(1); - - /// - /// Job schedule delay before start - /// - public TimeSpan InitialDelay { get; internal set; } = TimeSpan.FromSeconds(10); - - public int RetryCountOnError { get; internal set; } = 2; -} - +namespace Sa.Outbox.Delivery; public static class ScheduleSettingsExtensions { diff --git a/src/Sa.Outbox/Delivery/Setup.cs b/src/Sa.Outbox/Delivery/Setup.cs index 737045c..40f5744 100644 --- a/src/Sa.Outbox/Delivery/Setup.cs +++ b/src/Sa.Outbox/Delivery/Setup.cs @@ -1,6 +1,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; -using Sa.Outbox.Configuration; +using Sa.Outbox.Metadata; using Sa.Outbox.Partitional; namespace Sa.Outbox.Delivery; @@ -9,6 +9,9 @@ internal static class Setup { public static IServiceCollection AddOutboxDelivery(this IServiceCollection services, Action configure) { + services.AddMessagesMetadata(); + + services.TryAddSingleton(); services.TryAddSingleton(); diff --git a/src/Sa.Outbox/Exceptions/DeliveryException.cs b/src/Sa.Outbox/Exceptions/DeliveryException.cs index d9e5105..d26ca8a 100644 --- a/src/Sa.Outbox/Exceptions/DeliveryException.cs +++ b/src/Sa.Outbox/Exceptions/DeliveryException.cs @@ -1,4 +1,6 @@ -namespace Sa.Outbox.Exceptions; +using Sa.Outbox.Delivery; + +namespace Sa.Outbox.Exceptions; public class DeliveryException( string message, diff --git a/src/Sa.Outbox/Exceptions/DeliveryPermanentException.cs b/src/Sa.Outbox/Exceptions/DeliveryPermanentException.cs index c5dbcb1..8278ec6 100644 --- a/src/Sa.Outbox/Exceptions/DeliveryPermanentException.cs +++ b/src/Sa.Outbox/Exceptions/DeliveryPermanentException.cs @@ -1,4 +1,6 @@ -namespace Sa.Outbox.Exceptions; +using Sa.Outbox.Delivery; + +namespace Sa.Outbox.Exceptions; public class DeliveryPermanentException(string message, Exception? innerException = null, DeliveryStatusCode statusCode = DeliveryStatusCode.Error) : DeliveryException(message, innerException, statusCode) diff --git a/src/Sa.Outbox/IConsumer.cs b/src/Sa.Outbox/IConsumer.cs index dea76b1..3f43c90 100644 --- a/src/Sa.Outbox/IConsumer.cs +++ b/src/Sa.Outbox/IConsumer.cs @@ -1,4 +1,4 @@ -using Sa.Outbox.Support; +using Sa.Outbox.Delivery; namespace Sa.Outbox; @@ -7,7 +7,6 @@ namespace Sa.Outbox; /// /// The type of the message being consumed. public interface IConsumer : IConsumer - where TMessage : IOutboxPayloadMessage { /// /// Consumes a collection of Outbox messages. diff --git a/src/Sa.Outbox/Configuration/IOutboxBuilder.cs b/src/Sa.Outbox/IOutboxBuilder.cs similarity index 69% rename from src/Sa.Outbox/Configuration/IOutboxBuilder.cs rename to src/Sa.Outbox/IOutboxBuilder.cs index 51c22b0..9f42671 100644 --- a/src/Sa.Outbox/Configuration/IOutboxBuilder.cs +++ b/src/Sa.Outbox/IOutboxBuilder.cs @@ -1,14 +1,16 @@ -using System.Diagnostics.CodeAnalysis; -using Sa.Outbox.Delivery; +using Sa.Outbox.Delivery; +using Sa.Outbox.Metadata; +using Sa.Outbox.Publication; +using System.Diagnostics.CodeAnalysis; namespace Sa.Outbox; public interface IOutboxBuilder { /// - /// Gets the current publish settings for the outbox configuration. + /// Configure publish settings for the outbox. /// - OutboxPublishSettings PublishSettings { get; } + IOutboxBuilder WithPublishSettings(Action configure); /// /// Configures the delivery settings for the outbox. @@ -22,11 +24,14 @@ public interface IOutboxBuilder /// /// An action to configure the partitioning settings. /// The current instance of the IOutboxSettingsBuilder. - IOutboxBuilder WithTenantSettings(Action configure); + IOutboxBuilder WithTenants(Action configure); /// /// Registers a custom implementation of IDeliveryBatcher to control how messages are batched for delivery. /// IOutboxBuilder WithDeliveryBatcher<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TImplementation>() where TImplementation : class, IDeliveryBatcher; + + + IOutboxBuilder WithMetadata(Action configure); } diff --git a/src/Sa.Outbox/IOutboxContext.cs b/src/Sa.Outbox/IOutboxContext.cs index 0cd49c7..a8ed467 100644 --- a/src/Sa.Outbox/IOutboxContext.cs +++ b/src/Sa.Outbox/IOutboxContext.cs @@ -1,4 +1,6 @@ -namespace Sa.Outbox; +using Sa.Outbox.Delivery; + +namespace Sa.Outbox; /// diff --git a/src/Sa.Outbox/Metadata/IOutboxMessageMetadataBuilder.cs b/src/Sa.Outbox/Metadata/IOutboxMessageMetadataBuilder.cs new file mode 100644 index 0000000..33ee9c7 --- /dev/null +++ b/src/Sa.Outbox/Metadata/IOutboxMessageMetadataBuilder.cs @@ -0,0 +1,7 @@ +namespace Sa.Outbox.Metadata; + +public interface IOutboxMessageMetadataBuilder +{ + IOutboxMessageMetadataBuilder AddMetadata(string partName, Func? getPayloadId = null) + where TMessage : class; +} diff --git a/src/Sa.Outbox/Metadata/IOutboxMessageMetadataProvider.cs b/src/Sa.Outbox/Metadata/IOutboxMessageMetadataProvider.cs new file mode 100644 index 0000000..5eb11bc --- /dev/null +++ b/src/Sa.Outbox/Metadata/IOutboxMessageMetadataProvider.cs @@ -0,0 +1,7 @@ +namespace Sa.Outbox.Metadata; + +internal interface IOutboxMessageMetadataProvider +{ + OutboxMessageMetadata GetMetadata(Type messageType); + OutboxMessageMetadata GetMetadata() => GetMetadata(typeof(TMessage)); +} diff --git a/src/Sa.Outbox/Metadata/MetadataConfiguration.cs b/src/Sa.Outbox/Metadata/MetadataConfiguration.cs new file mode 100644 index 0000000..da1186e --- /dev/null +++ b/src/Sa.Outbox/Metadata/MetadataConfiguration.cs @@ -0,0 +1,54 @@ +namespace Sa.Outbox.Metadata; + +internal sealed class MetadataConfiguration : IOutboxMessageMetadataBuilder, IOutboxMessageMetadataProvider +{ + private readonly Dictionary _metadata = []; + + private static readonly Func s_Dummy = _ => string.Empty; + + private static readonly OutboxMessageMetadata s_Default = new("root", s_Dummy); + + + public IOutboxMessageMetadataBuilder AddMetadata(string partName, Func? getPayloadId = null) + where T : class + { + if (string.IsNullOrWhiteSpace(partName)) + { + throw new ArgumentException( + "Part name cannot be null or whitespace", + nameof(partName)); + } + + var messageType = typeof(T); + + Func payloadIdGetter = getPayloadId != null + ? obj => getPayloadId((T)obj) + : s_Dummy; + + _metadata[messageType] = new OutboxMessageMetadata( + PartName: partName, + GetPayloadId: payloadIdGetter); + + return this; + } + + + public OutboxMessageMetadata GetMetadata(Type messageType) + { + if (_metadata.TryGetValue(messageType, out var metadata)) + { + return metadata; + } + + return s_Default; + } + + + internal void MergeFrom(MetadataConfiguration other) + { + foreach (var cmeta in other._metadata) + { + _metadata[cmeta.Key] = cmeta.Value; + } + } +} diff --git a/src/Sa.Outbox/Metadata/OutboxMessageMetadata.cs b/src/Sa.Outbox/Metadata/OutboxMessageMetadata.cs new file mode 100644 index 0000000..855193e --- /dev/null +++ b/src/Sa.Outbox/Metadata/OutboxMessageMetadata.cs @@ -0,0 +1,6 @@ +namespace Sa.Outbox.Metadata; + + +internal sealed record OutboxMessageMetadata( + string PartName, + Func GetPayloadId); diff --git a/src/Sa.Outbox/Metadata/Setup.cs b/src/Sa.Outbox/Metadata/Setup.cs new file mode 100644 index 0000000..090be7c --- /dev/null +++ b/src/Sa.Outbox/Metadata/Setup.cs @@ -0,0 +1,35 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace Sa.Outbox.Metadata; + +internal static class Setup +{ + public static IServiceCollection AddMessagesMetadata( + this IServiceCollection services, + Action? configure = null) + { + + services.AddSingleton(sp => + { + var configuration = new MetadataConfiguration(); + configure?.Invoke(sp, configuration); + return configuration; + }); + + + services.TryAddSingleton(sp => + { + var configuration = new MetadataConfiguration(); + + foreach (var config in sp.GetServices()) + { + configuration.MergeFrom(config); + } + + return configuration; + }); + + return services; + } +} diff --git a/src/Sa.Outbox/Configuration/OutboxBuilder.cs b/src/Sa.Outbox/OutboxBuilder.cs similarity index 54% rename from src/Sa.Outbox/Configuration/OutboxBuilder.cs rename to src/Sa.Outbox/OutboxBuilder.cs index c27a052..92f1207 100644 --- a/src/Sa.Outbox/Configuration/OutboxBuilder.cs +++ b/src/Sa.Outbox/OutboxBuilder.cs @@ -1,24 +1,39 @@ -using System.Diagnostics.CodeAnalysis; -using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Sa.Outbox.Delivery; +using Sa.Outbox.Metadata; using Sa.Outbox.Partitional; using Sa.Outbox.Publication; +using System.Diagnostics.CodeAnalysis; -namespace Sa.Outbox.Configuration; +namespace Sa.Outbox; internal sealed class OutboxBuilder : IOutboxBuilder { private readonly IServiceCollection _services; - public OutboxBuilder(IServiceCollection services) + private OutboxBuilder(IServiceCollection services) { _services = services; + } + + public static OutboxBuilder Create(IServiceCollection services) + { services.AddMessagePublisher(); - services.TryAddSingleton(this.PublishSettings); + return new OutboxBuilder(services); + } + + public IOutboxBuilder WithPublishSettings(Action configure) + { + _services.AddMessagePublisher(configure); + return this; } - public OutboxPublishSettings PublishSettings { get; } = new(); + public IOutboxBuilder WithMetadata(Action configure) + { + _services.AddMessagesMetadata(configure); + return this; + } public IOutboxBuilder WithDeliveries(Action build) { @@ -26,7 +41,7 @@ public IOutboxBuilder WithDeliveries(Action build) return this; } - public IOutboxBuilder WithTenantSettings(Action configure) + public IOutboxBuilder WithTenants(Action configure) { _services.AddTenantProvider(configure); return this; diff --git a/src/Sa.Outbox/OutboxMessage.cs b/src/Sa.Outbox/OutboxMessage.cs index f755f0e..b8a4cef 100644 --- a/src/Sa.Outbox/OutboxMessage.cs +++ b/src/Sa.Outbox/OutboxMessage.cs @@ -14,19 +14,6 @@ public sealed record OutboxMessage( OutboxPartInfo PartInfo ); -/// -/// Represents a delivery message in the Outbox with its associated payload, part information, and delivery details. -/// -/// Gets the unique identifier for the Outbox delivery. -/// Message in the Outbox. -/// Gets information about the delivery of the Outbox message. -/// The type of the message payload. -public sealed record OutboxDeliveryMessage( - Guid OutboxId, - OutboxMessage Message, - OutboxTaskDeliveryInfo DeliveryInfo -); - /// /// Represents information about a part of the Outbox message. /// @@ -38,38 +25,3 @@ public sealed record OutboxPartInfo( string Part, DateTimeOffset CreatedAt ); - -/// -/// Represents information about the delivery of an Outbox message. -/// -/// The unique identifier of the processing task associated with the Outbox message. -/// Each message processing creates a separate task that can be retried in case of delivery failures. -/// The identifier of the latest delivery attempt. -/// Multiple delivery attempts can be made for the same task, each with its own DeliveryId. -/// 0 if no delivery attempts have been made yet. -/// The number of delivery attempts made for this task. -/// Incremented with each retry attempt. -/// The identifier of the last error encountered during delivery. -/// Used for error tracking and monitoring. -/// The current status of the delivery. -/// Information about the partition/segment of the Outbox. -public sealed record OutboxTaskDeliveryInfo( - long TaskId, - long DeliveryId, - int Attempt, - long LastErrorId, - DeliveryStatus Status, - OutboxPartInfo PartInfo -); - -/// -/// Represents the status of a delivery attempt. -/// -/// The status code representing the result of the delivery. -/// A message providing additional context about the delivery status. -/// The date and time when the status was created. -public readonly record struct DeliveryStatus( - DeliveryStatusCode Code, - string Message, - DateTimeOffset CreatedAt -); diff --git a/src/Sa.Outbox/Partitional/TenantSettings.cs b/src/Sa.Outbox/Partitional/TenantSettings.cs index 1add4cd..a07d99e 100644 --- a/src/Sa.Outbox/Partitional/TenantSettings.cs +++ b/src/Sa.Outbox/Partitional/TenantSettings.cs @@ -10,7 +10,7 @@ public sealed class TenantSettings /// Gets or sets a value indicating whether the system should automatically detect tenants /// by scanning incoming database messages /// - public bool AutoDetect { get; set; } = false; + public bool AutoDetect { get; private set; } = false; /// /// Gets or sets a function that retrieves tenant IDs asynchronously. diff --git a/src/Sa.Outbox/Publication/IOutboxMessagePublisher.cs b/src/Sa.Outbox/Publication/IOutboxMessagePublisher.cs index 8104b84..afdf874 100644 --- a/src/Sa.Outbox/Publication/IOutboxMessagePublisher.cs +++ b/src/Sa.Outbox/Publication/IOutboxMessagePublisher.cs @@ -1,6 +1,4 @@ -using Sa.Outbox.Support; - -namespace Sa.Outbox.Publication; +namespace Sa.Outbox.Publication; /// /// Defines a contract for publishing outbox messages. @@ -15,8 +13,8 @@ public interface IOutboxMessagePublisher /// A token to monitor for cancellation requests. /// A representing the asynchronous operation, /// with the number of successfully published messages as the result. - ValueTask Publish(IReadOnlyCollection messages, CancellationToken cancellationToken = default) - where TMessage : IOutboxPayloadMessage; + ValueTask Publish( + IReadOnlyCollection messages, int tenantId = 0, CancellationToken cancellationToken = default); /// /// Publishes a single message. @@ -26,6 +24,27 @@ ValueTask Publish(IReadOnlyCollection messages, Cance /// A token to monitor for cancellation requests. /// A representing the asynchronous operation, /// with the number of successfully published messages as the result. - ValueTask Publish(TMessage messages, CancellationToken cancellationToken = default) - where TMessage : IOutboxPayloadMessage => Publish([messages], cancellationToken); + ValueTask Publish(TMessage messages, int tenantId = 0, CancellationToken cancellationToken = default) + => Publish([messages], tenantId, cancellationToken); + + + async ValueTask Publish( + IReadOnlyCollection messages, + Func getTenantId, + CancellationToken cancellationToken = default) + { + + var lookup = messages.ToLookup(getTenantId); + ulong totals = 0; + + foreach (var tenantId in lookup.Select(g => g.Key)) + { + var group = lookup[tenantId]; + var tenantMessages = group.ToArray(); + + totals += await Publish(tenantMessages, tenantId, cancellationToken); + } + + return totals; + } } diff --git a/src/Sa.Outbox/Publication/OutboxMessagePublisher.cs b/src/Sa.Outbox/Publication/OutboxMessagePublisher.cs index 1f0a33e..007d9e1 100644 --- a/src/Sa.Outbox/Publication/OutboxMessagePublisher.cs +++ b/src/Sa.Outbox/Publication/OutboxMessagePublisher.cs @@ -1,26 +1,31 @@ using Sa.Classes; +using Sa.Outbox.Metadata; using Sa.Outbox.PlugServices; -using Sa.Outbox.Support; namespace Sa.Outbox.Publication; internal sealed class OutboxMessagePublisher( TimeProvider timeProvider, IOutboxBulkWriter bulkWriter, - OutboxPublishSettings publishSettings + OutboxPublishSettings publishSettings, + IOutboxMessageMetadataProvider metadataProvider ) : IOutboxMessagePublisher { - public async ValueTask Publish(IReadOnlyCollection messages, CancellationToken cancellationToken = default) - where TMessage : IOutboxPayloadMessage + public async ValueTask Publish( + IReadOnlyCollection messages, + int tenantId = 0, + CancellationToken cancellationToken = default) { if (messages.Count == 0) return 0; - return await Send(messages, cancellationToken); + return await Send(messages, tenantId, cancellationToken); } - private async ValueTask Send(IReadOnlyCollection messages, CancellationToken cancellationToken) - where TMessage : IOutboxPayloadMessage + private async ValueTask Send( + IReadOnlyCollection messages, + int tenantId, + CancellationToken cancellationToken) { - OutboxMessageTypeInfo typeInfo = OutboxMessageTypeHelper.GetOutboxMessageTypeInfo(); + var typeInfo = metadataProvider.GetMetadata(); DateTimeOffset now = timeProvider.GetUtcNow(); int maxBatchSize = publishSettings.MaxBatchSize; @@ -43,10 +48,12 @@ private async ValueTask Send(IReadOnlyCollection mess { TMessage message = enumerator.Current; + var payloadId = typeInfo.GetPayloadId(message!) ?? string.Empty; + payloadsSpan[count] = new OutboxMessage( - PayloadId: message.PayloadId ?? string.Empty, + PayloadId: payloadId, Payload: message, - PartInfo: new OutboxPartInfo(TenantId: message.TenantId, typeInfo.PartName, now)); + PartInfo: new OutboxPartInfo(TenantId: tenantId, typeInfo.PartName, now)); count++; } diff --git a/src/Sa.Outbox/Publication/OutboxMessageTypeHelper.cs b/src/Sa.Outbox/Publication/OutboxMessageTypeHelper.cs deleted file mode 100644 index f7ee49e..0000000 --- a/src/Sa.Outbox/Publication/OutboxMessageTypeHelper.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System.Collections.Concurrent; -using Sa.Outbox.Support; - -namespace Sa.Outbox.Publication; - -internal sealed record OutboxMessageTypeInfo(string PartName); - -internal static class OutboxMessageTypeHelper -{ - private static readonly ConcurrentDictionary s_cache = new(); - - public static OutboxMessageTypeInfo GetOutboxMessageTypeInfo() where T : IOutboxPayloadMessage - => s_cache.GetOrAdd(typeof(T), _ => new OutboxMessageTypeInfo(PartName: T.PartName)); - - public static OutboxMessageTypeInfo GetOutboxMessageTypeInfo(Type mt) - => s_cache.GetValueOrDefault(mt, new OutboxMessageTypeInfo("root")); -} diff --git a/src/Sa.Outbox/Publication/OutboxPublishSettings.cs b/src/Sa.Outbox/Publication/OutboxPublishSettings.cs new file mode 100644 index 0000000..2fe2319 --- /dev/null +++ b/src/Sa.Outbox/Publication/OutboxPublishSettings.cs @@ -0,0 +1,14 @@ +namespace Sa.Outbox.Publication; + +/// +/// Settings for publishing messages from the Outbox. +/// +public sealed class OutboxPublishSettings +{ + /// + /// The maximum batch size of messages to be sent at once. + /// Default value: 16. + /// for array pool size: 16, 32, 64, 128, 256, 512, 1024, 2048, 4096 + /// + public int MaxBatchSize { get; internal set; } = 64; +} diff --git a/src/Sa.Outbox/Configuration/OutboxPublishSettings.cs b/src/Sa.Outbox/Publication/OutboxPublishSettingsExtensions.cs similarity index 87% rename from src/Sa.Outbox/Configuration/OutboxPublishSettings.cs rename to src/Sa.Outbox/Publication/OutboxPublishSettingsExtensions.cs index 7a7759a..65a9dd8 100644 --- a/src/Sa.Outbox/Configuration/OutboxPublishSettings.cs +++ b/src/Sa.Outbox/Publication/OutboxPublishSettingsExtensions.cs @@ -1,18 +1,4 @@ -namespace Sa.Outbox; - -/// -/// Settings for publishing messages from the Outbox. -/// -public sealed class OutboxPublishSettings -{ - /// - /// The maximum batch size of messages to be sent at once. - /// Default value: 16. - /// for array pool size: 16, 32, 64, 128, 256, 512, 1024, 2048, 4096 - /// - public int MaxBatchSize { get; set; } = 64; -} - +namespace Sa.Outbox.Publication; /// /// Extension methods for fluent configuration of . diff --git a/src/Sa.Outbox/Publication/Setup.cs b/src/Sa.Outbox/Publication/Setup.cs index df1ea50..2501081 100644 --- a/src/Sa.Outbox/Publication/Setup.cs +++ b/src/Sa.Outbox/Publication/Setup.cs @@ -1,12 +1,34 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Sa.Outbox.Metadata; namespace Sa.Outbox.Publication; internal static class Setup { - public static IServiceCollection AddMessagePublisher(this IServiceCollection services) + public static IServiceCollection AddMessagePublisher( + this IServiceCollection services, + Action? configure = null) { + services.AddMessagesMetadata(); + + if (configure != null) + { + services.AddSingleton(configure); + } + + services.TryAddSingleton(sp => + { + OutboxPublishSettings settings = new(); + + foreach (var build in sp.GetServices>()) + { + build(sp, settings); + } + + return settings; + }); + services.TryAddSingleton(); return services; } diff --git a/src/Sa.Outbox/Readme.md b/src/Sa.Outbox/Readme.md index 45143cb..9c47aa4 100644 --- a/src/Sa.Outbox/Readme.md +++ b/src/Sa.Outbox/Readme.md @@ -1,248 +1,3 @@ # Outbox -The base logic and abstractions designed for implementing the Outbox pattern, with support for partitioning. - -## Base Interface for Defining Outbox Messages - -```csharp -/// -/// Represents a message payload in the Outbox system. -/// -public interface IOutboxPayloadMessage -{ - /// - /// Gets the unique identifier for the payload. - /// - string PayloadId { get; } - /// - /// Gets the identifier for the tenant associated with the payload. - /// - int TenantId { get; } - /// - /// Gets the part identifier associated with the Outbox message. - /// - static abstract string PartName { get; } -} - - -// example message - -public sealed record PingMessage(string PayloadId, int TenantId = 0) : IOutboxPayloadMessage -{ - public static string PartName => "root"; -} -``` - - - -## Main Interfaces for Working with Messages - -### Publishing Messages - -```csharp -/// -/// Defines a contract for publishing outbox messages. -/// -public interface IOutboxMessagePublisher -{ - /// - /// Publishes a collection of messages. - /// - ValueTask Publish(IReadOnlyCollection messages, CancellationToken cancellationToken = default) - where TMessage : IOutboxPayloadMessage; - - /// - /// Publishes a single message. - /// - ValueTask Publish(TMessage messages, CancellationToken cancellationToken = default) - where TMessage : IOutboxPayloadMessage => Publish(new[] { messages }, cancellationToken); -} -``` - -### Saving to Storage - -```csharp -public interface IOutboxRepository -{ - ValueTask Save(string payloadType, ReadOnlyMemory> messages, CancellationToken cancellationToken = default); -} -``` - - -### Delivery to Consumer - -```csharp -public interface IDeliveryRepository -{ - /// - /// Exclusively take for processing for the client - /// - Task StartDelivery(Memory> writeBuffer, int batchSize, TimeSpan lockDuration, OutboxMessageFilter filter, CancellationToken cancellationToken); - - /// - /// Complete the delivery - /// - Task FinishDelivery(IOutboxContext[] messages, OutboxMessageFilter filter, CancellationToken cancellationToken); - - /// - /// Extend the delivery (retain the lock for the client) - /// - Task ExtendDelivery(TimeSpan lockExpiration, OutboxMessageFilter filter, CancellationToken cancellationToken); -} -``` - - -### Support for Partitions -```csharp -/// -/// Represents a pair of tenant identifier and part information in the Outbox system. -/// This record is used to associate a tenant with a specific part of the Outbox message. -/// -/// The unique identifier for the tenant. -/// The part identifier associated with the tenant. -public record struct OutboxTenantPartPair(int TenantId, string Part); - -/// -/// Represents an interface for supporting partitioning in the Outbox processing system. -/// This interface defines a method for retrieving tenant-part pairs. -/// -public interface IOutboxPartitionalSupport -{ - /// - /// Asynchronously retrieves a collection of tenant-part pairs. - /// This method can be used to get the current mapping of tenants to their respective parts. - /// - Task> GetPartValues(CancellationToken cancellationToken); -} -``` - -### Message Consumer -```csharp -/// -/// Represents a consumer interface for processing Outbox messages of a specific type. -/// -public interface IConsumer : IConsumer - where TMessage : IOutboxPayloadMessage -{ - /// - /// Consumes a collection of Outbox messages. - /// This method processes the provided messages asynchronously. - /// - ValueTask Consume( - ConsumerGroupSettings settings, - OutboxMessageFilter filter, - ReadOnlyMemory> messages, - CancellationToken cancellationToken); -} - -public interface IConsumer -{ -} -``` - - - -## Examples of Outbox Using PostgreSQL - -### Configuration - -```csharp -using Microsoft.Extensions.DependencyInjection; -using Sa.Outbox; -using Sa.Outbox.PostgreSql; - -public class Startup -{ - public void ConfigureServices(IServiceCollection services) - { - // Outbox configuration - services.AddOutbox(builder => - { - // Delivery configuration - builder.WithDeliveries(deliveryBuilder => - { - // Add delivery (connect consumer for messages of type - MyMessage) - deliveryBuilder.AddDelivery(); - }); - - // Configuration for partitioning support - builder.WithPartitioningSupport((serviceProvider, partSettings) => - { - // Example configuration for processing messages for each tenant - partSettings.ForEachTenant = true; - - // Return the list of tenants for the app - partSettings.GetTenantIds = async cancellationToken => - { - // Logic to retrieve tenant identifiers - return await Task.FromResult(new int[] { 1, 2 }); - }; - }); - }); - - // Connecting Outbox using PostgreSQL - services.AddOutboxUsingPostgreSql(cfg => - { - // Database connection - cfg.AddDataSource(c => c.WithConnectionString("Host=my_host;Database=my_db;Username=my_user;Password=my_password")); - - // Settings for working with Pg - cfg.WithPgOutboxSettings((_, settings) => - { - // Set the database schema - settings.TableSettings.DatabaseSchemaName = "public"; - - // Cleanup settings - settings.CleanupSettings.DropPartsAfterRetention = TimeSpan.FromDays(30); - }); - }); - } -} -``` - -### Example of Sending/Consuming a Message - -```csharp - -// Some message -public sealed record MyMessage(string PayloadId, string Content, int TenantId) : IOutboxPayloadMessage -{ - public static string PartName => "root"; -} - -// Sending -public class MessageSender(IOutboxMessagePublisher publisher) -{ - public async Task SendMessagesAsync(CancellationToken cancellationToken) - { - var messages = [ - new MyMessage { PayloadId = Guid.NewGuid().ToString(), Content = "Hello, World!", TenantId = 1 }, - new MyMessage { PayloadId = Guid.NewGuid().ToString(), Content = "Another message", TenantId = 2 } - ]; - - ulong result = await publisher.Publish(messages, cancellationToken); - - Console.WriteLine($"Sent {result} messages."); - } -} - - -// Consuming -public class MyMessageConsumer : IConsumer -{ - public async ValueTask Consume(ConsumerGroupSettings settings, - OutboxMessageFilter filter, - ReadOnlyMemory> messages, - CancellationToken cancellationToken) - { - foreach (var messageContext in messages) - { - // Logic for processing the message - Console.WriteLine($"Processing message: {messageContext.Payload}"); - - // Successful message processing - messageContext.Ok("Message processed successfully."); - } - } -} -``` +The base logic and abstractions designed for implementing the Outbox pattern, with support for partitioning. \ No newline at end of file diff --git a/src/Sa.Outbox/Sa.Outbox.csproj b/src/Sa.Outbox/Sa.Outbox.csproj index 5dcc0cb..92e7708 100644 --- a/src/Sa.Outbox/Sa.Outbox.csproj +++ b/src/Sa.Outbox/Sa.Outbox.csproj @@ -3,7 +3,7 @@ - 0.6.1 + 0.7.0 Simple Outbox infra for publishing and using messages @@ -19,7 +19,6 @@ - diff --git a/src/Sa.Outbox/Setup.cs b/src/Sa.Outbox/Setup.cs index b1d27b6..711e1b5 100644 --- a/src/Sa.Outbox/Setup.cs +++ b/src/Sa.Outbox/Setup.cs @@ -1,5 +1,4 @@ using Microsoft.Extensions.DependencyInjection; -using Sa.Outbox.Configuration; namespace Sa.Outbox; @@ -9,7 +8,7 @@ public static IServiceCollection AddOutbox( this IServiceCollection services, Action? build = null) { - OutboxBuilder builder = new(services); + OutboxBuilder builder = OutboxBuilder.Create(services); build?.Invoke(builder); return services; } diff --git a/src/Sa.Partitional.PostgreSql/Sa.Partitional.PostgreSql.csproj b/src/Sa.Partitional.PostgreSql/Sa.Partitional.PostgreSql.csproj index 69046c6..6e77f6a 100644 --- a/src/Sa.Partitional.PostgreSql/Sa.Partitional.PostgreSql.csproj +++ b/src/Sa.Partitional.PostgreSql/Sa.Partitional.PostgreSql.csproj @@ -3,7 +3,7 @@ - 0.6.1 + 0.7.0 For managing table partitioning in PostgreSQL diff --git a/src/Sa.Schedule/Sa.Schedule.csproj b/src/Sa.Schedule/Sa.Schedule.csproj index 6273be4..1f196c4 100644 --- a/src/Sa.Schedule/Sa.Schedule.csproj +++ b/src/Sa.Schedule/Sa.Schedule.csproj @@ -3,7 +3,7 @@ - 0.6.0 + 0.7.0 Execute jobs on a schedule diff --git a/src/Sa.slnx b/src/Sa.slnx index b633660..7bfaae7 100644 --- a/src/Sa.slnx +++ b/src/Sa.slnx @@ -22,7 +22,6 @@ - diff --git a/src/Samples/PgOutbox.ConsoleApp/Program.cs b/src/Samples/PgOutbox.ConsoleApp/Program.cs index 59a48e3..15edff4 100644 --- a/src/Samples/PgOutbox.ConsoleApp/Program.cs +++ b/src/Samples/PgOutbox.ConsoleApp/Program.cs @@ -3,10 +3,10 @@ using Microsoft.Extensions.Logging; using PgOutbox; using Sa.Outbox; +using Sa.Outbox.Delivery; using Sa.Outbox.PostgreSql; using Sa.Outbox.PostgreSql.Serialization; using Sa.Outbox.Publication; -using Sa.Outbox.Support; using System.Text.Json; using System.Text.Json.Serialization; @@ -15,46 +15,48 @@ var connectionString = "Host=localhost;Username=postgres;Password=postgres;Database=postgres"; // default configure... -IHost host = Host.CreateDefaultBuilder() - .ConfigureServices(services => services - .AddOutbox(builder => builder - .WithTenantSettings((_, ts) => ts.WithTenantIds(1, 2, 3)) - .WithDeliveries(builder => builder - .AddDeliveryScoped((_, settings) => - { - settings.ScheduleSettings.WithIntervalSeconds(5).WithImmediate(); - settings.ConsumeSettings.WithSingleIteration(); - }) - .AddDelivery("rnd", (_, settings) => - { - settings.ScheduleSettings.WithIntervalSeconds(25); - settings.ConsumeSettings.WithSingleIteration().WithMaxDeliveryAttempts(2); - }) - ) - ) - // outbox pg - .AddOutboxUsingPostgreSql(cfg => cfg - .WithDataSource(ds => ds.WithConnectionString(connectionString)) - .WithOutboxSettings((_, settings) => +IHost host = Host.CreateDefaultBuilder().ConfigureServices(services => services + // outbox + .AddOutbox(builder => builder + .WithTenants((_, t) => t.WithTenantIds(1, 2, 3)) + .WithMetadata((_, b) => b.AddMetadata("some", getPayloadId: p => p.PayloadId)) + .WithDeliveries(b => b + .AddDeliveryScoped((_, settings) => { - settings.TableSettings.WithSchema("test"); - settings.ConsumeSettings.WithMinOffset(DateTimeOffset.Now); + settings.ScheduleSettings.WithIntervalSeconds(5).WithImmediate(); + settings.ConsumeSettings.WithSingleIteration(); + }) + .AddDelivery("rnd", (_, settings) => + { + settings.ScheduleSettings.WithIntervalSeconds(25); + settings.ConsumeSettings.WithSingleIteration().WithMaxDeliveryAttempts(2); }) - .WithMessageSerializer(new OutboxMessageSerializer()) ) - .AddHostedService() ) - .Build(); + // outbox for pg + .AddOutboxUsingPostgreSql(cfg => cfg + .WithDataSource(ds => ds.WithConnectionString(connectionString)) + .WithOutboxSettings((_, settings) => + { + settings.TableSettings.WithSchema("test"); + settings.ConsumeSettings.WithMinOffset(DateTimeOffset.Now); + }) + .WithMessageSerializer(OutboxMessageSerializer.Instance) + ) + // publish as service + .AddHostedService() +) +.Build(); // -- code publish var publisher = host.Services.GetRequiredService(); await publisher.Publish([ - new SomeMessage("01", "Hi 1", 1), - new SomeMessage("02", "Hi 2", 2), - new SomeMessage("03", "Hi 3", 3) -]); + new SomeMessage("01", "Hi 1"), + new SomeMessage("02", "Hi 2"), + new SomeMessage("03", "Hi 3") +], tenantId: 1); await host.RunAsync(); @@ -62,10 +64,7 @@ await publisher.Publish([ namespace PgOutbox { - public sealed record SomeMessage(string PayloadId, string Message, int TenantId) : IOutboxPayloadMessage - { - static string IOutboxHasPart.PartName => "some"; - } + public sealed record SomeMessage(string PayloadId, string Message); public sealed class Group1Consumer(ILogger logger) : IConsumer @@ -161,14 +160,14 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { for (int i = 0; i < 100 && !stoppingToken.IsCancellationRequested; i++) { - var rnd = Random.Shared.Next(1, 4); - await Task.Delay(TimeSpan.FromSeconds(rnd), stoppingToken); + var tenantId = Random.Shared.Next(1, 4); + await Task.Delay(TimeSpan.FromSeconds(tenantId), stoppingToken); await publisher.Publish( new SomeMessage( i.ToString(), - DateTime.Now.ToString(), - rnd), + DateTime.Now.ToString()), + tenantId, stoppingToken); } } @@ -183,15 +182,25 @@ await publisher.Publish( #region forAOT public class OutboxMessageSerializer : IOutboxMessageSerializer { - public T? Deserialize(Stream stream) => (typeof(T) == typeof(SomeMessage)) - ? (T?)(object?)JsonSerializer.Deserialize(stream, SomeMessageJsonSerializerContext.Default.SomeMessage) - : default; + public T? Deserialize(Stream stream) + { + return typeof(T) switch + { + Type t when t == typeof(SomeMessage) => + (T?)(object?)JsonSerializer.Deserialize( + stream, SomeMessageJsonSerializerContext.Default.SomeMessage), + + _ => default + }; + } public void Serialize(Stream stream, T value) { if (typeof(T) == typeof(SomeMessage)) JsonSerializer.Serialize(stream, value!, SomeMessageJsonSerializerContext.Default.SomeMessage); } + + public readonly static OutboxMessageSerializer Instance = new(); } [JsonSourceGenerationOptions(GenerationMode = JsonSourceGenerationMode.Metadata)] diff --git a/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryBatchingWindowTests.cs b/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryBatchingWindowTests.cs index 6bc5bb7..bd70048 100644 --- a/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryBatchingWindowTests.cs +++ b/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryBatchingWindowTests.cs @@ -26,7 +26,7 @@ public Fixture() : base() { Services .AddOutbox(b => b - .WithTenantSettings((_, s) => s.WithTenantIds(1, 2)) + .WithTenants((_, s) => s.WithTenantIds(1, 2)) .WithDeliveries(builder => builder .AddDeliveryScoped("test3", (_, s) => { @@ -51,13 +51,11 @@ public async Task Deliver_Process_MustBe_Work() { Console.Write(fixture.ConnectionString); - List messages = - [ - new TestMessage { PayloadId = "11", Content = "Message 1", TenantId = 1}, - new TestMessage { PayloadId = "12", Content = "Message 2", TenantId = 2} - ]; - var cnt = await fixture.Publisher.Publish(messages, TestContext.Current.CancellationToken); + var cnt = await fixture.Publisher.Publish(new TestMessage { PayloadId = "11", Content = "Message 1", TenantId = 1 }, 1, TestContext.Current.CancellationToken); + Assert.True(cnt > 0); + + cnt = await fixture.Publisher.Publish(new TestMessage { PayloadId = "12", Content = "Message 2", TenantId = 2 }, 2, TestContext.Current.CancellationToken); Assert.True(cnt > 0); diff --git a/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryLongProcessorTests.cs b/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryLongProcessorTests.cs index 8d2abb6..578d658 100644 --- a/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryLongProcessorTests.cs +++ b/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryLongProcessorTests.cs @@ -29,7 +29,8 @@ public Fixture() : base() { Services .AddOutbox(builder => builder - .WithTenantSettings((_, s) => s.WithTenantIds(1, 2)) + .WithMetadata((_, b) => b.AddMetadata("root_1", m => m.PayloadId)) + .WithTenants((_, s) => s.WithTenantIds(1, 2)) .WithDeliveries(b => b .AddDeliveryScoped("test1", (_, s) => { @@ -63,7 +64,7 @@ public async Task Deliver_LongProcess_MustBe_Work() new TestMessage { PayloadId = "12", Content = "Message 2", TenantId = 2} ]; - var cnt = await fixture.Publisher.Publish(messages, TestContext.Current.CancellationToken); + var cnt = await fixture.Publisher.Publish(messages, m => m.TenantId, TestContext.Current.CancellationToken); Assert.True(cnt > 0); diff --git a/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryPermanentErrorTests.cs b/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryPermanentErrorTests.cs index 4abbd62..1dedbc5 100644 --- a/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryPermanentErrorTests.cs +++ b/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryPermanentErrorTests.cs @@ -38,7 +38,7 @@ public Fixture() : base() { Services .AddOutbox(builder => builder - .WithTenantSettings((_, sp) => sp.WithTenantIds(1, 2)) + .WithTenants((_, sp) => sp.WithTenantIds(1, 2)) .WithDeliveries(builder => builder .AddDeliveryScoped("test2", (_, s) => { @@ -70,7 +70,7 @@ public async Task Deliver_ErrorProcess_MustBe_Logged() new TestMessage { PayloadId = "12", Content = "Message 2", TenantId = 2} ]; - var cnt = await fixture.Publisher.Publish(messages, TestContext.Current.CancellationToken); + var cnt = await fixture.Publisher.Publish(messages, m => m.TenantId, TestContext.Current.CancellationToken); Assert.True(cnt > 0); var result = await Sub.ProcessMessages(fixture.OutboxSettings, CancellationToken.None); diff --git a/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryRetryErrorTests.cs b/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryRetryErrorTests.cs index e5618b2..574cafc 100644 --- a/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryRetryErrorTests.cs +++ b/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryRetryErrorTests.cs @@ -35,7 +35,7 @@ public Fixture() : base() { Services .AddOutbox(builder => builder - .WithTenantSettings((_, ts) => ts.WithTenantIds(1)) + .WithTenants((_, ts) => ts.WithTenantIds(1)) .WithDeliveries(builder => builder .AddDeliveryScoped("test4", (_, s) => { @@ -72,7 +72,7 @@ public async Task Deliver_RetriesOnErrorProcess_MustBe_Logged_501() List messages = [new TestMessage { PayloadId = "1", Content = "Message 1", TenantId = 1 }]; - ulong cnt = await fixture.Publisher.Publish(messages, TestContext.Current.CancellationToken); + ulong cnt = await fixture.Publisher.Publish(messages, m => m.TenantId, TestContext.Current.CancellationToken); Assert.True(cnt > 0); diff --git a/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryWithAutoTenantDetectionTests.cs b/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryWithAutoTenantDetectionTests.cs index 3c9034d..1870e3d 100644 --- a/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryWithAutoTenantDetectionTests.cs +++ b/src/Tests/Sa.Outbox.PostgreSqlTests/Delivery/DeliveryWithAutoTenantDetectionTests.cs @@ -45,7 +45,7 @@ public Fixture() : base() { Services .AddOutbox(builder => builder - .WithTenantSettings((_, s) => s.WithAutoDetect()) + .WithTenants((_, s) => s.WithAutoDetect()) .WithDeliveries(b => b .AddDelivery("test_auto_detect", (_, s) => { @@ -90,7 +90,7 @@ public async Task DeliverProcessWithAutoTenantModeMustBeWork() new TestMessage { PayloadId = "03", Content = "M 3", TenantId = 3} ]; - var cnt = await fixture.Publisher.Publish(messages, TestContext.Current.CancellationToken); + var cnt = await fixture.Publisher.Publish(messages, m => m.TenantId, TestContext.Current.CancellationToken); Assert.True(cnt > 0); var result = await Sub.ProcessMessages(fixture.OutboxSettings, CancellationToken.None); diff --git a/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxMessageSerializer.cs b/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxMessageSerializer.cs index ff2912d..ab57354 100644 --- a/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxMessageSerializer.cs +++ b/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxMessageSerializer.cs @@ -1,5 +1,5 @@ -using System.Text.Json; -using Sa.Outbox.PostgreSql.Serialization; +using Sa.Outbox.PostgreSql.Serialization; +using System.Text.Json; namespace Sa.Outbox.PostgreSqlTests; diff --git a/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxParallelMessagingTests.cs b/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxParallelMessagingTests.cs index f89ff77..139f0d6 100644 --- a/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxParallelMessagingTests.cs +++ b/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxParallelMessagingTests.cs @@ -1,8 +1,8 @@ using Microsoft.Extensions.DependencyInjection; using Sa.Data.PostgreSql.Fixture; +using Sa.Outbox.Delivery; using Sa.Outbox.PostgreSql; using Sa.Outbox.Publication; -using Sa.Outbox.Support; using Sa.Schedule; namespace Sa.Outbox.PostgreSqlTests; @@ -19,20 +19,25 @@ static class GenMessageRange public static int GetMessageCount() => Random.Shared.Next(From, To); } - class SomeMessage1 : IOutboxPayloadMessage + + class HasTenant + { + public int TenantId { get; set; } = Random.Shared.Next(1, 2); + } + + + class SomeMessage1 : HasTenant { public static string PartName => "multi_1"; public string PayloadId { get; set; } = Guid.NewGuid().ToString(); - public int TenantId { get; set; } = Random.Shared.Next(1, 2); } - class SomeMessage2 : IOutboxPayloadMessage + class SomeMessage2 : HasTenant { public static string PartName => "multi_2"; public string PayloadId { get; set; } = Guid.NewGuid().ToString(); - public int TenantId { get; set; } = Random.Shared.Next(1, 2); } static class CommonCounter @@ -77,10 +82,11 @@ public class Fixture : PgDataSourceFixture public Fixture() : base() { Services - .AddOutbox(builder => - { - builder - .WithTenantSettings((_, sp) => sp.WithTenantIds(1, 2)) + .AddOutbox(builder => builder + .WithTenants((_, b) => b.WithTenantIds(1, 2)) + .WithMetadata((_, configure) => configure + .AddMetadata(SomeMessage1.PartName) + .AddMetadata(SomeMessage2.PartName)) .WithDeliveries(builder => builder .AddDeliveryScoped("test7_0", (_, settings) => { @@ -98,9 +104,9 @@ public Fixture() : base() settings.ConsumeSettings.WithMaxBatchSize(1024); }) - ); - builder.PublishSettings.MaxBatchSize = 1024; - }) + ) + .WithPublishSettings((_, b) => b.WithMaxBatchSize(1024)) + ) .AddOutboxUsingPostgreSql(cfg => { cfg @@ -150,7 +156,7 @@ public async Task ParallelMessaging_MustBeProcessed() } private static async Task RunPublish(IOutboxMessagePublisher publisher) - where T : IOutboxPayloadMessage, new() + where T : HasTenant, new() { long total = 0; List nodes = [.. Enumerable.Range(1, GenMessageRange.Threads)]; @@ -164,7 +170,7 @@ private static async Task RunPublish(IOutboxMessagePublisher publisher) messages.Add(new T()); } - await publisher.Publish(messages, TestContext.Current.CancellationToken); + await publisher.Publish(messages, m => m.TenantId, TestContext.Current.CancellationToken); }); diff --git a/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxTenantParallelismTests.cs b/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxTenantParallelismTests.cs index 0cd1749..2e1109b 100644 --- a/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxTenantParallelismTests.cs +++ b/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxTenantParallelismTests.cs @@ -1,17 +1,17 @@ -using System.Collections.Concurrent; -using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection; using Sa.Data.PostgreSql.Fixture; +using Sa.Outbox.Delivery; using Sa.Outbox.PostgreSql; using Sa.Outbox.Publication; -using Sa.Outbox.Support; using Sa.Schedule; +using System.Collections.Concurrent; namespace Sa.Outbox.PostgreSqlTests; public class OutboxTenantParallelismTests(OutboxTenantParallelismTests.Fixture fixture) : IClassFixture { - class TestMessage : IOutboxPayloadMessage + class TestMessage { public static string PartName => "parallel_test"; public string PayloadId { get; } = Guid.NewGuid().ToString(); @@ -100,7 +100,8 @@ public Fixture() : base() { Services .AddOutbox(builder => builder - .WithTenantSettings((_, sp) => sp.WithTenantIds(1, 2, 3, 4, 5)) + .WithTenants((_, s) => s.WithTenantIds(1, 2, 3, 4, 5)) + .WithMetadata((_, configure) => configure.AddMetadata(TestMessage.PartName)) .WithDeliveries(deliveryBuilder => deliveryBuilder .AddDeliveryScoped( "parallel_test_group", @@ -157,6 +158,7 @@ public async Task Outbox_TenantParallelism_ShouldProcessTenantsConcurrently() // Act ulong totalPublished = await publisher.Publish( messages, + m => m.TenantId, TestContext.Current.CancellationToken); diff --git a/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxTests.cs b/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxTests.cs index 32411bd..499c651 100644 --- a/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxTests.cs +++ b/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxTests.cs @@ -1,8 +1,8 @@ using Microsoft.Extensions.DependencyInjection; using Sa.Data.PostgreSql.Fixture; +using Sa.Outbox.Delivery; using Sa.Outbox.PostgreSql; using Sa.Outbox.Publication; -using Sa.Outbox.Support; using Sa.Partitional.PostgreSql; using Sa.Schedule; @@ -10,12 +10,10 @@ namespace Sa.Outbox.PostgreSqlTests; public class OutBoxTests(OutBoxTests.Fixture fixture) : IClassFixture { - class SomeMessage : IOutboxPayloadMessage + class SomeMessage { public string PayloadId { get; set; } = default!; public int TenantId { get; set; } - public static string PartName => "some"; - } class SomeMessageConsumer : IConsumer @@ -41,7 +39,7 @@ public Fixture() : base() { Services .AddOutbox(builder => builder - .WithTenantSettings((_, s) => s.WithTenantIds(1)) + .WithTenants((_, s) => s.WithTenantIds(1)) .WithDeliveries(builder => builder .AddDeliveryScoped("test6", (_, settings) => { @@ -93,7 +91,7 @@ public async Task OutBoxTest() new SomeMessage { TenantId = 1 }, new SomeMessage { TenantId = 1 }, new SomeMessage { TenantId = 1 } - ], TestContext.Current.CancellationToken); + ], 1, TestContext.Current.CancellationToken); Assert.True(total > 0); diff --git a/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxTwoGroupsTests.cs b/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxTwoGroupsTests.cs index 4a58af6..925d75f 100644 --- a/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxTwoGroupsTests.cs +++ b/src/Tests/Sa.Outbox.PostgreSqlTests/OutboxTwoGroupsTests.cs @@ -1,8 +1,8 @@ using Microsoft.Extensions.DependencyInjection; using Sa.Data.PostgreSql.Fixture; +using Sa.Outbox.Delivery; using Sa.Outbox.PostgreSql; using Sa.Outbox.Publication; -using Sa.Outbox.Support; using Sa.Schedule; namespace Sa.Outbox.PostgreSqlTests; @@ -10,10 +10,8 @@ namespace Sa.Outbox.PostgreSqlTests; public class OutboxTwoGroupsTests(OutboxTwoGroupsTests.Fixture fixture) : IClassFixture { - class SomeMessage : IOutboxPayloadMessage + class SomeMessage { - public static string PartName => "some"; - public string PayloadId { get; } = Guid.NewGuid().ToString(); public int TenantId { get; set; } } @@ -54,7 +52,7 @@ public Fixture() : base() { Services .AddOutbox(builder => builder - .WithTenantSettings((_, s) => s.WithTenantIds(1, 2)) + .WithTenants((_, s) => s.WithTenantIds(1, 2)) .WithDeliveries(deliveryBuilder => deliveryBuilder .AddDeliveryScoped("test_gr1", (_, settings) => @@ -113,12 +111,13 @@ public async Task OutBox_TwoGroups_ShouldProcessSeparately() new SomeMessage { TenantId = 1 } }; - ulong total = await publisher.Publish(messages, TestContext.Current.CancellationToken); + ulong total = await publisher.Publish(messages, m => m.TenantId, TestContext.Current.CancellationToken); int attempts = 0; const int maxAttempts = 20; - while ((SomeMessageConsumerGr1.Counter < (int)total || SomeMessageConsumerGr2.Counter < (int)total) && attempts++ < maxAttempts) + while ((SomeMessageConsumerGr1.Counter < (int)total + || SomeMessageConsumerGr2.Counter < (int)total) && attempts++ < maxAttempts) { await Task.Delay(400, TestContext.Current.CancellationToken); } diff --git a/src/Tests/Sa.Outbox.PostgreSqlTests/Publisher/OutboxPublisherTests.cs b/src/Tests/Sa.Outbox.PostgreSqlTests/Publisher/OutboxPublisherTests.cs index 80d1582..de3ee62 100644 --- a/src/Tests/Sa.Outbox.PostgreSqlTests/Publisher/OutboxPublisherTests.cs +++ b/src/Tests/Sa.Outbox.PostgreSqlTests/Publisher/OutboxPublisherTests.cs @@ -19,15 +19,9 @@ public async Task Publish_MultipleMessages_ReturnsExpectedResult() { Console.Write(fixture.ConnectionString); - // Arrange - List messages = - [ - new TestMessage { PayloadId = "1", Content = "Message 1", TenantId = 1}, - new TestMessage { PayloadId = "2", Content = "Message 2", TenantId = 2} - ]; - - // Act - ulong result = await Sub.Publish(messages, TestContext.Current.CancellationToken); + ulong result = await Sub.Publish(new TestMessage { PayloadId = "1", Content = "Message 1", TenantId = 1 }, 1, TestContext.Current.CancellationToken); + + result += await Sub.Publish(new TestMessage { PayloadId = "2", Content = "Message 3", TenantId = 2 }, 2, TestContext.Current.CancellationToken); // Assert Assert.Equal(2, (int)result); diff --git a/src/Tests/Sa.Outbox.PostgreSqlTests/TestMessage.cs b/src/Tests/Sa.Outbox.PostgreSqlTests/TestMessage.cs index f083112..9189cc8 100644 --- a/src/Tests/Sa.Outbox.PostgreSqlTests/TestMessage.cs +++ b/src/Tests/Sa.Outbox.PostgreSqlTests/TestMessage.cs @@ -1,12 +1,8 @@ -using Sa.Outbox.Support; +namespace Sa.Outbox.PostgreSqlTests; -namespace Sa.Outbox.PostgreSqlTests; - -internal sealed class TestMessage : IOutboxPayloadMessage +internal sealed class TestMessage { - public static string PartName => "root"; - public required string PayloadId { get; set; } public string? Content { get; set; } public int TenantId { get; set; }