forked from event-driven-io/Blumchen
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSubscription.cs
More file actions
162 lines (145 loc) · 6.7 KB
/
Copy pathSubscription.cs
File metadata and controls
162 lines (145 loc) · 6.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
using System.Reflection;
using System.Runtime.CompilerServices;
using Blumchen.Database;
using Blumchen.Serialization;
using Blumchen.Subscriptions.Management;
using Blumchen.Subscriptions.ReplicationMessageHandlers;
using Blumchen.Subscriptions.SnapshotReader;
using Npgsql;
using Npgsql.Replication;
using Npgsql.Replication.PgOutput;
using Npgsql.Replication.PgOutput.Messages;
namespace Blumchen.Subscriptions;
using static PublicationManagement;
using static ReplicationSlotManagement;
using static ReplicationSlotManagement.CreateReplicationSlotResult;
public sealed class Subscription: IAsyncDisposable
{
public enum CreateStyle
{
WhenNotExists,
AlwaysRecreate,
Never
}
private LogicalReplicationConnection? _connection;
private readonly SubscriptionOptionsBuilder _builder = new();
private ISubscriptionOptions? _options;
public async IAsyncEnumerable<IEnvelope> Subscribe(
Func<SubscriptionOptionsBuilder, SubscriptionOptionsBuilder> builder,
[EnumeratorCancellation] CancellationToken ct = default
)
{
await foreach (var _ in Subscribe(builder(_builder).Build(), ct))
yield return _;
}
internal async IAsyncEnumerable<IEnvelope> Subscribe(
ISubscriptionOptions subscriptionOptions,
[EnumeratorCancellation] CancellationToken ct = default
)
{
_options = subscriptionOptions;
var (dataSource, connectionStringBuilder, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = subscriptionOptions;
await dataSource.EnsureTableExists(publicationSetupOptions.TableDescriptor, ct).ConfigureAwait(false);
_connection = new LogicalReplicationConnection(connectionStringBuilder.ConnectionString);
await _connection.Open(ct).ConfigureAwait(false);
await dataSource.SetupPublication(publicationSetupOptions, ct).ConfigureAwait(false);
var result = await dataSource.SetupReplicationSlot(_connection, replicationSlotSetupOptions, ct).ConfigureAwait(false);
PgOutputReplicationSlot slot;
if (result is not Created created)
{
slot = new PgOutputReplicationSlot(replicationSlotSetupOptions.SlotName);
}
else
{
slot = new PgOutputReplicationSlot(
new ReplicationSlotOptions(
replicationSlotSetupOptions.SlotName,
created.LogSequenceNumber
)
);
await foreach (var envelope in ReadExistingRowsFromSnapshot(dataSource, created.SnapshotName, _options, ct).ConfigureAwait(false))
await foreach (var subscribe in ProcessEnvelope<IEnvelope>(envelope, registry, errorProcessor).WithCancellation(ct).ConfigureAwait(false))
yield return subscribe;
}
await foreach (var message in
_connection.StartReplication(slot,
new PgOutputReplicationOptions(publicationSetupOptions.PublicationName, 1, replicationSlotSetupOptions.Binary), ct).ConfigureAwait(false))
{
if (message is InsertMessage insertMessage)
{
var envelope = await replicationDataMapper.ReadFromReplication(insertMessage, ct).ConfigureAwait(false);
await foreach (var subscribe in ProcessEnvelope<IEnvelope>(envelope, registry, errorProcessor).WithCancellation(ct).ConfigureAwait(false))
yield return subscribe;
}
// Always call SetReplicationStatus() or assign LastAppliedLsn and LastFlushedLsn individually
// so that Npgsql can inform the server which WAL files can be removed/recycled.
_connection.SetReplicationStatus(message.WalEnd);
await _connection.SendStatusUpdate(ct).ConfigureAwait(false);
}
}
private static async IAsyncEnumerable<T> ProcessEnvelope<T>(
IEnvelope envelope,
Dictionary<Type, IHandler> registry,
IErrorProcessor errorProcessor
) where T:class
{
switch (envelope)
{
case KoEnvelope error:
await errorProcessor.Process(error.Error).ConfigureAwait(false);
yield break;
case OkEnvelope okEnvelope:
{
var obj = okEnvelope.Value;
var objType = obj.GetType();
var (consumer, methodInfo) = Memoize(registry, objType, Consumer);
await ((Task)methodInfo.Invoke(consumer, [obj])!).ConfigureAwait(false);
yield return (T)envelope;
yield break;
}
}
}
private static readonly Dictionary<Type, (IHandler consumer, MethodInfo methodInfo)> Cache = [];
private static (IHandler consumer, MethodInfo methodInfo) Memoize
(
Dictionary<Type, IHandler> registry,
Type objType,
Func<Dictionary<Type, IHandler>, Type, (IHandler consumer, MethodInfo methodInfo)> func
)
{
if (!Cache.TryGetValue(objType, out var entry))
entry = func(registry, objType);
Cache[objType] = entry;
return entry;
}
private static (IHandler consumer, MethodInfo methodInfo) Consumer(Dictionary<Type, IHandler> registry, Type objType)
{
var consumer = registry[objType] ?? throw new NotSupportedException($"Unregistered type for {objType.AssemblyQualifiedName}");
var methodInfos = consumer.GetType().GetMethods(BindingFlags.Instance|BindingFlags.Public);
var methodInfo = methodInfos.SingleOrDefault(mi=>mi.GetParameters().Any(pa => pa.ParameterType == objType))
?? throw new NotSupportedException($"Unregistered type for {objType.AssemblyQualifiedName}");
return (consumer, methodInfo);
}
private static async IAsyncEnumerable<IEnvelope> ReadExistingRowsFromSnapshot(
NpgsqlDataSource dataSource,
string snapshotName,
ISubscriptionOptions options,
[EnumeratorCancellation] CancellationToken ct = default
)
{
var connection = await dataSource.OpenConnectionAsync(ct).ConfigureAwait(false);
await using var connection1 = connection.ConfigureAwait(false);
await foreach (var row in connection.GetRowsFromSnapshot(
snapshotName,
options.PublicationOptions.TableDescriptor,
options.DataMapper,
options.PublicationOptions.TypeResolver.Keys().ToHashSet(),
ct).ConfigureAwait(false))
yield return row;
}
public async ValueTask DisposeAsync()
{
if(_connection != null)
await _connection.DisposeAsync().ConfigureAwait(false);
}
}