-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathSubscriptionOptions.cs
More file actions
130 lines (113 loc) · 4.64 KB
/
SubscriptionOptions.cs
File metadata and controls
130 lines (113 loc) · 4.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Licensed under the MIT license.
// Copyright (c) The AppCore .NET project.
using System;
using System.Collections.Generic;
using AppCoreNet.Diagnostics;
using Microsoft.Extensions.DependencyInjection;
namespace AppCoreNet.EventStore.Subscriptions;
/// <summary>
/// Provides options for subscriptions.
/// </summary>
/// <remarks>
/// Subscriptions are keyed by their <see cref="SubscriptionId"/>; each ID can be registered only once
/// via <c>AddListener</c>. Event handler subscriptions registered via <c>AddEventHandlers</c> may be
/// configured incrementally by calling the method several times with the same subscription and stream ID.
/// </remarks>
public sealed class SubscriptionOptions
{
    // All registered subscribers, keyed by subscription ID.
    private readonly Dictionary<SubscriptionId, Subscriber> _subscribers = new();

    // Options of the subscriptions which were registered through AddEventHandlers.
    // Presence of a key marks the subscription as an event handler subscription.
    internal Dictionary<SubscriptionId, EventHandlerSubscriptionOptions> EventHandlerOptions { get; } = new();

    /// <summary>
    /// Gets or sets the batch size when processing event subscriptions. Defaults to <c>1024</c>.
    /// </summary>
    public int BatchSize { get; set; } = 1024;

    /// <summary>
    /// Gets all configured subscriptions.
    /// </summary>
    /// <returns>The <see cref="IEnumerable{T}"/> of <see cref="Subscriber"/>.</returns>
    public IEnumerable<Subscriber> GetSubscribers()
    {
        return _subscribers.Values;
    }

    /// <summary>
    /// Adds a subscription listener which is created by the specified factory.
    /// </summary>
    /// <param name="subscriptionId">The subscription ID.</param>
    /// <param name="streamId">The stream ID.</param>
    /// <param name="listenerFactory">The factory used to create the <see cref="ISubscriptionListener"/>.</param>
    /// <returns>The <see cref="SubscriptionOptions"/> instance to allow chaining.</returns>
    /// <exception cref="InvalidOperationException">
    /// A subscription with the same <paramref name="subscriptionId"/> was already added.
    /// </exception>
    public SubscriptionOptions AddListener(
        SubscriptionId subscriptionId,
        StreamId streamId,
        Func<IServiceProvider, ISubscriptionListener> listenerFactory)
    {
        Ensure.Arg.NotNull(subscriptionId);

        // Fail at registration time instead of storing a factory which can never be invoked.
        Ensure.Arg.NotNull(listenerFactory);

        if (_subscribers.ContainsKey(subscriptionId))
        {
            throw new InvalidOperationException($"Subscription with ID '{subscriptionId}' already added.");
        }

        _subscribers.Add(subscriptionId, new Subscriber(subscriptionId, streamId, listenerFactory));
        return this;
    }

    /// <summary>
    /// Adds a subscription listener using an existing listener instance.
    /// </summary>
    /// <param name="subscriptionId">The subscription ID.</param>
    /// <param name="streamId">The stream ID.</param>
    /// <param name="listener">The <see cref="ISubscriptionListener"/>.</param>
    /// <returns>The <see cref="SubscriptionOptions"/> instance to allow chaining.</returns>
    /// <exception cref="InvalidOperationException">
    /// A subscription with the same <paramref name="subscriptionId"/> was already added.
    /// </exception>
    public SubscriptionOptions AddListener(
        SubscriptionId subscriptionId,
        StreamId streamId,
        ISubscriptionListener listener)
    {
        Ensure.Arg.NotNull(listener);

        // The factory ignores the service provider and always hands out the same instance.
        return AddListener(
            subscriptionId,
            streamId,
            _ => listener);
    }

    /// <summary>
    /// Adds a subscription listener of type <typeparamref name="T"/>.
    /// </summary>
    /// <remarks>
    /// The listener is obtained via
    /// <see cref="ActivatorUtilities.GetServiceOrCreateInstance{T}(IServiceProvider)"/>.
    /// </remarks>
    /// <param name="subscriptionId">The subscription ID.</param>
    /// <param name="streamId">The stream ID.</param>
    /// <typeparam name="T">The type of the <see cref="ISubscriptionListener"/>.</typeparam>
    /// <returns>The <see cref="SubscriptionOptions"/> instance to allow chaining.</returns>
    /// <exception cref="InvalidOperationException">
    /// A subscription with the same <paramref name="subscriptionId"/> was already added.
    /// </exception>
    public SubscriptionOptions AddListener<T>(SubscriptionId subscriptionId, StreamId streamId)
        where T : ISubscriptionListener
    {
        return AddListener(
            subscriptionId,
            streamId,
            sp => ActivatorUtilities.GetServiceOrCreateInstance<T>(sp));
    }

    /// <summary>
    /// Adds a subscription listener which dispatches events to <see cref="IEventHandler"/>.
    /// </summary>
    /// <remarks>
    /// The method may be invoked several times with the same <paramref name="subscriptionId"/> and
    /// <paramref name="streamId"/>; every invocation passes the same
    /// <see cref="EventHandlerSubscriptionOptions"/> instance to <paramref name="configure"/>.
    /// </remarks>
    /// <param name="subscriptionId">The subscription ID.</param>
    /// <param name="streamId">The stream ID.</param>
    /// <param name="configure">Delegate which is invoked to configure the event handlers.</param>
    /// <returns>The <see cref="SubscriptionOptions"/> instance to allow chaining.</returns>
    /// <exception cref="InvalidOperationException">
    /// The subscription was already added for a different stream, or it was already added
    /// with a listener which was not registered through this method.
    /// </exception>
    public SubscriptionOptions AddEventHandlers(
        SubscriptionId subscriptionId,
        StreamId streamId,
        Action<EventHandlerSubscriptionOptions> configure)
    {
        Ensure.Arg.NotNull(subscriptionId);
        Ensure.Arg.NotNull(configure);

        if (_subscribers.TryGetValue(subscriptionId, out Subscriber? subscriber))
        {
            // The subscription exists already: it can only be extended if it targets
            // the same stream and was created as an event handler subscription.
            if (subscriber.StreamId != streamId)
            {
                throw new InvalidOperationException(
                    $"Subscription with ID '{subscriptionId}' is already added for a different stream.");
            }

            if (!EventHandlerOptions.ContainsKey(subscriptionId))
            {
                throw new InvalidOperationException(
                    $"Subscription with ID '{subscriptionId}' is already added with a listener which does not dispatch to event handlers.");
            }
        }
        else
        {
            AddListener<EventHandlerSubscriptionListener>(subscriptionId, streamId);
        }

        // Create the options on first registration, reuse them on subsequent calls.
        if (!EventHandlerOptions.TryGetValue(subscriptionId, out EventHandlerSubscriptionOptions? options))
        {
            options = new EventHandlerSubscriptionOptions();
            EventHandlerOptions.Add(subscriptionId, options);
        }

        configure(options);
        return this;
    }
}