From 9279953a0dee9ae9cca07e37d3b7c85ad0d8bde2 Mon Sep 17 00:00:00 2001 From: Ivan Gubanov Date: Fri, 29 May 2026 14:14:48 +0300 Subject: [PATCH 1/5] added a new method to register integration events, add helper classes / setup options to have ability handle all rabbit events + produce external events (for other systems) --- .../IIntegrationEventPublisher.cs | 2 +- .../CapConsumerExecutor.cs | 4 +- .../CapConsumerServiceSelector.cs | 77 +++++++--- .../DependencyInjection.cs | 140 ++++++++++++------ src/Bss.Platform.Events/EventTypeProvider.cs | 58 ++++++++ .../Interfaces/IEventTypeProvider.cs | 8 + .../Interfaces/IIntegrationEventProcessor.cs | 6 +- .../Interfaces/IIntegrationEventSetup.cs | 27 ++++ .../IntegrationEventsMessageQueueOptions.cs | 2 + .../Publishers/IntegrationEventPublisher.cs | 45 +++++- src/Directory.Build.props | 3 +- src/__SolutionItems/CommonAssemblyInfo.cs | 6 +- 12 files changed, 294 insertions(+), 84 deletions(-) create mode 100644 src/Bss.Platform.Events/EventTypeProvider.cs create mode 100644 src/Bss.Platform.Events/Interfaces/IEventTypeProvider.cs create mode 100644 src/Bss.Platform.Events/Interfaces/IIntegrationEventSetup.cs diff --git a/src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs b/src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs index 1e6cb80..87f020c 100644 --- a/src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs +++ b/src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs @@ -2,5 +2,5 @@ namespace Bss.Platform.Events.Abstractions; public interface IIntegrationEventPublisher { - Task PublishAsync(IIntegrationEvent @event, CancellationToken cancellationToken); + Task PublishAsync(object @event, CancellationToken cancellationToken); } diff --git a/src/Bss.Platform.Events/CapConsumerExecutor.cs b/src/Bss.Platform.Events/CapConsumerExecutor.cs index ca29eb7..873595c 100644 --- a/src/Bss.Platform.Events/CapConsumerExecutor.cs +++ b/src/Bss.Platform.Events/CapConsumerExecutor.cs @@ -1,10 +1,8 @@ -using Bss.Platform.Events.Abstractions; using Bss.Platform.Events.Interfaces; namespace Bss.Platform.Events; -internal class CapConsumerExecutor(IIntegrationEventProcessor eventProcessor) - where TEvent : IIntegrationEvent +internal class CapConsumerExecutor(IIntegrationEventProcessor eventProcessor) { public Task HandleAsync(TEvent @event, CancellationToken cancellationToken) => eventProcessor.ProcessAsync(@event, cancellationToken); } diff --git a/src/Bss.Platform.Events/CapConsumerServiceSelector.cs b/src/Bss.Platform.Events/CapConsumerServiceSelector.cs index dd33ba5..5a26741 100644 --- a/src/Bss.Platform.Events/CapConsumerServiceSelector.cs +++ b/src/Bss.Platform.Events/CapConsumerServiceSelector.cs @@ -1,6 +1,7 @@ using System.Reflection; using Bss.Platform.Events.Abstractions; +using Bss.Platform.Events.Interfaces; using DotNetCore.CAP; using DotNetCore.CAP.Internal; @@ -10,44 +11,74 @@ namespace Bss.Platform.Events; -public class CapConsumerServiceSelector(IServiceProvider serviceProvider, Assembly assembly) +public class CapConsumerServiceSelectorLegacy(IServiceProvider serviceProvider, Assembly assembly) + : CapConsumerServiceSelectorBase(serviceProvider) +{ + protected override IEnumerable GetInternalEventTypes() => + assembly + .ExportedTypes + .Where(x => typeof(IIntegrationEvent).IsAssignableFrom(x) && x is { IsInterface: false, IsAbstract: false }); + + protected override CapSubscribeAttribute ProvideCapSubscribeAttribute(Type eventType) + { + var subscribeAttribute = new CapSubscribeAttribute(eventType.Name); + this.SetSubscribeAttribute(subscribeAttribute); + return subscribeAttribute; + } +} + +public class CapConsumerServiceSelectorNew(IServiceProvider serviceProvider, IEventTypeProvider eventTypeProvider, IOptions capOptions) + : CapConsumerServiceSelectorBase(serviceProvider) +{ + private readonly string queueName = capOptions.Value.DefaultGroupName; + protected override IEnumerable GetInternalEventTypes() => + eventTypeProvider.InternalEvents.Keys; + + protected override CapSubscribeAttribute ProvideCapSubscribeAttribute(Type eventType) => + new(eventTypeProvider.InternalEvents[eventType]) { Group = this.queueName }; +} + +public abstract class CapConsumerServiceSelectorBase(IServiceProvider serviceProvider) : ConsumerServiceSelector(serviceProvider) { - protected override IEnumerable FindConsumersFromControllerTypes() => []; + protected abstract IEnumerable GetInternalEventTypes(); + + protected abstract CapSubscribeAttribute ProvideCapSubscribeAttribute(Type eventType); + + protected override IEnumerable FindConsumersFromControllerTypes() => + []; protected override IEnumerable FindConsumersFromInterfaceTypes(IServiceProvider provider) { var namePrefix = provider.GetRequiredService>().Value.TopicNamePrefix; - return assembly - .ExportedTypes - .Where(x => typeof(IIntegrationEvent).IsAssignableFrom(x) && x is { IsInterface: false, IsAbstract: false }) - .Select(x => this.CreateExecutorDescriptor(typeof(CapConsumerExecutor<>).MakeGenericType(x), x, namePrefix)); + return this.GetInternalEventTypes() + .Select(x => this.CreateExecutorDescriptor(typeof(CapConsumerExecutor<>).MakeGenericType(x), x, namePrefix)); } + private ConsumerExecutorDescriptor CreateExecutorDescriptor(Type executor, Type @event, string? namePrefix) { - var subscribeAttribute = new CapSubscribeAttribute(@event.Name); - this.SetSubscribeAttribute(subscribeAttribute); + var subscribeAttribute = this.ProvideCapSubscribeAttribute(@event); var methodInfo = executor - .GetRuntimeMethods() - .Single(x => x.Name.Contains(nameof(CapConsumerExecutor.HandleAsync))); + .GetRuntimeMethods() + .Single(x => x.Name.Contains(nameof(CapConsumerExecutor<>.HandleAsync))); var methodParameters = methodInfo.GetParameters(); return new ConsumerExecutorDescriptor - { - Attribute = subscribeAttribute, - ClassAttribute = null, - MethodInfo = methodInfo, - ImplTypeInfo = executor.GetTypeInfo(), - ServiceTypeInfo = null, - TopicNamePrefix = namePrefix, - Parameters = new List - { - new() { ParameterType = methodParameters[0].ParameterType, IsFromCap = false }, - new() { ParameterType = methodParameters[1].ParameterType, IsFromCap = true } - } - }; + { + Attribute = subscribeAttribute, + ClassAttribute = null, + MethodInfo = methodInfo, + ImplTypeInfo = executor.GetTypeInfo(), + ServiceTypeInfo = null, + TopicNamePrefix = namePrefix, + Parameters = new List + { + new() { ParameterType = methodParameters[0].ParameterType, IsFromCap = false }, + new() { ParameterType = methodParameters[1].ParameterType, IsFromCap = true } + } + }; } } diff --git a/src/Bss.Platform.Events/DependencyInjection.cs b/src/Bss.Platform.Events/DependencyInjection.cs index f67d064..d81d675 100644 --- a/src/Bss.Platform.Events/DependencyInjection.cs +++ b/src/Bss.Platform.Events/DependencyInjection.cs @@ -8,6 +8,7 @@ using DotNetCore.CAP; using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc.Filters; @@ -30,60 +31,105 @@ public static IServiceCollection AddPlatformIntegrationEvents( { services .AddSingleton() - .AddSingleton(x => new CapConsumerServiceSelector(x, eventsAssembly)) - .AddScoped() - .AddScoped( - serviceProvider => - { - var capTransaction = ActivatorUtilities.CreateInstance(serviceProvider); - capTransaction.DbTransaction = serviceProvider.GetRequiredService(); - return capTransaction; - }) - .AddCap( - x => - { - var eventsOptions = IntegrationEventsOptions.Default; - setup?.Invoke(eventsOptions); + .AddSingleton(x => new(x, eventsAssembly)) + .AddScoped() + .AddPlatformIntegrationEventsInternal(setup); + + return services; + } + + public static IServiceCollection AddPlatformIntegrationEvents( + this IServiceCollection services, + Action? setupOptions = null, + Action>? setupEvents = null) + where TEventProcessor : class, IIntegrationEventProcessor + { + var typeProvider = new EventTypeProvider(typeof(TInternalEvent).Assembly); + setupEvents?.Invoke(typeProvider); - x.FailedRetryCount = eventsOptions.FailedRetryCount; - x.SucceedMessageExpiredAfter = (int)TimeSpan.FromDays(eventsOptions.RetentionDays).TotalSeconds; + services + .AddSingleton(typeProvider) + .AddSingleton() + .AddScoped() + .AddPlatformIntegrationEventsInternal(setupOptions) + .Configure((RabbitMQOptions opt) => + { + // note: required for messages generated by not CAP + opt.CustomHeadersBuilder = (msg, sp) => + [ + new(Headers.MessageId, sp.GetRequiredService().NextId().ToString()), + new(Headers.MessageName, msg.RoutingKey), + new(Headers.Type, typeof(TInternalEvent).Name) + ]; + }); + + services.AddSingleton, TEventProcessor>(); + // NOTE: register TEventProcessor for each type (required for CapConsumerExecutor) + typeProvider.InternalEvents.Keys + .Select(t => typeof(IIntegrationEventProcessor<>).MakeGenericType(t)) + .ToList() + .ForEach(x => services.AddSingleton(x, sp => sp.GetRequiredService>())); - x.UseSqlServer( - o => - { - o.ConnectionString = eventsOptions.SqlServer.ConnectionString; - o.Schema = eventsOptions.SqlServer.Schema; - }); + return services; + } - x.UseDashboard(o => - { - o.PathMatch = eventsOptions.DashboardPath; - if (eventsOptions.AuthorizationPredicate is { } authPredicate) - { - o.AllowAnonymousExplicit = false; - o.AuthorizationPolicy = AddDashboardAuthorizationPolicy(services, authPredicate); - } - }); - - if (!eventsOptions.MessageQueue.Enable) + private static IServiceCollection AddPlatformIntegrationEventsInternal( + this IServiceCollection services, + Action? setupEventOptions = null) + { + services + .AddScoped(serviceProvider => + { + var capTransaction = ActivatorUtilities.CreateInstance(serviceProvider); + capTransaction.DbTransaction = serviceProvider.GetRequiredService(); + return capTransaction; + }) + .AddCap(x => + { + var eventsOptions = IntegrationEventsOptions.Default; + setupEventOptions?.Invoke(eventsOptions); + + x.FailedRetryCount = eventsOptions.FailedRetryCount; + x.SucceedMessageExpiredAfter = (int)TimeSpan.FromDays(eventsOptions.RetentionDays).TotalSeconds; + + x.UseSqlServer(o => + { + o.ConnectionString = eventsOptions.SqlServer.ConnectionString; + o.Schema = eventsOptions.SqlServer.Schema; + }); + + x.UseDashboard(o => + { + o.PathMatch = eventsOptions.DashboardPath; + if (eventsOptions.AuthorizationPredicate is { } authPredicate) { - x.UseInMemoryMessageQueue(); - return; + o.AllowAnonymousExplicit = false; + o.AuthorizationPolicy = AddDashboardAuthorizationPolicy(services, authPredicate); } + }); + + var rabbitSettings = eventsOptions.MessageQueue; + if (!rabbitSettings.Enable) + { + x.UseInMemoryMessageQueue(); + return; + } + + x.DefaultGroupName = string.IsNullOrWhiteSpace(rabbitSettings.QueueName) + ? rabbitSettings.ExchangeName + : rabbitSettings.QueueName; - x.DefaultGroupName = eventsOptions.MessageQueue.ExchangeName; - x.UseRabbitMQ( - o => - { - o.HostName = eventsOptions.MessageQueue.Host; - o.Port = eventsOptions.MessageQueue.Port; - o.VirtualHost = eventsOptions.MessageQueue.VirtualHost; - o.Password = eventsOptions.MessageQueue.Secret; - o.UserName = eventsOptions.MessageQueue.UserName; - o.ExchangeName = eventsOptions.MessageQueue.ExchangeName; - o.BasicQosOptions = new RabbitMQOptions.BasicQos(1, true); - }); + x.UseRabbitMQ(o => + { + o.HostName = rabbitSettings.Host; + o.Port = rabbitSettings.Port; + o.VirtualHost = rabbitSettings.VirtualHost; + o.Password = rabbitSettings.Secret; + o.UserName = rabbitSettings.UserName; + o.ExchangeName = rabbitSettings.ExchangeName; + o.BasicQosOptions = new(1, true); }); + }); return services; } diff --git a/src/Bss.Platform.Events/EventTypeProvider.cs b/src/Bss.Platform.Events/EventTypeProvider.cs new file mode 100644 index 0000000..41df560 --- /dev/null +++ b/src/Bss.Platform.Events/EventTypeProvider.cs @@ -0,0 +1,58 @@ +using System.Reflection; + +using Bss.Platform.Events.Interfaces; + +namespace Bss.Platform.Events; + +internal class EventTypeProvider : IEventTypeProvider, IIntegrationEventSetup +{ + public IReadOnlyDictionary InternalEvents => this.internalTypes; + + public IReadOnlyDictionary ExternalEvents => this.externalTypes; + + private readonly Dictionary internalTypes; + private readonly Dictionary externalTypes = new(); + + public EventTypeProvider(Assembly defaultAssembly) + { + this.internalTypes = + defaultAssembly + .DefinedTypes + .Where(IsAssignableAndSatisfyCondition) + .ToDictionary(Type (x) => x, x => x.Name); + } + + public IIntegrationEventSetup AddInternalEvent(string routingKey) where TImpl : T + { + var type = typeof(TImpl); + this.internalTypes[type] = routingKey; + return this; + } + + public IIntegrationEventSetup AddExternalEvents(string prefix, params Assembly[] assemblies) + { + var newTypes = assemblies.SelectMany(x => x.DefinedTypes) + .Where(IsAssignableAndSatisfyCondition) + .Except(this.externalTypes.Keys); + + foreach (var newType in newTypes) + { + this.externalTypes[newType] = $"{prefix}{newType.Name}"; + } + + return this; + } + + public IIntegrationEventSetup AddExternalEvent(string routingKey) + where TExternalConcrete : class + { + var type = typeof(TExternalConcrete); + this.externalTypes[type] = routingKey; + return this; + } + + private static bool IsAssignableAndSatisfyCondition(TypeInfo typeInfo) => + typeInfo is { IsInterface: false, IsAbstract: false, IsNested: false } + && typeof(TAssignableTo).IsAssignableFrom(typeInfo) + && !typeInfo.Name.Contains('<'); +} diff --git a/src/Bss.Platform.Events/Interfaces/IEventTypeProvider.cs b/src/Bss.Platform.Events/Interfaces/IEventTypeProvider.cs new file mode 100644 index 0000000..d6cc176 --- /dev/null +++ b/src/Bss.Platform.Events/Interfaces/IEventTypeProvider.cs @@ -0,0 +1,8 @@ +namespace Bss.Platform.Events.Interfaces; + +public interface IEventTypeProvider +{ + IReadOnlyDictionary InternalEvents { get; } + + IReadOnlyDictionary ExternalEvents { get; } +} diff --git a/src/Bss.Platform.Events/Interfaces/IIntegrationEventProcessor.cs b/src/Bss.Platform.Events/Interfaces/IIntegrationEventProcessor.cs index 25af453..8ff2064 100644 --- a/src/Bss.Platform.Events/Interfaces/IIntegrationEventProcessor.cs +++ b/src/Bss.Platform.Events/Interfaces/IIntegrationEventProcessor.cs @@ -2,7 +2,9 @@ namespace Bss.Platform.Events.Interfaces; -public interface IIntegrationEventProcessor +public interface IIntegrationEventProcessor : IIntegrationEventProcessor; + +public interface IIntegrationEventProcessor { - Task ProcessAsync(IIntegrationEvent @event, CancellationToken token); + Task ProcessAsync(T @event, CancellationToken token); } diff --git a/src/Bss.Platform.Events/Interfaces/IIntegrationEventSetup.cs b/src/Bss.Platform.Events/Interfaces/IIntegrationEventSetup.cs new file mode 100644 index 0000000..4909128 --- /dev/null +++ b/src/Bss.Platform.Events/Interfaces/IIntegrationEventSetup.cs @@ -0,0 +1,27 @@ +using System.Reflection; + +namespace Bss.Platform.Events.Interfaces; + +public interface IIntegrationEventSetup +{ + /// + /// Add a single internal event with the routing key, overrides if it already exists (added by default) + /// + IIntegrationEventSetup AddInternalEvent(string routingKey) where TImpl : T; + + /// + /// Add multiple events implemented or inherited TExternalBase with the prefix + /// + /// + /// prefix to add before type name + /// <TExternalBase>("SYS.") -> SYS.TExternal + /// + /// assemblies to find types, if not passed - will be used assembly contains TExternalBase + IIntegrationEventSetup AddExternalEvents(string prefix, params Assembly[] assemblies); + + /// + /// Add a single event with the routing key, overrides if it already exists (added by + /// ) + /// + IIntegrationEventSetup AddExternalEvent(string routingKey) where TExternalConcrete : class; +} diff --git a/src/Bss.Platform.Events/Models/IntegrationEventsMessageQueueOptions.cs b/src/Bss.Platform.Events/Models/IntegrationEventsMessageQueueOptions.cs index 5f1a5fa..1d2aad4 100644 --- a/src/Bss.Platform.Events/Models/IntegrationEventsMessageQueueOptions.cs +++ b/src/Bss.Platform.Events/Models/IntegrationEventsMessageQueueOptions.cs @@ -15,4 +15,6 @@ public class IntegrationEventsMessageQueueOptions public string VirtualHost { get; set; } = default!; public string ExchangeName { get; set; } = default!; + + public string QueueName { get; set; } = default!; } diff --git a/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs b/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs index 1d0273c..8893ee7 100644 --- a/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs +++ b/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs @@ -1,19 +1,56 @@ using Bss.Platform.Events.Abstractions; +using Bss.Platform.Events.Interfaces; using DotNetCore.CAP; namespace Bss.Platform.Events.Publishers; -public class IntegrationEventPublisher(ICapPublisher capPublisher, ICapTransaction capTransaction) : IIntegrationEventPublisher +public class IntegrationEventPublisherLegacy(ICapPublisher capPublisher, ICapTransaction capTransaction) + : IntegrationEventPublisherBase(capPublisher, capTransaction) { - public Task PublishAsync(IIntegrationEvent @event, CancellationToken cancellationToken) + private readonly ICapPublisher capPublisher = capPublisher; + + protected override Task PublishInternalAsync(object @event, CancellationToken cancellationToken) => + this.capPublisher.PublishAsync(@event.GetType().Name, @event, cancellationToken: cancellationToken); +} + +public class IntegrationEventPublisherNew(ICapPublisher capPublisher, ICapTransaction capTransaction, IEventTypeProvider eventTypeProvider) + : IntegrationEventPublisherBase(capPublisher, capTransaction) +{ + private readonly ICapPublisher capPublisher = capPublisher; + + protected override async Task PublishInternalAsync(object @event, CancellationToken cancellationToken) + { + if (eventTypeProvider.InternalEvents.TryGetValue(@event.GetType(), out var internalRoutingKey)) + { + await this.capPublisher.PublishAsync(internalRoutingKey, @event, cancellationToken: cancellationToken); + } + + if (eventTypeProvider.ExternalEvents.TryGetValue(@event.GetType(), out var externalRoutingKey) + && externalRoutingKey != internalRoutingKey) + { + await this.capPublisher.PublishAsync(externalRoutingKey, @event, cancellationToken: cancellationToken); + } + + if (internalRoutingKey == null && externalRoutingKey == null) + { + throw new($"No routing key found for event type {@event.GetType().FullName}"); + } + } +} + +public abstract class IntegrationEventPublisherBase(ICapPublisher capPublisher, ICapTransaction capTransaction) : IIntegrationEventPublisher +{ + public Task PublishAsync(object @event, CancellationToken cancellationToken) { if (capPublisher.Transaction is not null && capPublisher.Transaction != capTransaction) { - throw new Exception("There cannot be different CAP transactions within the same scope"); + throw new("There cannot be different CAP transactions within the same scope"); } capPublisher.Transaction = capTransaction; - return capPublisher.PublishAsync(@event.GetType().Name, @event, cancellationToken: cancellationToken); + return this.PublishInternalAsync(@event, cancellationToken); } + + protected abstract Task PublishInternalAsync(object @event, CancellationToken cancellationToken); } diff --git a/src/Directory.Build.props b/src/Directory.Build.props index c854181..9067f20 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -1,6 +1,7 @@ - net9.0 + net9.0 + 14 https://github.com/Luxoft/bss-platform git diff --git a/src/__SolutionItems/CommonAssemblyInfo.cs b/src/__SolutionItems/CommonAssemblyInfo.cs index cf6130e..0fcc942 100644 --- a/src/__SolutionItems/CommonAssemblyInfo.cs +++ b/src/__SolutionItems/CommonAssemblyInfo.cs @@ -4,9 +4,9 @@ [assembly: AssemblyCompany("Luxoft")] [assembly: AssemblyCopyright("Copyright © Luxoft 2026")] -[assembly: AssemblyVersion("1.6.8.0")] -[assembly: AssemblyFileVersion("1.6.8.0")] -[assembly: AssemblyInformationalVersion("1.6.8.0")] +[assembly: AssemblyVersion("1.6.9")] +[assembly: AssemblyFileVersion("1.6.9")] +[assembly: AssemblyInformationalVersion("1.6.9")] #if DEBUG [assembly: AssemblyConfiguration("Debug")] From 288ba9bfec1e295089094ab6b7acb67120e084a7 Mon Sep 17 00:00:00 2001 From: Ivan Gubanov Date: Mon, 1 Jun 2026 00:43:19 +0300 Subject: [PATCH 2/5] updated setup method, exluded auto registration internal events, added new method + generic base event publisher instead of object --- .../IEventTypeProvider.cs | 14 +++++ .../IIntegrationEventPublisher.cs | 6 ++- .../DependencyInjection.cs | 25 +++++---- src/Bss.Platform.Events/EventTypeProvider.cs | 54 +++++++++++++------ .../Interfaces/IEventTypeProvider.cs | 8 --- .../Interfaces/IIntegrationEventSetup.cs | 20 +++++-- .../Publishers/IntegrationEventPublisher.cs | 18 ++++--- 7 files changed, 96 insertions(+), 49 deletions(-) create mode 100644 src/Bss.Platform.Events.Abstractions/IEventTypeProvider.cs delete mode 100644 src/Bss.Platform.Events/Interfaces/IEventTypeProvider.cs diff --git a/src/Bss.Platform.Events.Abstractions/IEventTypeProvider.cs b/src/Bss.Platform.Events.Abstractions/IEventTypeProvider.cs new file mode 100644 index 0000000..403d29a --- /dev/null +++ b/src/Bss.Platform.Events.Abstractions/IEventTypeProvider.cs @@ -0,0 +1,14 @@ +namespace Bss.Platform.Events.Abstractions; + +public interface IEventTypeProvider +{ + /// + /// Internal events are sent to the Rabbit->CAP queue and handled within the target system + /// + IReadOnlyDictionary InternalEvents { get; } + + /// + /// External events are sent to the Rabbit exchange, but do not have a handler in the target system + /// + IReadOnlyDictionary ExternalEvents { get; } +} diff --git a/src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs b/src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs index 87f020c..d95f2ad 100644 --- a/src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs +++ b/src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs @@ -1,6 +1,8 @@ namespace Bss.Platform.Events.Abstractions; -public interface IIntegrationEventPublisher +public interface IIntegrationEventPublisher { - Task PublishAsync(object @event, CancellationToken cancellationToken); + Task PublishAsync(T @event, CancellationToken cancellationToken); } + +public interface IIntegrationEventPublisher : IIntegrationEventPublisher; diff --git a/src/Bss.Platform.Events/DependencyInjection.cs b/src/Bss.Platform.Events/DependencyInjection.cs index d81d675..4ee44ec 100644 --- a/src/Bss.Platform.Events/DependencyInjection.cs +++ b/src/Bss.Platform.Events/DependencyInjection.cs @@ -38,19 +38,24 @@ public static IServiceCollection AddPlatformIntegrationEvents( return services; } - public static IServiceCollection AddPlatformIntegrationEvents( + /// + /// A new way to register integration events, required to set up internal and external events + /// + /// Automatically registered IIntegrationEventPublisher<TEvent> wrap it if you need + public static IServiceCollection AddPlatformIntegrationEvents( this IServiceCollection services, - Action? setupOptions = null, - Action>? setupEvents = null) - where TEventProcessor : class, IIntegrationEventProcessor + Action> setupEvents, + Action? setupOptions = null) + where TEventProcessor : class, IIntegrationEventProcessor + where TEvent : notnull { - var typeProvider = new EventTypeProvider(typeof(TInternalEvent).Assembly); - setupEvents?.Invoke(typeProvider); + var typeProvider = new EventTypeProvider(); + setupEvents.Invoke(typeProvider); services .AddSingleton(typeProvider) .AddSingleton() - .AddScoped() + .AddScoped, IntegrationEventPublisherNew>() .AddPlatformIntegrationEventsInternal(setupOptions) .Configure((RabbitMQOptions opt) => { @@ -59,16 +64,16 @@ public static IServiceCollection AddPlatformIntegrationEvents().NextId().ToString()), new(Headers.MessageName, msg.RoutingKey), - new(Headers.Type, typeof(TInternalEvent).Name) + new(Headers.Type, typeof(TEvent).Name) ]; }); - services.AddSingleton, TEventProcessor>(); + services.AddSingleton, TEventProcessor>(); // NOTE: register TEventProcessor for each type (required for CapConsumerExecutor) typeProvider.InternalEvents.Keys .Select(t => typeof(IIntegrationEventProcessor<>).MakeGenericType(t)) .ToList() - .ForEach(x => services.AddSingleton(x, sp => sp.GetRequiredService>())); + .ForEach(x => services.AddSingleton(x, sp => sp.GetRequiredService>())); return services; } diff --git a/src/Bss.Platform.Events/EventTypeProvider.cs b/src/Bss.Platform.Events/EventTypeProvider.cs index 41df560..dd89eea 100644 --- a/src/Bss.Platform.Events/EventTypeProvider.cs +++ b/src/Bss.Platform.Events/EventTypeProvider.cs @@ -1,5 +1,6 @@ using System.Reflection; +using Bss.Platform.Events.Abstractions; using Bss.Platform.Events.Interfaces; namespace Bss.Platform.Events; @@ -7,32 +8,41 @@ namespace Bss.Platform.Events; internal class EventTypeProvider : IEventTypeProvider, IIntegrationEventSetup { public IReadOnlyDictionary InternalEvents => this.internalTypes; - public IReadOnlyDictionary ExternalEvents => this.externalTypes; - private readonly Dictionary internalTypes; - private readonly Dictionary externalTypes = new(); + private readonly Dictionary internalTypes = []; + private readonly Dictionary externalTypes = []; - public EventTypeProvider(Assembly defaultAssembly) + public IIntegrationEventSetup AddInternalEvents(string prefix = "", params Assembly[] assemblies) + where TEvent : T { - this.internalTypes = - defaultAssembly - .DefinedTypes - .Where(IsAssignableAndSatisfyCondition) - .ToDictionary(Type (x) => x, x => x.Name); + var newTypes = GetOrDefaultAssembly(assemblies) + .SelectMany(x => x.DefinedTypes) + .Where(IsAssignableAndSatisfyCondition) + .Except(this.externalTypes.Keys); + + foreach (var newType in newTypes) + { + this.internalTypes[newType] = $"{prefix}{newType.Name}"; + } + + return this; } - public IIntegrationEventSetup AddInternalEvent(string routingKey) where TImpl : T + public IIntegrationEventSetup AddInternalEvent(string routingKey) + where TEvent : class, T { - var type = typeof(TImpl); + var type = typeof(TEvent); this.internalTypes[type] = routingKey; return this; } - public IIntegrationEventSetup AddExternalEvents(string prefix, params Assembly[] assemblies) + public IIntegrationEventSetup AddExternalEvents(string prefix = "", params Assembly[] assemblies) + where TEvent : T { - var newTypes = assemblies.SelectMany(x => x.DefinedTypes) - .Where(IsAssignableAndSatisfyCondition) + var newTypes = GetOrDefaultAssembly(assemblies) + .SelectMany(x => x.DefinedTypes) + .Where(IsAssignableAndSatisfyCondition) .Except(this.externalTypes.Keys); foreach (var newType in newTypes) @@ -43,14 +53,24 @@ public IIntegrationEventSetup AddExternalEvents(string prefix, return this; } - public IIntegrationEventSetup AddExternalEvent(string routingKey) - where TExternalConcrete : class + public IIntegrationEventSetup AddExternalEvent(string routingKey) + where TEvent : class, T { - var type = typeof(TExternalConcrete); + var type = typeof(TEvent); this.externalTypes[type] = routingKey; return this; } + private static Assembly[] GetOrDefaultAssembly(Assembly[] assemblies) where TEvent : T + { + if (assemblies.Length == 0) + { + assemblies = [typeof(TEvent).Assembly]; + } + + return assemblies; + } + private static bool IsAssignableAndSatisfyCondition(TypeInfo typeInfo) => typeInfo is { IsInterface: false, IsAbstract: false, IsNested: false } && typeof(TAssignableTo).IsAssignableFrom(typeInfo) diff --git a/src/Bss.Platform.Events/Interfaces/IEventTypeProvider.cs b/src/Bss.Platform.Events/Interfaces/IEventTypeProvider.cs deleted file mode 100644 index d6cc176..0000000 --- a/src/Bss.Platform.Events/Interfaces/IEventTypeProvider.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace Bss.Platform.Events.Interfaces; - -public interface IEventTypeProvider -{ - IReadOnlyDictionary InternalEvents { get; } - - IReadOnlyDictionary ExternalEvents { get; } -} diff --git a/src/Bss.Platform.Events/Interfaces/IIntegrationEventSetup.cs b/src/Bss.Platform.Events/Interfaces/IIntegrationEventSetup.cs index 4909128..003958b 100644 --- a/src/Bss.Platform.Events/Interfaces/IIntegrationEventSetup.cs +++ b/src/Bss.Platform.Events/Interfaces/IIntegrationEventSetup.cs @@ -5,9 +5,21 @@ namespace Bss.Platform.Events.Interfaces; public interface IIntegrationEventSetup { /// - /// Add a single internal event with the routing key, overrides if it already exists (added by default) + /// Add multiple events implemented or inherited TInternalBase with the prefix /// - IIntegrationEventSetup AddInternalEvent(string routingKey) where TImpl : T; + /// + /// prefix to add before type name + /// <TInternalBase>("INT.") -> INT.TInternal + /// + /// assemblies to find types, if not passed - will be used assembly contains TInternalBase + IIntegrationEventSetup AddInternalEvents(string prefix = "", params Assembly[] assemblies) + where TInternalBase : T; + + /// + /// Add a single event with the routing key, overrides if it already exists (added by + /// ) + /// + IIntegrationEventSetup AddInternalEvent(string routingKey) where TEvent : class, T; /// /// Add multiple events implemented or inherited TExternalBase with the prefix @@ -17,11 +29,11 @@ public interface IIntegrationEventSetup /// <TExternalBase>("SYS.") -> SYS.TExternal /// /// assemblies to find types, if not passed - will be used assembly contains TExternalBase - IIntegrationEventSetup AddExternalEvents(string prefix, params Assembly[] assemblies); + IIntegrationEventSetup AddExternalEvents(string prefix, params Assembly[] assemblies) where TExternalBase : T; /// /// Add a single event with the routing key, overrides if it already exists (added by /// ) /// - IIntegrationEventSetup AddExternalEvent(string routingKey) where TExternalConcrete : class; + IIntegrationEventSetup AddExternalEvent(string routingKey) where TEvent : class, T; } diff --git a/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs b/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs index 8893ee7..4e545d7 100644 --- a/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs +++ b/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs @@ -6,20 +6,21 @@ namespace Bss.Platform.Events.Publishers; public class IntegrationEventPublisherLegacy(ICapPublisher capPublisher, ICapTransaction capTransaction) - : IntegrationEventPublisherBase(capPublisher, capTransaction) + : IntegrationEventPublisherBase(capPublisher, capTransaction), IIntegrationEventPublisher { private readonly ICapPublisher capPublisher = capPublisher; - protected override Task PublishInternalAsync(object @event, CancellationToken cancellationToken) => + protected override Task PublishInternalAsync(IIntegrationEvent @event, CancellationToken cancellationToken) => this.capPublisher.PublishAsync(@event.GetType().Name, @event, cancellationToken: cancellationToken); } -public class IntegrationEventPublisherNew(ICapPublisher capPublisher, ICapTransaction capTransaction, IEventTypeProvider eventTypeProvider) - : IntegrationEventPublisherBase(capPublisher, capTransaction) +public class IntegrationEventPublisherNew(ICapPublisher capPublisher, ICapTransaction capTransaction, IEventTypeProvider eventTypeProvider) + : IntegrationEventPublisherBase(capPublisher, capTransaction) + where T: notnull { private readonly ICapPublisher capPublisher = capPublisher; - protected override async Task PublishInternalAsync(object @event, CancellationToken cancellationToken) + protected override async Task PublishInternalAsync(T @event, CancellationToken cancellationToken) { if (eventTypeProvider.InternalEvents.TryGetValue(@event.GetType(), out var internalRoutingKey)) { @@ -39,9 +40,10 @@ protected override async Task PublishInternalAsync(object @event, CancellationTo } } -public abstract class IntegrationEventPublisherBase(ICapPublisher capPublisher, ICapTransaction capTransaction) : IIntegrationEventPublisher +public abstract class IntegrationEventPublisherBase(ICapPublisher capPublisher, ICapTransaction capTransaction) + : IIntegrationEventPublisher { - public Task PublishAsync(object @event, CancellationToken cancellationToken) + public Task PublishAsync(T @event, CancellationToken cancellationToken) { if (capPublisher.Transaction is not null && capPublisher.Transaction != capTransaction) { @@ -52,5 +54,5 @@ public Task PublishAsync(object @event, CancellationToken cancellationToken) return this.PublishInternalAsync(@event, cancellationToken); } - protected abstract Task PublishInternalAsync(object @event, CancellationToken cancellationToken); + protected abstract Task PublishInternalAsync(T @event, CancellationToken cancellationToken); } From 47a7c9c69cf3454eb652ddb9465641a60e4ed1c9 Mon Sep 17 00:00:00 2001 From: Ivan Gubanov Date: Fri, 5 Jun 2026 00:51:50 +0300 Subject: [PATCH 3/5] moved internal services, extend options, rename builder method --- .../IEventTypeProvider.cs | 4 +- .../DependencyInjection.cs | 92 +++++++++++++------ src/Bss.Platform.Events/EventTypeProvider.cs | 40 ++++---- .../Interfaces/IIntegrationEventSetup.cs | 16 ++-- .../{ => Internal}/CapConsumerExecutor.cs | 2 +- .../CapConsumerServiceSelector.cs | 11 +-- .../IntegrationEventsMessageQueueOptions.cs | 2 +- .../Models/IntegrationEventsOptions.cs | 31 +++---- .../IntegrationEventsSqlServerOptions.cs | 3 + .../Publishers/IntegrationEventPublisher.cs | 4 +- 10 files changed, 119 insertions(+), 86 deletions(-) rename src/Bss.Platform.Events/{ => Internal}/CapConsumerExecutor.cs (87%) rename src/Bss.Platform.Events/{ => Internal}/CapConsumerServiceSelector.cs (93%) diff --git a/src/Bss.Platform.Events.Abstractions/IEventTypeProvider.cs b/src/Bss.Platform.Events.Abstractions/IEventTypeProvider.cs index 403d29a..5a6917b 100644 --- a/src/Bss.Platform.Events.Abstractions/IEventTypeProvider.cs +++ b/src/Bss.Platform.Events.Abstractions/IEventTypeProvider.cs @@ -5,10 +5,10 @@ public interface IEventTypeProvider /// /// Internal events are sent to the Rabbit->CAP queue and handled within the target system /// - IReadOnlyDictionary InternalEvents { get; } + IReadOnlyDictionary InputEvents { get; } /// /// External events are sent to the Rabbit exchange, but do not have a handler in the target system /// - IReadOnlyDictionary ExternalEvents { get; } + IReadOnlyDictionary OutputEvents { get; } } diff --git a/src/Bss.Platform.Events/DependencyInjection.cs b/src/Bss.Platform.Events/DependencyInjection.cs index 4ee44ec..7597cee 100644 --- a/src/Bss.Platform.Events/DependencyInjection.cs +++ b/src/Bss.Platform.Events/DependencyInjection.cs @@ -3,6 +3,7 @@ using Bss.Platform.Events.Abstractions; using Bss.Platform.Events.Interfaces; +using Bss.Platform.Events.Internal; using Bss.Platform.Events.Models; using Bss.Platform.Events.Publishers; @@ -38,24 +39,48 @@ public static IServiceCollection AddPlatformIntegrationEvents( return services; } + /// + /// A new way to register integration events, required to set up internal and external events
+ /// Used Bss.Platform.Mediation + ///
+ /// Automatically registered IIntegrationEventPublisher<TEvent> wrap it if you need + public static IServiceCollection AddPlatformIntegrationEvents( + this IServiceCollection services, + Action> setupEvents, + Action setupOptions) + where TEventProcessor : class, IIntegrationEventProcessor => + services.AddPlatformIntegrationEvents(setupEvents, setupOptions); + /// /// A new way to register integration events, required to set up internal and external events /// /// Automatically registered IIntegrationEventPublisher<TEvent> wrap it if you need public static IServiceCollection AddPlatformIntegrationEvents( this IServiceCollection services, - Action> setupEvents, - Action? setupOptions = null) + Action> setupEvents, + Action setupOptions) where TEventProcessor : class, IIntegrationEventProcessor - where TEvent : notnull + where TEvent : notnull => + services.AddPlatformIntegrationEvents(setupEvents, setupOptions); + + /// + /// A new way to register integration events, required to set up internal and external events + /// + /// Automatically registered IIntegrationEventPublisher<TEvent> wrap it if you need + public static IServiceCollection AddPlatformIntegrationEvents( + this IServiceCollection services, + Action> setupEvents, + Action setupOptions) + where TEventProcessor : class, IIntegrationEventProcessor + where TInputEvent : notnull { - var typeProvider = new EventTypeProvider(); + var typeProvider = new EventTypeProvider(); setupEvents.Invoke(typeProvider); services .AddSingleton(typeProvider) .AddSingleton() - .AddScoped, IntegrationEventPublisherNew>() + .AddScoped, IntegrationEventPublisherNew>() .AddPlatformIntegrationEventsInternal(setupOptions) .Configure((RabbitMQOptions opt) => { @@ -64,16 +89,16 @@ public static IServiceCollection AddPlatformIntegrationEvents().NextId().ToString()), new(Headers.MessageName, msg.RoutingKey), - new(Headers.Type, typeof(TEvent).Name) + new(Headers.Type, typeof(TInputEvent).Name) ]; }); - services.AddSingleton, TEventProcessor>(); + services.AddSingleton, TEventProcessor>(); // NOTE: register TEventProcessor for each type (required for CapConsumerExecutor) - typeProvider.InternalEvents.Keys + typeProvider.InputEvents.Keys .Select(t => typeof(IIntegrationEventProcessor<>).MakeGenericType(t)) .ToList() - .ForEach(x => services.AddSingleton(x, sp => sp.GetRequiredService>())); + .ForEach(x => services.AddSingleton(x, sp => sp.GetRequiredService>())); return services; } @@ -91,21 +116,25 @@ private static IServiceCollection AddPlatformIntegrationEventsInternal( }) .AddCap(x => { - var eventsOptions = IntegrationEventsOptions.Default; + var eventsOptions = new IntegrationEventsOptions(); setupEventOptions?.Invoke(eventsOptions); x.FailedRetryCount = eventsOptions.FailedRetryCount; x.SucceedMessageExpiredAfter = (int)TimeSpan.FromDays(eventsOptions.RetentionDays).TotalSeconds; - x.UseSqlServer(o => + if (!string.IsNullOrEmpty(eventsOptions.SqlServer.ConnectionString)) { - o.ConnectionString = eventsOptions.SqlServer.ConnectionString; - o.Schema = eventsOptions.SqlServer.Schema; - }); + x.UseSqlServer(o => + { + o.ConnectionString = eventsOptions.SqlServer.ConnectionString; + o.Schema = eventsOptions.SqlServer.Schema; + }); + } x.UseDashboard(o => { o.PathMatch = eventsOptions.DashboardPath; + o.PathBase = eventsOptions.GatewayPrefix; if (eventsOptions.AuthorizationPredicate is { } authPredicate) { o.AllowAnonymousExplicit = false; @@ -114,26 +143,29 @@ private static IServiceCollection AddPlatformIntegrationEventsInternal( }); var rabbitSettings = eventsOptions.MessageQueue; - if (!rabbitSettings.Enable) + if (rabbitSettings.Enable) + { + x.DefaultGroupName = string.IsNullOrWhiteSpace(rabbitSettings.QueueName) + ? rabbitSettings.ExchangeName + : rabbitSettings.QueueName; + + x.UseRabbitMQ(o => + { + o.HostName = rabbitSettings.Host; + o.Port = rabbitSettings.Port; + o.VirtualHost = rabbitSettings.VirtualHost; + o.Password = rabbitSettings.Secret; + o.UserName = rabbitSettings.UserName; + o.ExchangeName = rabbitSettings.ExchangeName; + o.BasicQosOptions = new(1, true); + }); + } + else { x.UseInMemoryMessageQueue(); - return; } - x.DefaultGroupName = string.IsNullOrWhiteSpace(rabbitSettings.QueueName) - ? rabbitSettings.ExchangeName - : rabbitSettings.QueueName; - - x.UseRabbitMQ(o => - { - o.HostName = rabbitSettings.Host; - o.Port = rabbitSettings.Port; - o.VirtualHost = rabbitSettings.VirtualHost; - o.Password = rabbitSettings.Secret; - o.UserName = rabbitSettings.UserName; - o.ExchangeName = rabbitSettings.ExchangeName; - o.BasicQosOptions = new(1, true); - }); + eventsOptions.OverrideCapOptions?.Invoke(x); }); return services; diff --git a/src/Bss.Platform.Events/EventTypeProvider.cs b/src/Bss.Platform.Events/EventTypeProvider.cs index dd89eea..c87d837 100644 --- a/src/Bss.Platform.Events/EventTypeProvider.cs +++ b/src/Bss.Platform.Events/EventTypeProvider.cs @@ -5,63 +5,63 @@ namespace Bss.Platform.Events; -internal class EventTypeProvider : IEventTypeProvider, IIntegrationEventSetup +internal class EventTypeProvider : IEventTypeProvider, IIntegrationEventSetup { - public IReadOnlyDictionary InternalEvents => this.internalTypes; - public IReadOnlyDictionary ExternalEvents => this.externalTypes; + public IReadOnlyDictionary InputEvents => this.inputTypes; + public IReadOnlyDictionary OutputEvents => this.outputTypes; - private readonly Dictionary internalTypes = []; - private readonly Dictionary externalTypes = []; + private readonly Dictionary inputTypes = []; + private readonly Dictionary outputTypes = []; - public IIntegrationEventSetup AddInternalEvents(string prefix = "", params Assembly[] assemblies) - where TEvent : T + public IIntegrationEventSetup AddInputEvents(string prefix = "", params Assembly[] assemblies) + where TEvent : TIn { var newTypes = GetOrDefaultAssembly(assemblies) .SelectMany(x => x.DefinedTypes) .Where(IsAssignableAndSatisfyCondition) - .Except(this.externalTypes.Keys); + .Except(this.outputTypes.Keys); foreach (var newType in newTypes) { - this.internalTypes[newType] = $"{prefix}{newType.Name}"; + this.inputTypes[newType] = $"{prefix}{newType.Name}"; } return this; } - public IIntegrationEventSetup AddInternalEvent(string routingKey) - where TEvent : class, T + public IIntegrationEventSetup AddInputEvent(string routingKey) + where TEvent : class, TIn { var type = typeof(TEvent); - this.internalTypes[type] = routingKey; + this.inputTypes[type] = routingKey; return this; } - public IIntegrationEventSetup AddExternalEvents(string prefix = "", params Assembly[] assemblies) - where TEvent : T + public IIntegrationEventSetup AddOutputEvents(string prefix = "", params Assembly[] assemblies) + where TEvent : TOut { var newTypes = GetOrDefaultAssembly(assemblies) .SelectMany(x => x.DefinedTypes) .Where(IsAssignableAndSatisfyCondition) - .Except(this.externalTypes.Keys); + .Except(this.outputTypes.Keys); foreach (var newType in newTypes) { - this.externalTypes[newType] = $"{prefix}{newType.Name}"; + this.outputTypes[newType] = $"{prefix}{newType.Name}"; } return this; } - public IIntegrationEventSetup AddExternalEvent(string routingKey) - where TEvent : class, T + public IIntegrationEventSetup AddOutputEvent(string routingKey) + where TEvent : class, TOut { var type = typeof(TEvent); - this.externalTypes[type] = routingKey; + this.outputTypes[type] = routingKey; return this; } - private static Assembly[] GetOrDefaultAssembly(Assembly[] assemblies) where TEvent : T + private static Assembly[] GetOrDefaultAssembly(Assembly[] assemblies) { if (assemblies.Length == 0) { diff --git a/src/Bss.Platform.Events/Interfaces/IIntegrationEventSetup.cs b/src/Bss.Platform.Events/Interfaces/IIntegrationEventSetup.cs index 003958b..31de0a8 100644 --- a/src/Bss.Platform.Events/Interfaces/IIntegrationEventSetup.cs +++ b/src/Bss.Platform.Events/Interfaces/IIntegrationEventSetup.cs @@ -2,7 +2,7 @@ namespace Bss.Platform.Events.Interfaces; -public interface IIntegrationEventSetup +public interface IIntegrationEventSetup { /// /// Add multiple events implemented or inherited TInternalBase with the prefix @@ -12,14 +12,14 @@ public interface IIntegrationEventSetup /// <TInternalBase>("INT.") -> INT.TInternal /// /// assemblies to find types, if not passed - will be used assembly contains TInternalBase - IIntegrationEventSetup AddInternalEvents(string prefix = "", params Assembly[] assemblies) - where TInternalBase : T; + IIntegrationEventSetup AddInputEvents(string prefix = "", params Assembly[] assemblies) + where TInputBase : TIn; /// /// Add a single event with the routing key, overrides if it already exists (added by - /// ) + /// ) /// - IIntegrationEventSetup AddInternalEvent(string routingKey) where TEvent : class, T; + IIntegrationEventSetup AddInputEvent(string routingKey) where TInput : class, TIn; /// /// Add multiple events implemented or inherited TExternalBase with the prefix @@ -29,11 +29,11 @@ IIntegrationEventSetup AddInternalEvents(string prefix = "", p /// <TExternalBase>("SYS.") -> SYS.TExternal /// /// assemblies to find types, if not passed - will be used assembly contains TExternalBase - IIntegrationEventSetup AddExternalEvents(string prefix, params Assembly[] assemblies) where TExternalBase : T; + IIntegrationEventSetup AddOutputEvents(string prefix, params Assembly[] assemblies) where TOutputBase : TOut; /// /// Add a single event with the routing key, overrides if it already exists (added by - /// ) + /// + /// required to fill connection string in options, otherwise MS SQL db won't connect + /// + public IntegrationEventsSqlServerOptions SqlServer { get; set; } = new() { Schema = "events" }; + + public IntegrationEventsMessageQueueOptions MessageQueue { get; set; } = new() { Enable = true }; + + public Action? OverrideCapOptions { get; set; } - public IntegrationEventsMessageQueueOptions MessageQueue { get; set; } = default!; - /// /// Any condition to check that a user should get access to events dashboard /// @@ -21,14 +30,4 @@ public class IntegrationEventsOptions /// AuthorizationPolicyPredicate = (httpContext) => httpContext.RequestServices.GetRequiredService<ICurrentUser>().IsAdminAsync() /// public Func>? AuthorizationPredicate { get; set; } - - public static IntegrationEventsOptions Default => - new() - { - DashboardPath = "/admin/events", - FailedRetryCount = 5, - RetentionDays = 15, - SqlServer = new IntegrationEventsSqlServerOptions { Schema = "events" }, - MessageQueue = new IntegrationEventsMessageQueueOptions { Enable = true } - }; } diff --git a/src/Bss.Platform.Events/Models/IntegrationEventsSqlServerOptions.cs b/src/Bss.Platform.Events/Models/IntegrationEventsSqlServerOptions.cs index f38a179..8825cf3 100644 --- a/src/Bss.Platform.Events/Models/IntegrationEventsSqlServerOptions.cs +++ b/src/Bss.Platform.Events/Models/IntegrationEventsSqlServerOptions.cs @@ -2,6 +2,9 @@ namespace Bss.Platform.Events.Models; public class IntegrationEventsSqlServerOptions { + /// + /// When empty - MS SQL db won't connect + /// public string ConnectionString { get; set; } = default!; public string Schema { get; set; } = default!; diff --git a/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs b/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs index 4e545d7..95f60bb 100644 --- a/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs +++ b/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs @@ -22,12 +22,12 @@ public class IntegrationEventPublisherNew(ICapPublisher capPublisher, ICapTra protected override async Task PublishInternalAsync(T @event, CancellationToken cancellationToken) { - if (eventTypeProvider.InternalEvents.TryGetValue(@event.GetType(), out var internalRoutingKey)) + if (eventTypeProvider.InputEvents.TryGetValue(@event.GetType(), out var internalRoutingKey)) { await this.capPublisher.PublishAsync(internalRoutingKey, @event, cancellationToken: cancellationToken); } - if (eventTypeProvider.ExternalEvents.TryGetValue(@event.GetType(), out var externalRoutingKey) + if (eventTypeProvider.OutputEvents.TryGetValue(@event.GetType(), out var externalRoutingKey) && externalRoutingKey != internalRoutingKey) { await this.capPublisher.PublishAsync(externalRoutingKey, @event, cancellationToken: cancellationToken); From 4f6c5a0622391ea88cef3a1dafe5b6e6cc2705ae Mon Sep 17 00:00:00 2001 From: Ivan Gubanov Date: Fri, 5 Jun 2026 01:24:19 +0300 Subject: [PATCH 4/5] added CAP Filter to handle failed events --- .../IFailedEventProcessor.cs | 6 +++ .../DependencyInjection.cs | 9 +++- .../Internal/CapExceptionFilter.cs | 44 +++++++++++++++++++ .../Models/IntegrationEventsOptions.cs | 6 +++ 4 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 src/Bss.Platform.Events.Abstractions/IFailedEventProcessor.cs create mode 100644 src/Bss.Platform.Events/Internal/CapExceptionFilter.cs diff --git a/src/Bss.Platform.Events.Abstractions/IFailedEventProcessor.cs b/src/Bss.Platform.Events.Abstractions/IFailedEventProcessor.cs new file mode 100644 index 0000000..01ea889 --- /dev/null +++ b/src/Bss.Platform.Events.Abstractions/IFailedEventProcessor.cs @@ -0,0 +1,6 @@ +namespace Bss.Platform.Events.Abstractions; + +public interface IFailedEventProcessor +{ + Task HandleAsync(object? value, Exception ex); +} diff --git a/src/Bss.Platform.Events/DependencyInjection.cs b/src/Bss.Platform.Events/DependencyInjection.cs index 7597cee..281618c 100644 --- a/src/Bss.Platform.Events/DependencyInjection.cs +++ b/src/Bss.Platform.Events/DependencyInjection.cs @@ -8,6 +8,7 @@ using Bss.Platform.Events.Publishers; using DotNetCore.CAP; +using DotNetCore.CAP.Filter; using DotNetCore.CAP.Internal; using DotNetCore.CAP.Messages; @@ -73,6 +74,7 @@ public static IServiceCollection AddPlatformIntegrationEvents setupOptions) where TEventProcessor : class, IIntegrationEventProcessor where TInputEvent : notnull + where TOutputEvent : notnull { var typeProvider = new EventTypeProvider(); setupEvents.Invoke(typeProvider); @@ -80,7 +82,7 @@ public static IServiceCollection AddPlatformIntegrationEvents(typeProvider) .AddSingleton() - .AddScoped, IntegrationEventPublisherNew>() + .AddScoped, IntegrationEventPublisherNew>() .AddPlatformIntegrationEventsInternal(setupOptions) .Configure((RabbitMQOptions opt) => { @@ -122,6 +124,11 @@ private static IServiceCollection AddPlatformIntegrationEventsInternal( x.FailedRetryCount = eventsOptions.FailedRetryCount; x.SucceedMessageExpiredAfter = (int)TimeSpan.FromDays(eventsOptions.RetentionDays).TotalSeconds; + if (eventsOptions.UseFailedEventProcessor) + { + services.AddScoped(); + } + if (!string.IsNullOrEmpty(eventsOptions.SqlServer.ConnectionString)) { x.UseSqlServer(o => diff --git a/src/Bss.Platform.Events/Internal/CapExceptionFilter.cs b/src/Bss.Platform.Events/Internal/CapExceptionFilter.cs new file mode 100644 index 0000000..cf6d19c --- /dev/null +++ b/src/Bss.Platform.Events/Internal/CapExceptionFilter.cs @@ -0,0 +1,44 @@ +using System.Text.Json; + +using Bss.Platform.Events.Abstractions; + +using DotNetCore.CAP; +using DotNetCore.CAP.Filter; +using DotNetCore.CAP.Serialization; + +using Microsoft.Extensions.Options; + +namespace Bss.Platform.Events.Internal; + +internal sealed class CapExceptionFilter(IOptions capOptions, IEnumerable failsProcessors, ISerializer serializer) : SubscribeFilter +{ + private const string DetailsHeader = "x-cap-failure-details"; + + // NOTE: -1 value because the CAP incremented after filters + private int LatestRetryCount => capOptions.Value.FailedRetryCount - 1; + + public override Task OnSubscribeExceptionAsync(ExceptionContext context) + { + if (context.MediumMessage.Retries != this.LatestRetryCount) + { + return Task.CompletedTask; + } + + var ex = context.Exception; + var errorText = ex.GetBaseException().Message; + + var details = new CapFailureDetails(ex.GetType().FullName ?? ex.GetType().Name, errorText, ex.StackTrace); + context.DeliverMessage.Headers[DetailsHeader] = JsonSerializer.Serialize(details); + + var payloadParam = context.ConsumerDescriptor.Parameters.SingleOrDefault(p => !p.IsFromCap); + var value = context.DeliverMessage.Value; + + var payload = payloadParam is not null && value is not null + ? serializer.Deserialize(value, payloadParam.ParameterType) + : null; + + return Task.WhenAll(failsProcessors.Select(x => x.HandleAsync(payload, ex))); + } + + internal sealed record CapFailureDetails(string ExceptionType, string Message, string? StackTrace); +} diff --git a/src/Bss.Platform.Events/Models/IntegrationEventsOptions.cs b/src/Bss.Platform.Events/Models/IntegrationEventsOptions.cs index cdd7288..8eaca0b 100644 --- a/src/Bss.Platform.Events/Models/IntegrationEventsOptions.cs +++ b/src/Bss.Platform.Events/Models/IntegrationEventsOptions.cs @@ -23,6 +23,12 @@ public class IntegrationEventsOptions public Action? OverrideCapOptions { get; set; } + /// + /// When enable will be registered CAP filter to handle failed event on the last attempt, allow using multiple + /// scoped implementations + /// + public bool UseFailedEventProcessor { get; set; } + /// /// Any condition to check that a user should get access to events dashboard /// From 501bc0d8dfa6ba437c356985f001b0c145e716b5 Mon Sep 17 00:00:00 2001 From: Ivan Gubanov Date: Fri, 5 Jun 2026 11:57:29 +0300 Subject: [PATCH 5/5] fixed (service should registered outside AddCap + handle case with 0 retries) + deserialize with condition IsJsonType (retry passed json, but the first attempt/without retries passed the target deserialized object type) --- src/Bss.Platform.Events/DependencyInjection.cs | 16 ++++++++-------- .../Internal/CapExceptionFilter.cs | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/Bss.Platform.Events/DependencyInjection.cs b/src/Bss.Platform.Events/DependencyInjection.cs index 281618c..b77d8fb 100644 --- a/src/Bss.Platform.Events/DependencyInjection.cs +++ b/src/Bss.Platform.Events/DependencyInjection.cs @@ -109,6 +109,14 @@ private static IServiceCollection AddPlatformIntegrationEventsInternal( this IServiceCollection services, Action? setupEventOptions = null) { + var eventsOptions = new IntegrationEventsOptions(); + setupEventOptions?.Invoke(eventsOptions); + + if (eventsOptions.UseFailedEventProcessor) + { + services.AddScoped(); + } + services .AddScoped(serviceProvider => { @@ -118,17 +126,9 @@ private static IServiceCollection AddPlatformIntegrationEventsInternal( }) .AddCap(x => { - var eventsOptions = new IntegrationEventsOptions(); - setupEventOptions?.Invoke(eventsOptions); - x.FailedRetryCount = eventsOptions.FailedRetryCount; x.SucceedMessageExpiredAfter = (int)TimeSpan.FromDays(eventsOptions.RetentionDays).TotalSeconds; - if (eventsOptions.UseFailedEventProcessor) - { - services.AddScoped(); - } - if (!string.IsNullOrEmpty(eventsOptions.SqlServer.ConnectionString)) { x.UseSqlServer(o => diff --git a/src/Bss.Platform.Events/Internal/CapExceptionFilter.cs b/src/Bss.Platform.Events/Internal/CapExceptionFilter.cs index cf6d19c..2952966 100644 --- a/src/Bss.Platform.Events/Internal/CapExceptionFilter.cs +++ b/src/Bss.Platform.Events/Internal/CapExceptionFilter.cs @@ -14,8 +14,8 @@ internal sealed class CapExceptionFilter(IOptions capOptions, IEnume { private const string DetailsHeader = "x-cap-failure-details"; - // NOTE: -1 value because the CAP incremented after filters - private int LatestRetryCount => capOptions.Value.FailedRetryCount - 1; + // NOTE: -1 value because the CAP incremented after filters and handle 0 retries case + private int LatestRetryCount => Math.Max(capOptions.Value.FailedRetryCount - 1, 0); public override Task OnSubscribeExceptionAsync(ExceptionContext context) { @@ -33,9 +33,9 @@ public override Task OnSubscribeExceptionAsync(ExceptionContext context) var payloadParam = context.ConsumerDescriptor.Parameters.SingleOrDefault(p => !p.IsFromCap); var value = context.DeliverMessage.Value; - var payload = payloadParam is not null && value is not null + var payload = payloadParam is not null && value is not null && serializer.IsJsonType(value) ? serializer.Deserialize(value, payloadParam.ParameterType) - : null; + : value; return Task.WhenAll(failsProcessors.Select(x => x.HandleAsync(payload, ex))); }