diff --git a/src/Bss.Platform.Events.Abstractions/IEventTypeProvider.cs b/src/Bss.Platform.Events.Abstractions/IEventTypeProvider.cs
new file mode 100644
index 0000000..5a6917b
--- /dev/null
+++ b/src/Bss.Platform.Events.Abstractions/IEventTypeProvider.cs
@@ -0,0 +1,14 @@
+namespace Bss.Platform.Events.Abstractions;
+
+public interface IEventTypeProvider
+{
+ ///
+ /// Internal events are sent to the Rabbit->CAP queue and handled within the target system
+ ///
+ IReadOnlyDictionary InputEvents { get; }
+
+ ///
+ /// External events are sent to the Rabbit exchange, but do not have a handler in the target system
+ ///
+ IReadOnlyDictionary OutputEvents { get; }
+}
diff --git a/src/Bss.Platform.Events.Abstractions/IFailedEventProcessor.cs b/src/Bss.Platform.Events.Abstractions/IFailedEventProcessor.cs
new file mode 100644
index 0000000..01ea889
--- /dev/null
+++ b/src/Bss.Platform.Events.Abstractions/IFailedEventProcessor.cs
@@ -0,0 +1,6 @@
+namespace Bss.Platform.Events.Abstractions;
+
+public interface IFailedEventProcessor
+{
+ Task HandleAsync(object? value, Exception ex);
+}
diff --git a/src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs b/src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs
index 1e6cb80..d95f2ad 100644
--- a/src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs
+++ b/src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs
@@ -1,6 +1,8 @@
namespace Bss.Platform.Events.Abstractions;
-public interface IIntegrationEventPublisher
+public interface IIntegrationEventPublisher
{
- Task PublishAsync(IIntegrationEvent @event, CancellationToken cancellationToken);
+ Task PublishAsync(T @event, CancellationToken cancellationToken);
}
+
+public interface IIntegrationEventPublisher : IIntegrationEventPublisher;
diff --git a/src/Bss.Platform.Events/CapConsumerServiceSelector.cs b/src/Bss.Platform.Events/CapConsumerServiceSelector.cs
deleted file mode 100644
index dd33ba5..0000000
--- a/src/Bss.Platform.Events/CapConsumerServiceSelector.cs
+++ /dev/null
@@ -1,53 +0,0 @@
-using System.Reflection;
-
-using Bss.Platform.Events.Abstractions;
-
-using DotNetCore.CAP;
-using DotNetCore.CAP.Internal;
-
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Options;
-
-namespace Bss.Platform.Events;
-
-public class CapConsumerServiceSelector(IServiceProvider serviceProvider, Assembly assembly)
- : ConsumerServiceSelector(serviceProvider)
-{
- protected override IEnumerable FindConsumersFromControllerTypes() => [];
-
- protected override IEnumerable FindConsumersFromInterfaceTypes(IServiceProvider provider)
- {
- var namePrefix = provider.GetRequiredService>().Value.TopicNamePrefix;
-
- return assembly
- .ExportedTypes
- .Where(x => typeof(IIntegrationEvent).IsAssignableFrom(x) && x is { IsInterface: false, IsAbstract: false })
- .Select(x => this.CreateExecutorDescriptor(typeof(CapConsumerExecutor<>).MakeGenericType(x), x, namePrefix));
- }
-
- private ConsumerExecutorDescriptor CreateExecutorDescriptor(Type executor, Type @event, string? namePrefix)
- {
- var subscribeAttribute = new CapSubscribeAttribute(@event.Name);
- this.SetSubscribeAttribute(subscribeAttribute);
-
- var methodInfo = executor
- .GetRuntimeMethods()
- .Single(x => x.Name.Contains(nameof(CapConsumerExecutor.HandleAsync)));
-
- var methodParameters = methodInfo.GetParameters();
- return new ConsumerExecutorDescriptor
- {
- Attribute = subscribeAttribute,
- ClassAttribute = null,
- MethodInfo = methodInfo,
- ImplTypeInfo = executor.GetTypeInfo(),
- ServiceTypeInfo = null,
- TopicNamePrefix = namePrefix,
- Parameters = new List
- {
- new() { ParameterType = methodParameters[0].ParameterType, IsFromCap = false },
- new() { ParameterType = methodParameters[1].ParameterType, IsFromCap = true }
- }
- };
- }
-}
diff --git a/src/Bss.Platform.Events/DependencyInjection.cs b/src/Bss.Platform.Events/DependencyInjection.cs
index f67d064..b77d8fb 100644
--- a/src/Bss.Platform.Events/DependencyInjection.cs
+++ b/src/Bss.Platform.Events/DependencyInjection.cs
@@ -3,11 +3,14 @@
using Bss.Platform.Events.Abstractions;
using Bss.Platform.Events.Interfaces;
+using Bss.Platform.Events.Internal;
using Bss.Platform.Events.Models;
using Bss.Platform.Events.Publishers;
using DotNetCore.CAP;
+using DotNetCore.CAP.Filter;
using DotNetCore.CAP.Internal;
+using DotNetCore.CAP.Messages;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc.Filters;
@@ -30,61 +33,148 @@ public static IServiceCollection AddPlatformIntegrationEvents(
{
services
.AddSingleton()
- .AddSingleton(x => new CapConsumerServiceSelector(x, eventsAssembly))
- .AddScoped()
- .AddScoped(
- serviceProvider =>
- {
- var capTransaction = ActivatorUtilities.CreateInstance(serviceProvider);
- capTransaction.DbTransaction = serviceProvider.GetRequiredService();
- return capTransaction;
- })
- .AddCap(
- x =>
- {
- var eventsOptions = IntegrationEventsOptions.Default;
- setup?.Invoke(eventsOptions);
+ .AddSingleton(x => new(x, eventsAssembly))
+ .AddScoped()
+ .AddPlatformIntegrationEventsInternal(setup);
- x.FailedRetryCount = eventsOptions.FailedRetryCount;
- x.SucceedMessageExpiredAfter = (int)TimeSpan.FromDays(eventsOptions.RetentionDays).TotalSeconds;
+ return services;
+ }
- x.UseSqlServer(
- o =>
- {
- o.ConnectionString = eventsOptions.SqlServer.ConnectionString;
- o.Schema = eventsOptions.SqlServer.Schema;
- });
+ ///
+ /// A new way to register integration events, required to set up internal and external events
+ /// Used Bss.Platform.Mediation
+ ///
+ /// Automatically registered IIntegrationEventPublisher<TEvent> wrap it if you need
+ public static IServiceCollection AddPlatformIntegrationEvents(
+ this IServiceCollection services,
+ Action> setupEvents,
+ Action setupOptions)
+ where TEventProcessor : class, IIntegrationEventProcessor =>
+ services.AddPlatformIntegrationEvents(setupEvents, setupOptions);
+
+ ///
+ /// A new way to register integration events, required to set up internal and external events
+ ///
+ /// Automatically registered IIntegrationEventPublisher<TEvent> wrap it if you need
+ public static IServiceCollection AddPlatformIntegrationEvents(
+ this IServiceCollection services,
+ Action> setupEvents,
+ Action setupOptions)
+ where TEventProcessor : class, IIntegrationEventProcessor
+ where TEvent : notnull =>
+ services.AddPlatformIntegrationEvents(setupEvents, setupOptions);
+
+ ///
+ /// A new way to register integration events, required to set up internal and external events
+ ///
+ /// Automatically registered IIntegrationEventPublisher<TEvent> wrap it if you need
+ public static IServiceCollection AddPlatformIntegrationEvents(
+ this IServiceCollection services,
+ Action> setupEvents,
+ Action setupOptions)
+ where TEventProcessor : class, IIntegrationEventProcessor
+ where TInputEvent : notnull
+ where TOutputEvent : notnull
+ {
+ var typeProvider = new EventTypeProvider();
+ setupEvents.Invoke(typeProvider);
+
+ services
+ .AddSingleton(typeProvider)
+ .AddSingleton()
+ .AddScoped, IntegrationEventPublisherNew>()
+ .AddPlatformIntegrationEventsInternal(setupOptions)
+ .Configure((RabbitMQOptions opt) =>
+ {
+ // note: required for messages generated by not CAP
+ opt.CustomHeadersBuilder = (msg, sp) =>
+ [
+ new(Headers.MessageId, sp.GetRequiredService().NextId().ToString()),
+ new(Headers.MessageName, msg.RoutingKey),
+ new(Headers.Type, typeof(TInputEvent).Name)
+ ];
+ });
+
+ services.AddSingleton, TEventProcessor>();
+ // NOTE: register TEventProcessor for each type (required for CapConsumerExecutor)
+ typeProvider.InputEvents.Keys
+ .Select(t => typeof(IIntegrationEventProcessor<>).MakeGenericType(t))
+ .ToList()
+ .ForEach(x => services.AddSingleton(x, sp => sp.GetRequiredService>()));
- x.UseDashboard(o =>
+ return services;
+ }
+
+ private static IServiceCollection AddPlatformIntegrationEventsInternal(
+ this IServiceCollection services,
+ Action? setupEventOptions = null)
+ {
+ var eventsOptions = new IntegrationEventsOptions();
+ setupEventOptions?.Invoke(eventsOptions);
+
+ if (eventsOptions.UseFailedEventProcessor)
+ {
+ services.AddScoped();
+ }
+
+ services
+ .AddScoped(serviceProvider =>
+ {
+ var capTransaction = ActivatorUtilities.CreateInstance(serviceProvider);
+ capTransaction.DbTransaction = serviceProvider.GetRequiredService();
+ return capTransaction;
+ })
+ .AddCap(x =>
+ {
+ x.FailedRetryCount = eventsOptions.FailedRetryCount;
+ x.SucceedMessageExpiredAfter = (int)TimeSpan.FromDays(eventsOptions.RetentionDays).TotalSeconds;
+
+ if (!string.IsNullOrEmpty(eventsOptions.SqlServer.ConnectionString))
+ {
+ x.UseSqlServer(o =>
{
- o.PathMatch = eventsOptions.DashboardPath;
- if (eventsOptions.AuthorizationPredicate is { } authPredicate)
- {
- o.AllowAnonymousExplicit = false;
- o.AuthorizationPolicy = AddDashboardAuthorizationPolicy(services, authPredicate);
- }
+ o.ConnectionString = eventsOptions.SqlServer.ConnectionString;
+ o.Schema = eventsOptions.SqlServer.Schema;
});
+ }
- if (!eventsOptions.MessageQueue.Enable)
+ x.UseDashboard(o =>
+ {
+ o.PathMatch = eventsOptions.DashboardPath;
+ o.PathBase = eventsOptions.GatewayPrefix;
+ if (eventsOptions.AuthorizationPredicate is { } authPredicate)
{
- x.UseInMemoryMessageQueue();
- return;
+ o.AllowAnonymousExplicit = false;
+ o.AuthorizationPolicy = AddDashboardAuthorizationPolicy(services, authPredicate);
}
-
- x.DefaultGroupName = eventsOptions.MessageQueue.ExchangeName;
- x.UseRabbitMQ(
- o =>
- {
- o.HostName = eventsOptions.MessageQueue.Host;
- o.Port = eventsOptions.MessageQueue.Port;
- o.VirtualHost = eventsOptions.MessageQueue.VirtualHost;
- o.Password = eventsOptions.MessageQueue.Secret;
- o.UserName = eventsOptions.MessageQueue.UserName;
- o.ExchangeName = eventsOptions.MessageQueue.ExchangeName;
- o.BasicQosOptions = new RabbitMQOptions.BasicQos(1, true);
- });
});
+ var rabbitSettings = eventsOptions.MessageQueue;
+ if (rabbitSettings.Enable)
+ {
+ x.DefaultGroupName = string.IsNullOrWhiteSpace(rabbitSettings.QueueName)
+ ? rabbitSettings.ExchangeName
+ : rabbitSettings.QueueName;
+
+ x.UseRabbitMQ(o =>
+ {
+ o.HostName = rabbitSettings.Host;
+ o.Port = rabbitSettings.Port;
+ o.VirtualHost = rabbitSettings.VirtualHost;
+ o.Password = rabbitSettings.Secret;
+ o.UserName = rabbitSettings.UserName;
+ o.ExchangeName = rabbitSettings.ExchangeName;
+ o.BasicQosOptions = new(1, true);
+ });
+ }
+ else
+ {
+ x.UseInMemoryMessageQueue();
+ }
+
+ eventsOptions.OverrideCapOptions?.Invoke(x);
+ });
+
return services;
}
diff --git a/src/Bss.Platform.Events/EventTypeProvider.cs b/src/Bss.Platform.Events/EventTypeProvider.cs
new file mode 100644
index 0000000..c87d837
--- /dev/null
+++ b/src/Bss.Platform.Events/EventTypeProvider.cs
@@ -0,0 +1,78 @@
+using System.Reflection;
+
+using Bss.Platform.Events.Abstractions;
+using Bss.Platform.Events.Interfaces;
+
+namespace Bss.Platform.Events;
+
+internal class EventTypeProvider : IEventTypeProvider, IIntegrationEventSetup
+{
+ public IReadOnlyDictionary InputEvents => this.inputTypes;
+ public IReadOnlyDictionary OutputEvents => this.outputTypes;
+
+ private readonly Dictionary inputTypes = [];
+ private readonly Dictionary outputTypes = [];
+
+ public IIntegrationEventSetup AddInputEvents(string prefix = "", params Assembly[] assemblies)
+ where TEvent : TIn
+ {
+ var newTypes = GetOrDefaultAssembly(assemblies)
+ .SelectMany(x => x.DefinedTypes)
+ .Where(IsAssignableAndSatisfyCondition)
+ .Except(this.outputTypes.Keys);
+
+ foreach (var newType in newTypes)
+ {
+ this.inputTypes[newType] = $"{prefix}{newType.Name}";
+ }
+
+ return this;
+ }
+
+ public IIntegrationEventSetup AddInputEvent(string routingKey)
+ where TEvent : class, TIn
+ {
+ var type = typeof(TEvent);
+ this.inputTypes[type] = routingKey;
+ return this;
+ }
+
+ public IIntegrationEventSetup AddOutputEvents(string prefix = "", params Assembly[] assemblies)
+ where TEvent : TOut
+ {
+ var newTypes = GetOrDefaultAssembly(assemblies)
+ .SelectMany(x => x.DefinedTypes)
+ .Where(IsAssignableAndSatisfyCondition)
+ .Except(this.outputTypes.Keys);
+
+ foreach (var newType in newTypes)
+ {
+ this.outputTypes[newType] = $"{prefix}{newType.Name}";
+ }
+
+ return this;
+ }
+
+ public IIntegrationEventSetup AddOutputEvent(string routingKey)
+ where TEvent : class, TOut
+ {
+ var type = typeof(TEvent);
+ this.outputTypes[type] = routingKey;
+ return this;
+ }
+
+ private static Assembly[] GetOrDefaultAssembly(Assembly[] assemblies)
+ {
+ if (assemblies.Length == 0)
+ {
+ assemblies = [typeof(TEvent).Assembly];
+ }
+
+ return assemblies;
+ }
+
+ private static bool IsAssignableAndSatisfyCondition(TypeInfo typeInfo) =>
+ typeInfo is { IsInterface: false, IsAbstract: false, IsNested: false }
+ && typeof(TAssignableTo).IsAssignableFrom(typeInfo)
+ && !typeInfo.Name.Contains('<');
+}
diff --git a/src/Bss.Platform.Events/Interfaces/IIntegrationEventProcessor.cs b/src/Bss.Platform.Events/Interfaces/IIntegrationEventProcessor.cs
index 25af453..8ff2064 100644
--- a/src/Bss.Platform.Events/Interfaces/IIntegrationEventProcessor.cs
+++ b/src/Bss.Platform.Events/Interfaces/IIntegrationEventProcessor.cs
@@ -2,7 +2,9 @@
namespace Bss.Platform.Events.Interfaces;
-public interface IIntegrationEventProcessor
+public interface IIntegrationEventProcessor : IIntegrationEventProcessor;
+
+public interface IIntegrationEventProcessor
{
- Task ProcessAsync(IIntegrationEvent @event, CancellationToken token);
+ Task ProcessAsync(T @event, CancellationToken token);
}
diff --git a/src/Bss.Platform.Events/Interfaces/IIntegrationEventSetup.cs b/src/Bss.Platform.Events/Interfaces/IIntegrationEventSetup.cs
new file mode 100644
index 0000000..31de0a8
--- /dev/null
+++ b/src/Bss.Platform.Events/Interfaces/IIntegrationEventSetup.cs
@@ -0,0 +1,39 @@
+using System.Reflection;
+
+namespace Bss.Platform.Events.Interfaces;
+
+public interface IIntegrationEventSetup
+{
+ ///
+ /// Add multiple events implemented or inherited TInternalBase with the prefix
+ ///
+ ///
+ /// prefix to add before type name
+ /// <TInternalBase>("INT.") -> INT.TInternal
+ ///
+ /// assemblies to find types, if not passed - will be used assembly contains TInternalBase
+ IIntegrationEventSetup AddInputEvents(string prefix = "", params Assembly[] assemblies)
+ where TInputBase : TIn;
+
+ ///
+ /// Add a single event with the routing key, overrides if it already exists (added by
+ /// )
+ ///
+ IIntegrationEventSetup AddInputEvent(string routingKey) where TInput : class, TIn;
+
+ ///
+ /// Add multiple events implemented or inherited TExternalBase with the prefix
+ ///
+ ///
+ /// prefix to add before type name
+ /// <TExternalBase>("SYS.") -> SYS.TExternal
+ ///
+ /// assemblies to find types, if not passed - will be used assembly contains TExternalBase
+ IIntegrationEventSetup AddOutputEvents(string prefix, params Assembly[] assemblies) where TOutputBase : TOut;
+
+ ///
+ /// Add a single event with the routing key, overrides if it already exists (added by
+ /// Math.Max(capOptions.Value.FailedRetryCount - 1, 0);
+
+ public override Task OnSubscribeExceptionAsync(ExceptionContext context)
+ {
+ if (context.MediumMessage.Retries != this.LatestRetryCount)
+ {
+ return Task.CompletedTask;
+ }
+
+ var ex = context.Exception;
+ var errorText = ex.GetBaseException().Message;
+
+ var details = new CapFailureDetails(ex.GetType().FullName ?? ex.GetType().Name, errorText, ex.StackTrace);
+ context.DeliverMessage.Headers[DetailsHeader] = JsonSerializer.Serialize(details);
+
+ var payloadParam = context.ConsumerDescriptor.Parameters.SingleOrDefault(p => !p.IsFromCap);
+ var value = context.DeliverMessage.Value;
+
+ var payload = payloadParam is not null && value is not null && serializer.IsJsonType(value)
+ ? serializer.Deserialize(value, payloadParam.ParameterType)
+ : value;
+
+ return Task.WhenAll(failsProcessors.Select(x => x.HandleAsync(payload, ex)));
+ }
+
+ internal sealed record CapFailureDetails(string ExceptionType, string Message, string? StackTrace);
+}
diff --git a/src/Bss.Platform.Events/Models/IntegrationEventsMessageQueueOptions.cs b/src/Bss.Platform.Events/Models/IntegrationEventsMessageQueueOptions.cs
index 5f1a5fa..dd0fd55 100644
--- a/src/Bss.Platform.Events/Models/IntegrationEventsMessageQueueOptions.cs
+++ b/src/Bss.Platform.Events/Models/IntegrationEventsMessageQueueOptions.cs
@@ -15,4 +15,6 @@ public class IntegrationEventsMessageQueueOptions
public string VirtualHost { get; set; } = default!;
public string ExchangeName { get; set; } = default!;
+
+ public string QueueName { get; set; } = default!;
}
diff --git a/src/Bss.Platform.Events/Models/IntegrationEventsOptions.cs b/src/Bss.Platform.Events/Models/IntegrationEventsOptions.cs
index ea700e8..8eaca0b 100644
--- a/src/Bss.Platform.Events/Models/IntegrationEventsOptions.cs
+++ b/src/Bss.Platform.Events/Models/IntegrationEventsOptions.cs
@@ -1,19 +1,34 @@
+using DotNetCore.CAP;
+
using Microsoft.AspNetCore.Http;
namespace Bss.Platform.Events.Models;
public class IntegrationEventsOptions
{
- public string DashboardPath { get; set; } = default!;
+ public string DashboardPath { get; set; } = "/admin/events";
+
+ public string GatewayPrefix { get; set; } = string.Empty;
+
+ public int FailedRetryCount { get; set; } = 5;
+
+ public int RetentionDays { get; set; } = 15;
+
+ ///
+ /// required to fill connection string in options, otherwise MS SQL db won't connect
+ ///
+ public IntegrationEventsSqlServerOptions SqlServer { get; set; } = new() { Schema = "events" };
- public int FailedRetryCount { get; set; }
+ public IntegrationEventsMessageQueueOptions MessageQueue { get; set; } = new() { Enable = true };
- public int RetentionDays { get; set; }
+ public Action? OverrideCapOptions { get; set; }
- public IntegrationEventsSqlServerOptions SqlServer { get; set; } = default!;
+ ///
+ /// When enable will be registered CAP filter to handle failed event on the last attempt, allow using multiple
+ /// scoped implementations
+ ///
+ public bool UseFailedEventProcessor { get; set; }
- public IntegrationEventsMessageQueueOptions MessageQueue { get; set; } = default!;
-
///
/// Any condition to check that a user should get access to events dashboard
///
@@ -21,14 +36,4 @@ public class IntegrationEventsOptions
/// AuthorizationPolicyPredicate = (httpContext) => httpContext.RequestServices.GetRequiredService<ICurrentUser>().IsAdminAsync()
///
public Func>? AuthorizationPredicate { get; set; }
-
- public static IntegrationEventsOptions Default =>
- new()
- {
- DashboardPath = "/admin/events",
- FailedRetryCount = 5,
- RetentionDays = 15,
- SqlServer = new IntegrationEventsSqlServerOptions { Schema = "events" },
- MessageQueue = new IntegrationEventsMessageQueueOptions { Enable = true }
- };
}
diff --git a/src/Bss.Platform.Events/Models/IntegrationEventsSqlServerOptions.cs b/src/Bss.Platform.Events/Models/IntegrationEventsSqlServerOptions.cs
index f38a179..8825cf3 100644
--- a/src/Bss.Platform.Events/Models/IntegrationEventsSqlServerOptions.cs
+++ b/src/Bss.Platform.Events/Models/IntegrationEventsSqlServerOptions.cs
@@ -2,6 +2,9 @@ namespace Bss.Platform.Events.Models;
public class IntegrationEventsSqlServerOptions
{
+ ///
+ /// When empty - MS SQL db won't connect
+ ///
public string ConnectionString { get; set; } = default!;
public string Schema { get; set; } = default!;
diff --git a/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs b/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs
index 1d0273c..95f60bb 100644
--- a/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs
+++ b/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs
@@ -1,19 +1,58 @@
using Bss.Platform.Events.Abstractions;
+using Bss.Platform.Events.Interfaces;
using DotNetCore.CAP;
namespace Bss.Platform.Events.Publishers;
-public class IntegrationEventPublisher(ICapPublisher capPublisher, ICapTransaction capTransaction) : IIntegrationEventPublisher
+public class IntegrationEventPublisherLegacy(ICapPublisher capPublisher, ICapTransaction capTransaction)
+ : IntegrationEventPublisherBase(capPublisher, capTransaction), IIntegrationEventPublisher
{
- public Task PublishAsync(IIntegrationEvent @event, CancellationToken cancellationToken)
+ private readonly ICapPublisher capPublisher = capPublisher;
+
+ protected override Task PublishInternalAsync(IIntegrationEvent @event, CancellationToken cancellationToken) =>
+ this.capPublisher.PublishAsync(@event.GetType().Name, @event, cancellationToken: cancellationToken);
+}
+
+public class IntegrationEventPublisherNew(ICapPublisher capPublisher, ICapTransaction capTransaction, IEventTypeProvider eventTypeProvider)
+ : IntegrationEventPublisherBase(capPublisher, capTransaction)
+ where T: notnull
+{
+ private readonly ICapPublisher capPublisher = capPublisher;
+
+ protected override async Task PublishInternalAsync(T @event, CancellationToken cancellationToken)
+ {
+ if (eventTypeProvider.InputEvents.TryGetValue(@event.GetType(), out var internalRoutingKey))
+ {
+ await this.capPublisher.PublishAsync(internalRoutingKey, @event, cancellationToken: cancellationToken);
+ }
+
+ if (eventTypeProvider.OutputEvents.TryGetValue(@event.GetType(), out var externalRoutingKey)
+ && externalRoutingKey != internalRoutingKey)
+ {
+ await this.capPublisher.PublishAsync(externalRoutingKey, @event, cancellationToken: cancellationToken);
+ }
+
+ if (internalRoutingKey == null && externalRoutingKey == null)
+ {
+ throw new($"No routing key found for event type {@event.GetType().FullName}");
+ }
+ }
+}
+
+public abstract class IntegrationEventPublisherBase(ICapPublisher capPublisher, ICapTransaction capTransaction)
+ : IIntegrationEventPublisher
+{
+ public Task PublishAsync(T @event, CancellationToken cancellationToken)
{
if (capPublisher.Transaction is not null && capPublisher.Transaction != capTransaction)
{
- throw new Exception("There cannot be different CAP transactions within the same scope");
+ throw new("There cannot be different CAP transactions within the same scope");
}
capPublisher.Transaction = capTransaction;
- return capPublisher.PublishAsync(@event.GetType().Name, @event, cancellationToken: cancellationToken);
+ return this.PublishInternalAsync(@event, cancellationToken);
}
+
+ protected abstract Task PublishInternalAsync(T @event, CancellationToken cancellationToken);
}
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index c854181..9067f20 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -1,6 +1,7 @@
- net9.0
+ net9.0
+ 14
https://github.com/Luxoft/bss-platform
git
diff --git a/src/__SolutionItems/CommonAssemblyInfo.cs b/src/__SolutionItems/CommonAssemblyInfo.cs
index cf6130e..0fcc942 100644
--- a/src/__SolutionItems/CommonAssemblyInfo.cs
+++ b/src/__SolutionItems/CommonAssemblyInfo.cs
@@ -4,9 +4,9 @@
[assembly: AssemblyCompany("Luxoft")]
[assembly: AssemblyCopyright("Copyright © Luxoft 2026")]
-[assembly: AssemblyVersion("1.6.8.0")]
-[assembly: AssemblyFileVersion("1.6.8.0")]
-[assembly: AssemblyInformationalVersion("1.6.8.0")]
+[assembly: AssemblyVersion("1.6.9")]
+[assembly: AssemblyFileVersion("1.6.9")]
+[assembly: AssemblyInformationalVersion("1.6.9")]
#if DEBUG
[assembly: AssemblyConfiguration("Debug")]