Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/Bss.Platform.Events.Abstractions/IEventTypeProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace Bss.Platform.Events.Abstractions;

public interface IEventTypeProvider
{
/// <summary>
/// Internal events are sent to the Rabbit->CAP queue and handled within the target system
/// </summary>
IReadOnlyDictionary<Type, string> InputEvents { get; }

/// <summary>
/// External events are sent to the Rabbit exchange, but do not have a handler in the target system
/// </summary>
IReadOnlyDictionary<Type, string> OutputEvents { get; }
}
6 changes: 6 additions & 0 deletions src/Bss.Platform.Events.Abstractions/IFailedEventProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Bss.Platform.Events.Abstractions;

public interface IFailedEventProcessor
{
Task HandleAsync(object? value, Exception ex);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
namespace Bss.Platform.Events.Abstractions;

public interface IIntegrationEventPublisher
public interface IIntegrationEventPublisher<in T>
{
Task PublishAsync(IIntegrationEvent @event, CancellationToken cancellationToken);
Task PublishAsync(T @event, CancellationToken cancellationToken);
}

public interface IIntegrationEventPublisher : IIntegrationEventPublisher<IIntegrationEvent>;
53 changes: 0 additions & 53 deletions src/Bss.Platform.Events/CapConsumerServiceSelector.cs

This file was deleted.

180 changes: 135 additions & 45 deletions src/Bss.Platform.Events/DependencyInjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@

using Bss.Platform.Events.Abstractions;
using Bss.Platform.Events.Interfaces;
using Bss.Platform.Events.Internal;
using Bss.Platform.Events.Models;
using Bss.Platform.Events.Publishers;

using DotNetCore.CAP;
using DotNetCore.CAP.Filter;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;

using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc.Filters;
Expand All @@ -30,61 +33,148 @@ public static IServiceCollection AddPlatformIntegrationEvents<TEventProcessor>(
{
services
.AddSingleton<IIntegrationEventProcessor, TEventProcessor>()
.AddSingleton<IConsumerServiceSelector, CapConsumerServiceSelector>(x => new CapConsumerServiceSelector(x, eventsAssembly))
.AddScoped<IIntegrationEventPublisher, IntegrationEventPublisher>()
.AddScoped<ICapTransaction>(
serviceProvider =>
{
var capTransaction = ActivatorUtilities.CreateInstance<SqlServerCapTransaction>(serviceProvider);
capTransaction.DbTransaction = serviceProvider.GetRequiredService<IDbTransaction>();
return capTransaction;
})
.AddCap(
x =>
{
var eventsOptions = IntegrationEventsOptions.Default;
setup?.Invoke(eventsOptions);
.AddSingleton<IConsumerServiceSelector, CapConsumerServiceSelectorLegacy>(x => new(x, eventsAssembly))
.AddScoped<IIntegrationEventPublisher, IntegrationEventPublisherLegacy>()
.AddPlatformIntegrationEventsInternal(setup);

x.FailedRetryCount = eventsOptions.FailedRetryCount;
x.SucceedMessageExpiredAfter = (int)TimeSpan.FromDays(eventsOptions.RetentionDays).TotalSeconds;
return services;
}

x.UseSqlServer(
o =>
{
o.ConnectionString = eventsOptions.SqlServer.ConnectionString;
o.Schema = eventsOptions.SqlServer.Schema;
});
/// <summary>
/// A new way to register integration events, required to set up internal and external events <br/>
/// Used <see cref="Bss.Platform.Mediation.Abstractions.IMediator">Bss.Platform.Mediation</see>
/// </summary>
/// <returns>Automatically registered IIntegrationEventPublisher&lt;TEvent&gt; wrap it if you need</returns>
public static IServiceCollection AddPlatformIntegrationEvents<TEventProcessor>(
this IServiceCollection services,
Action<IIntegrationEventSetup<IIntegrationEvent, IIntegrationEvent>> setupEvents,
Action<IntegrationEventsOptions> setupOptions)
where TEventProcessor : class, IIntegrationEventProcessor<IIntegrationEvent> =>
services.AddPlatformIntegrationEvents<TEventProcessor, IIntegrationEvent, IIntegrationEvent>(setupEvents, setupOptions);

/// <summary>
/// A new way to register integration events, required to set up internal and external events
/// </summary>
/// <returns>Automatically registered IIntegrationEventPublisher&lt;TEvent&gt; wrap it if you need</returns>
public static IServiceCollection AddPlatformIntegrationEvents<TEventProcessor, TEvent>(
this IServiceCollection services,
Action<IIntegrationEventSetup<TEvent, TEvent>> setupEvents,
Action<IntegrationEventsOptions> setupOptions)
where TEventProcessor : class, IIntegrationEventProcessor<TEvent>
where TEvent : notnull =>
services.AddPlatformIntegrationEvents<TEventProcessor, TEvent, TEvent>(setupEvents, setupOptions);

/// <summary>
/// A new way to register integration events, required to set up internal and external events
/// </summary>
/// <returns>Automatically registered IIntegrationEventPublisher&lt;TEvent&gt; wrap it if you need</returns>
public static IServiceCollection AddPlatformIntegrationEvents<TEventProcessor, TInputEvent, TOutputEvent>(
this IServiceCollection services,
Action<IIntegrationEventSetup<TInputEvent, TOutputEvent>> setupEvents,
Action<IntegrationEventsOptions> setupOptions)
where TEventProcessor : class, IIntegrationEventProcessor<TInputEvent>
where TInputEvent : notnull
where TOutputEvent : notnull
{
var typeProvider = new EventTypeProvider<TInputEvent,TOutputEvent>();
setupEvents.Invoke(typeProvider);

services
.AddSingleton<IEventTypeProvider>(typeProvider)
.AddSingleton<IConsumerServiceSelector, CapConsumerServiceSelectorNew>()
.AddScoped<IIntegrationEventPublisher<TOutputEvent>, IntegrationEventPublisherNew<TOutputEvent>>()
.AddPlatformIntegrationEventsInternal(setupOptions)
.Configure((RabbitMQOptions opt) =>
{
// note: required for messages generated by not CAP
opt.CustomHeadersBuilder = (msg, sp) =>
[
new(Headers.MessageId, sp.GetRequiredService<ISnowflakeId>().NextId().ToString()),
new(Headers.MessageName, msg.RoutingKey),
new(Headers.Type, typeof(TInputEvent).Name)
];
});

services.AddSingleton<IIntegrationEventProcessor<TInputEvent>, TEventProcessor>();
// NOTE: register TEventProcessor for each type (required for CapConsumerExecutor<TEvent>)
typeProvider.InputEvents.Keys
.Select(t => typeof(IIntegrationEventProcessor<>).MakeGenericType(t))
.ToList()
.ForEach(x => services.AddSingleton(x, sp => sp.GetRequiredService<IIntegrationEventProcessor<TInputEvent>>()));

x.UseDashboard(o =>
return services;
}

private static IServiceCollection AddPlatformIntegrationEventsInternal(
this IServiceCollection services,
Action<IntegrationEventsOptions>? setupEventOptions = null)
{
var eventsOptions = new IntegrationEventsOptions();
setupEventOptions?.Invoke(eventsOptions);

if (eventsOptions.UseFailedEventProcessor)
{
services.AddScoped<ISubscribeFilter, CapExceptionFilter>();
}

services
.AddScoped<ICapTransaction>(serviceProvider =>
{
var capTransaction = ActivatorUtilities.CreateInstance<SqlServerCapTransaction>(serviceProvider);
capTransaction.DbTransaction = serviceProvider.GetRequiredService<IDbTransaction>();
return capTransaction;
})
.AddCap(x =>
{
x.FailedRetryCount = eventsOptions.FailedRetryCount;
x.SucceedMessageExpiredAfter = (int)TimeSpan.FromDays(eventsOptions.RetentionDays).TotalSeconds;

if (!string.IsNullOrEmpty(eventsOptions.SqlServer.ConnectionString))
{
x.UseSqlServer(o =>
{
o.PathMatch = eventsOptions.DashboardPath;
if (eventsOptions.AuthorizationPredicate is { } authPredicate)
{
o.AllowAnonymousExplicit = false;
o.AuthorizationPolicy = AddDashboardAuthorizationPolicy(services, authPredicate);
}
o.ConnectionString = eventsOptions.SqlServer.ConnectionString;
o.Schema = eventsOptions.SqlServer.Schema;
});
}

if (!eventsOptions.MessageQueue.Enable)
x.UseDashboard(o =>
{
o.PathMatch = eventsOptions.DashboardPath;
o.PathBase = eventsOptions.GatewayPrefix;
if (eventsOptions.AuthorizationPredicate is { } authPredicate)
{
x.UseInMemoryMessageQueue();
return;
o.AllowAnonymousExplicit = false;
o.AuthorizationPolicy = AddDashboardAuthorizationPolicy(services, authPredicate);
}

x.DefaultGroupName = eventsOptions.MessageQueue.ExchangeName;
x.UseRabbitMQ(
o =>
{
o.HostName = eventsOptions.MessageQueue.Host;
o.Port = eventsOptions.MessageQueue.Port;
o.VirtualHost = eventsOptions.MessageQueue.VirtualHost;
o.Password = eventsOptions.MessageQueue.Secret;
o.UserName = eventsOptions.MessageQueue.UserName;
o.ExchangeName = eventsOptions.MessageQueue.ExchangeName;
o.BasicQosOptions = new RabbitMQOptions.BasicQos(1, true);
});
});

var rabbitSettings = eventsOptions.MessageQueue;
if (rabbitSettings.Enable)
{
x.DefaultGroupName = string.IsNullOrWhiteSpace(rabbitSettings.QueueName)
? rabbitSettings.ExchangeName
: rabbitSettings.QueueName;

x.UseRabbitMQ(o =>
{
o.HostName = rabbitSettings.Host;
o.Port = rabbitSettings.Port;
o.VirtualHost = rabbitSettings.VirtualHost;
o.Password = rabbitSettings.Secret;
o.UserName = rabbitSettings.UserName;
o.ExchangeName = rabbitSettings.ExchangeName;
o.BasicQosOptions = new(1, true);
});
}
else
{
x.UseInMemoryMessageQueue();
}

eventsOptions.OverrideCapOptions?.Invoke(x);
});

return services;
}

Expand Down
78 changes: 78 additions & 0 deletions src/Bss.Platform.Events/EventTypeProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using System.Reflection;

using Bss.Platform.Events.Abstractions;
using Bss.Platform.Events.Interfaces;

namespace Bss.Platform.Events;

internal class EventTypeProvider<TIn, TOut> : IEventTypeProvider, IIntegrationEventSetup<TIn, TOut>
{
public IReadOnlyDictionary<Type, string> InputEvents => this.inputTypes;
public IReadOnlyDictionary<Type, string> OutputEvents => this.outputTypes;

private readonly Dictionary<Type, string> inputTypes = [];
private readonly Dictionary<Type, string> outputTypes = [];

public IIntegrationEventSetup<TIn, TOut> AddInputEvents<TEvent>(string prefix = "", params Assembly[] assemblies)
where TEvent : TIn
{
var newTypes = GetOrDefaultAssembly<TEvent>(assemblies)
.SelectMany(x => x.DefinedTypes)
.Where(IsAssignableAndSatisfyCondition<TEvent>)
.Except(this.outputTypes.Keys);

foreach (var newType in newTypes)
{
this.inputTypes[newType] = $"{prefix}{newType.Name}";
}

return this;
}

public IIntegrationEventSetup<TIn, TOut> AddInputEvent<TEvent>(string routingKey)
where TEvent : class, TIn
{
var type = typeof(TEvent);
this.inputTypes[type] = routingKey;
return this;
}

public IIntegrationEventSetup<TIn, TOut> AddOutputEvents<TEvent>(string prefix = "", params Assembly[] assemblies)
where TEvent : TOut
{
var newTypes = GetOrDefaultAssembly<TEvent>(assemblies)
.SelectMany(x => x.DefinedTypes)
.Where(IsAssignableAndSatisfyCondition<TEvent>)
.Except(this.outputTypes.Keys);

foreach (var newType in newTypes)
{
this.outputTypes[newType] = $"{prefix}{newType.Name}";
}

return this;
}

public IIntegrationEventSetup<TIn, TOut> AddOutputEvent<TEvent>(string routingKey)
where TEvent : class, TOut
{
var type = typeof(TEvent);
this.outputTypes[type] = routingKey;
return this;
}

private static Assembly[] GetOrDefaultAssembly<TEvent>(Assembly[] assemblies)
{
if (assemblies.Length == 0)
{
assemblies = [typeof(TEvent).Assembly];
}

return assemblies;
}

private static bool IsAssignableAndSatisfyCondition<TAssignableTo>(TypeInfo typeInfo) =>
typeInfo is { IsInterface: false, IsAbstract: false, IsNested: false }
&& typeof(TAssignableTo).IsAssignableFrom(typeInfo)
&& !typeInfo.Name.Contains('<');
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

namespace Bss.Platform.Events.Interfaces;

public interface IIntegrationEventProcessor
public interface IIntegrationEventProcessor : IIntegrationEventProcessor<IIntegrationEvent>;

public interface IIntegrationEventProcessor<in T>
{
Task ProcessAsync(IIntegrationEvent @event, CancellationToken token);
Task ProcessAsync(T @event, CancellationToken token);
}
Loading
Loading