diff --git a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_envelope_handler_fails.cs b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_envelope_handler_fails.cs new file mode 100644 index 00000000000..e2be31cba5c --- /dev/null +++ b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_envelope_handler_fails.cs @@ -0,0 +1,73 @@ +namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry.Metrics; + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using NServiceBus; +using NServiceBus.AcceptanceTesting; +using NServiceBus.AcceptanceTests.Core.OpenTelemetry; +using NServiceBus.AcceptanceTests.EndpointTemplates; +using NServiceBus.Extensibility; +using NServiceBus.Features; +using NUnit.Framework; +using Conventions = AcceptanceTesting.Customization.Conventions; + +public class When_envelope_handler_fails : OpenTelemetryAcceptanceTest +{ + [Test] + public async Task Should_report_failed_unwrapping_metric() + { + using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener(); + + _ = await Scenario.Define() + .WithEndpoint(b => b.CustomConfig(x => + { + x.MakeInstanceUniquelyAddressable("discriminator"); + x.EnableFeature(); + }) + .When(session => session.SendLocal(new OutgoingMessage()))) + .Run(); + + metricsListener.AssertMetric("nservicebus.envelope.unwrapped", 1); + + metricsListener.AssertTags("nservicebus.envelope.unwrapped", + new Dictionary + { + ["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), + ["nservicebus.discriminator"] = "discriminator", + ["nservicebus.envelope.unwrapper_type"] = typeof(ThrowingHandler).FullName, + ["error.type"] = typeof(InvalidOperationException).FullName + }); + } + + class ThrowingHandler : IEnvelopeHandler + { + public Dictionary UnwrapEnvelope(string nativeMessageId, IDictionary incomingHeaders, ReadOnlySpan incomingBody, ContextBag extensions, IBufferWriter bodyWriter) + => throw new InvalidOperationException("Some exception"); + } + + class TestEnvelopeFeature : Feature + { + protected override void Setup(FeatureConfigurationContext context) => context.AddEnvelopeHandler(); + } + + class Context : ScenarioContext; + + class EndpointWithMetrics : EndpointConfigurationBuilder + { + public EndpointWithMetrics() => EndpointSetup(); + + public class MessageHandler(Context testContext) : IHandleMessages + { + public Task Handle(OutgoingMessage message, IMessageHandlerContext context) + { + testContext.MarkAsCompleted(); + return Task.CompletedTask; + } + } + } + + public class OutgoingMessage : IMessage; +} \ No newline at end of file diff --git a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_envelope_handler_succeeds.cs b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_envelope_handler_succeeds.cs new file mode 100644 index 00000000000..682792f45c1 --- /dev/null +++ b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_envelope_handler_succeeds.cs @@ -0,0 +1,76 @@ +namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry.Metrics; + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using NServiceBus; +using NServiceBus.AcceptanceTesting; +using NServiceBus.AcceptanceTests.Core.OpenTelemetry; +using NServiceBus.AcceptanceTests.EndpointTemplates; +using NServiceBus.Extensibility; +using NServiceBus.Features; +using NUnit.Framework; +using Conventions = AcceptanceTesting.Customization.Conventions; + +public class When_envelope_handler_succeeds : OpenTelemetryAcceptanceTest +{ + [Test] + public async Task Should_report_successful_unwrapping_metric() + { + using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener(); + + _ = await Scenario.Define() + .WithEndpoint(b => b.CustomConfig(x => + { + x.MakeInstanceUniquelyAddressable("discriminator"); + x.EnableFeature(); + }) + .When(session => session.SendLocal(new OutgoingMessage()))) + .Run(); + + // The metric should be explicitly emitted with a value of 0 to indicate no errors occurred + Assert.That(metricsListener.ReportedMeters["nservicebus.envelope.unwrapped"], Is.EqualTo(0)); + + metricsListener.AssertTags("nservicebus.envelope.unwrapped", + new Dictionary + { + ["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), + ["nservicebus.discriminator"] = "discriminator", + ["nservicebus.envelope.unwrapper_type"] = typeof(SuccessfulCloudEventHandler).FullName + }); + } + + class SuccessfulCloudEventHandler : IEnvelopeHandler + { + public Dictionary UnwrapEnvelope(string nativeMessageId, IDictionary incomingHeaders, ReadOnlySpan incomingBody, ContextBag extensions, IBufferWriter bodyWriter) + { + bodyWriter.Write(incomingBody); + return incomingHeaders.ToDictionary(); + } + } + + class TestEnvelopeFeature : Feature + { + protected override void Setup(FeatureConfigurationContext context) => context.AddEnvelopeHandler(); + } + + class Context : ScenarioContext; + + class EndpointWithMetrics : EndpointConfigurationBuilder + { + public EndpointWithMetrics() => EndpointSetup(); + + public class MessageHandler(Context testContext) : IHandleMessages + { + public Task Handle(OutgoingMessage message, IMessageHandlerContext context) + { + testContext.MarkAsCompleted(); + return Task.CompletedTask; + } + } + } + + public class OutgoingMessage : IMessage; +} \ No newline at end of file diff --git a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/When_incoming_message_handled.cs b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/When_incoming_message_handled.cs index 1d3490c2f97..65835511ba0 100644 --- a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/When_incoming_message_handled.cs +++ b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/When_incoming_message_handled.cs @@ -81,7 +81,6 @@ static async Task WhenMessagesHandled(Func mess } } })) - .Done(c => c.TotalHandledMessages == 5) .Run(); return metricsListener; } @@ -120,7 +119,7 @@ class MyMessageHandler(Context testContext) : IHandleMessages { public Task Handle(MyMessage message, IMessageHandlerContext context) { - Interlocked.Increment(ref testContext.TotalHandledMessages); + testContext.MarkAsCompleted(Interlocked.Increment(ref testContext.TotalHandledMessages) == 5); return Task.CompletedTask; } } @@ -129,7 +128,8 @@ class MyExceptionalHandler(Context testContext) : IHandleMessages c.TotalHandledMessages == 5) .Run(); string handlingTime = "nservicebus.messaging.handler_time"; @@ -52,7 +51,8 @@ class MyMessageHandler(Context testContext) : IHandleMessages { public Task Handle(MyMessage message, IMessageHandlerContext context) { - Interlocked.Increment(ref testContext.TotalHandledMessages); + var count = Interlocked.Increment(ref testContext.TotalHandledMessages); + testContext.MarkAsCompleted(count == 5); return Task.CompletedTask; } } diff --git a/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt b/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt index b9d5bad4b6e..caf4256de70 100644 --- a/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt +++ b/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt @@ -297,6 +297,14 @@ namespace NServiceBus { public static NServiceBus.IStartableEndpointWithExternallyManagedContainer Create(NServiceBus.EndpointConfiguration configuration, Microsoft.Extensions.DependencyInjection.IServiceCollection serviceCollection) { } } + public static class EnvelopeConfigExtensions + { + extension(NServiceBus.Features.FeatureConfigurationContext context) + { + public void AddEnvelopeHandler() + where THandler : class, NServiceBus.IEnvelopeHandler { } + } + } public static class ErrorQueueSettings { public const string SettingsKey = "errorQueue"; @@ -456,6 +464,10 @@ namespace NServiceBus { System.Threading.Tasks.Task Stop(System.Threading.CancellationToken cancellationToken = default); } + public interface IEnvelopeHandler + { + System.Collections.Generic.Dictionary? UnwrapEnvelope(string nativeMessageId, System.Collections.Generic.IDictionary incomingHeaders, System.ReadOnlySpan incomingBody, NServiceBus.Extensibility.ContextBag extensions, System.Buffers.IBufferWriter bodyWriter); + } public interface IHandleMessages { } public interface IHandleMessages : NServiceBus.IHandleMessages { diff --git a/src/NServiceBus.Core.Tests/ApprovalFiles/MeterTests.Verify_MeterAPI.approved.txt b/src/NServiceBus.Core.Tests/ApprovalFiles/MeterTests.Verify_MeterAPI.approved.txt index a78c375d8c7..a572d43098f 100644 --- a/src/NServiceBus.Core.Tests/ApprovalFiles/MeterTests.Verify_MeterAPI.approved.txt +++ b/src/NServiceBus.Core.Tests/ApprovalFiles/MeterTests.Verify_MeterAPI.approved.txt @@ -6,12 +6,14 @@ "error.type", "execution.result", "nservicebus.discriminator", + "nservicebus.envelope.unwrapper_type", "nservicebus.message_handler_type", "nservicebus.message_handler_types", "nservicebus.message_type", "nservicebus.queue" ], "Metrics": [ + "nservicebus.envelope.unwrapped => Counter", "nservicebus.messaging.critical_time => Histogram, Unit: s", "nservicebus.messaging.failures => Counter", "nservicebus.messaging.fetches => Counter", diff --git a/src/NServiceBus.Core.Tests/Envelopes/EnvelopeUnwrapperTests.cs b/src/NServiceBus.Core.Tests/Envelopes/EnvelopeUnwrapperTests.cs new file mode 100644 index 00000000000..1f2460a8b9c --- /dev/null +++ b/src/NServiceBus.Core.Tests/Envelopes/EnvelopeUnwrapperTests.cs @@ -0,0 +1,195 @@ +namespace NServiceBus.Core.Tests.Envelopes; + +using OpenTelemetry; +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; +using Extensibility; +using NUnit.Framework; +using Transport; + + +public class EnvelopeUnwrapperTests +{ + string nativeId; + Dictionary originalHeaders; + ReadOnlyMemory originalBody; + MessageContext messageContext; + TestMeterFactory meterFactory; + IncomingPipelineMetrics incomingPipelineMetrics; + List envelopeHandlers; + + [SetUp] + public void Setup() + { + nativeId = "native-1"; + originalHeaders = new() + { + ["HeaderA"] = "ValueA" + }; + originalBody = "payload"u8.ToArray().AsMemory(); + messageContext = new MessageContext(nativeId, originalHeaders, originalBody, new TransportTransaction(), "receiveAddress", new ContextBag()); + meterFactory = new TestMeterFactory(); + incomingPipelineMetrics = new IncomingPipelineMetrics(meterFactory, "queue", "disc"); + } + + [TearDown] + public void TearDown() => meterFactory.Dispose(); + + [Test] + public void ReturnsDefaultIncomingMessageWhenNoHandlers() + { + envelopeHandlers = []; + + IncomingMessage result = RunTest(); + + Assert.That(result.NativeMessageId, Is.EqualTo(nativeId)); + Assert.That(result.Headers, Is.EqualTo(originalHeaders)); + Assert.That(result.Body, Is.EqualTo(originalBody)); + } + + [Test] + public void ReturnsDefaultIncomingMessageWhenHandlersReturnNull() + { + envelopeHandlers = [ + new NullReturningHandler(), + new NullReturningHandler(), + ]; + + IncomingMessage result = RunTest(); + + Assert.That(result.NativeMessageId, Is.EqualTo(nativeId)); + Assert.That(result.Headers, Is.EqualTo(originalHeaders)); + Assert.That(result.Body, Is.EqualTo(originalBody)); + } + + [Test] + public void ReturnsDefaultIncomingMessageWhenHandlersThrow() + { + envelopeHandlers = [ + new ThrowingHandler(), + new ThrowingHandler(), + ]; + + IncomingMessage result = RunTest(); + + Assert.That(result.NativeMessageId, Is.EqualTo(nativeId)); + Assert.That(result.Headers, Is.EqualTo(originalHeaders)); + Assert.That(result.Body, Is.EqualTo(originalBody)); + } + + [Test] + public void ReturnsValueFromTheFirstSucceedingHandler() + { + var firstHeaders = new Dictionary + { + ["HeaderB"] = "ValueB" + }; + var firstBody = new ReadOnlyMemory("firstPayload"u8.ToArray()); + + var secondHeaders = new Dictionary + { + ["HeaderC"] = "ValueC" + }; + var secondBody = new ReadOnlyMemory("secondPayload"u8.ToArray()); + envelopeHandlers = [ + new NullReturningHandler(), + new ThrowingHandler(), + new ReturningHandler(firstHeaders, firstBody), + new NullReturningHandler(), + new ThrowingHandler(), + new ReturningHandler(secondHeaders, secondBody) + ]; + + IncomingMessage result = RunTest(); + + Assert.That(result.NativeMessageId, Is.EqualTo(nativeId)); + Assert.That(result.Headers, Is.EqualTo(firstHeaders)); + Assert.That(result.Body.Span.SequenceEqual(firstBody.Span), Is.True); + } + + [Test] + public void ModifiedBodyWriterIsReset() + { + var firstHeaders = new Dictionary + { + ["HeaderB"] = "ValueB" + }; + var firstBody = new ReadOnlyMemory("firstPayload"u8.ToArray()); + + envelopeHandlers = [ + new NullReturningHandler(), + new ThrowingHandler(), + new ModifiedBodyWriterResetHandler(), + new ReturningHandler(firstHeaders, firstBody), + ]; + + IncomingMessage result = RunTest(); + + Assert.That(result.NativeMessageId, Is.EqualTo(nativeId)); + Assert.That(result.Headers, Is.EquivalentTo(firstHeaders)); + Assert.That(result.Body.Span.SequenceEqual(firstBody.Span), Is.True); + } + + [Test] + public void ModifiedAndThrowingBodyWriterIsReset() + { + var firstHeaders = new Dictionary + { + ["HeaderB"] = "ValueB" + }; + var firstBody = new ReadOnlyMemory("firstPayload"u8.ToArray()); + + envelopeHandlers = [ + new NullReturningHandler(), + new ModifiedAndThrowingBodyWriterResetHandler(), + new ReturningHandler(firstHeaders, firstBody), + ]; + + IncomingMessage result = RunTest(); + + Assert.That(result.NativeMessageId, Is.EqualTo(nativeId)); + Assert.That(result.Headers, Is.EquivalentTo(firstHeaders)); + Assert.That(result.Body.Span.SequenceEqual(firstBody.Span), Is.True); + } + + EnvelopeUnwrapper.IncomingMessageHandle RunTest() => new EnvelopeUnwrapper([.. envelopeHandlers], incomingPipelineMetrics).UnwrapEnvelope(messageContext); + + class ReturningHandler(Dictionary headersToReturn, ReadOnlyMemory bodyToReturn) : IEnvelopeHandler + { + public Dictionary UnwrapEnvelope(string nativeMessageId, IDictionary incomingHeaders, ReadOnlySpan incomingBody, ContextBag extensions, IBufferWriter bodyWriter) + { + bodyWriter.Write(bodyToReturn.Span); + return headersToReturn; + } + } + + class NullReturningHandler : IEnvelopeHandler + { + public Dictionary UnwrapEnvelope(string nativeMessageId, IDictionary incomingHeaders, ReadOnlySpan incomingBody, ContextBag extensions, IBufferWriter bodyWriter) => null; + } + + class ThrowingHandler : IEnvelopeHandler + { + public Dictionary UnwrapEnvelope(string nativeMessageId, IDictionary incomingHeaders, ReadOnlySpan incomingBody, ContextBag extensions, IBufferWriter bodyWriter) => throw new InvalidOperationException("Some exception"); + } + + class ModifiedBodyWriterResetHandler : IEnvelopeHandler + { + public Dictionary UnwrapEnvelope(string nativeMessageId, IDictionary incomingHeaders, ReadOnlySpan incomingBody, ContextBag extensions, IBufferWriter bodyWriter) + { + bodyWriter.Write("modifiedPayload"u8); + return null; + } + } + + class ModifiedAndThrowingBodyWriterResetHandler : IEnvelopeHandler + { + public Dictionary UnwrapEnvelope(string nativeMessageId, IDictionary incomingHeaders, ReadOnlySpan incomingBody, ContextBag extensions, IBufferWriter bodyWriter) + { + bodyWriter.Write("modifiedPayload"u8); + throw new InvalidOperationException("Some exception"); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/Pipeline/MainPipelineExecutorTests.cs b/src/NServiceBus.Core.Tests/Pipeline/MainPipelineExecutorTests.cs index 3d1ca1fd999..f6223a25eac 100644 --- a/src/NServiceBus.Core.Tests/Pipeline/MainPipelineExecutorTests.cs +++ b/src/NServiceBus.Core.Tests/Pipeline/MainPipelineExecutorTests.cs @@ -123,6 +123,7 @@ static MessageContext CreateMessageContext() => static MainPipelineExecutor CreateMainPipelineExecutor(ServiceProvider serviceProvider, IPipeline receivePipeline) { + var incomingPipelineMetrics = new IncomingPipelineMetrics(new TestMeterFactory(), "queue", "disc"); var executor = new MainPipelineExecutor( serviceProvider, new PipelineCache(serviceProvider, new PipelineModifications()), @@ -130,7 +131,8 @@ static MainPipelineExecutor CreateMainPipelineExecutor(ServiceProvider servicePr new Notification(), receivePipeline, new ActivityFactory(), - new IncomingPipelineMetrics(new TestMeterFactory(), "queue", "disc")); + incomingPipelineMetrics, + new EnvelopeUnwrapper([], incomingPipelineMetrics)); return executor; } diff --git a/src/NServiceBus.Core/EndpointConfiguration.cs b/src/NServiceBus.Core/EndpointConfiguration.cs index 9de41f9e9f0..74fd68dd674 100644 --- a/src/NServiceBus.Core/EndpointConfiguration.cs +++ b/src/NServiceBus.Core/EndpointConfiguration.cs @@ -38,6 +38,7 @@ public EndpointConfiguration(string endpointName) Settings.Set(new ReceiveComponent.Settings(Settings)); Settings.Set(new RecoverabilityComponent.Configuration()); Settings.Set(new ConsecutiveFailuresConfiguration()); + Settings.Set(new EnvelopeComponent.Settings()); Settings.Set(Pipeline = new PipelineSettings(Settings)); var featureSettings = new FeatureComponent.Settings(); diff --git a/src/NServiceBus.Core/EndpointCreator.cs b/src/NServiceBus.Core/EndpointCreator.cs index 80c215291ab..176ceba9a86 100644 --- a/src/NServiceBus.Core/EndpointCreator.cs +++ b/src/NServiceBus.Core/EndpointCreator.cs @@ -117,6 +117,8 @@ void Configure() sendComponent = SendComponent.Initialize(pipelineSettings, hostingConfiguration, routingComponent, messageMapper); + envelopeComponent = new EnvelopeComponent(settings.Get()); + receiveComponent = ReceiveComponent.Configure( receiveConfiguration, settings.ErrorQueueAddress(), @@ -178,6 +180,7 @@ public StartableEndpoint CreateStartableEndpoint(IServiceProvider serviceProvide return new StartableEndpoint(settings, featureComponent, + envelopeComponent, receiveComponent, transportSeam, pipelineComponent, @@ -199,4 +202,5 @@ public StartableEndpoint CreateStartableEndpoint(IServiceProvider serviceProvide readonly SettingsHolder settings; readonly HostingComponent.Configuration hostingConfiguration; readonly Conventions conventions; + EnvelopeComponent envelopeComponent; } \ No newline at end of file diff --git a/src/NServiceBus.Core/Envelopes/EnvelopeComponent.cs b/src/NServiceBus.Core/Envelopes/EnvelopeComponent.cs new file mode 100644 index 00000000000..a0962296ae9 --- /dev/null +++ b/src/NServiceBus.Core/Envelopes/EnvelopeComponent.cs @@ -0,0 +1,34 @@ +#nullable enable + +namespace NServiceBus; + +using System; +using System.Collections.Generic; +using System.Linq; +using Microsoft.Extensions.DependencyInjection; + +class EnvelopeComponent(EnvelopeComponent.Settings settings) +{ + public EnvelopeUnwrapper CreateUnwrapper(IServiceProvider serviceProvider) + => new([.. settings.HandlerFactories.Select(factory => factory(serviceProvider))], serviceProvider.GetRequiredService()); + + public class Settings + { + readonly Dictionary> factories = []; + + public void AddEnvelopeHandler() where THandler : IEnvelopeHandler + { + if (factories.ContainsKey(typeof(THandler))) + { + return; + } + + // Create and cache the factory because the service provider is only available later + // using CreateInstance instead of CreateFactory because the envelope handlers are instantiated only once and + // kept alive for the lifetime of the endpoint + factories.Add(typeof(THandler), static sp => ActivatorUtilities.CreateInstance(sp)); + } + + public IReadOnlyCollection> HandlerFactories => factories.Values; + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Envelopes/EnvelopeConfigExtensions.cs b/src/NServiceBus.Core/Envelopes/EnvelopeConfigExtensions.cs new file mode 100644 index 00000000000..a1a0906f4b5 --- /dev/null +++ b/src/NServiceBus.Core/Envelopes/EnvelopeConfigExtensions.cs @@ -0,0 +1,25 @@ +#nullable enable + +namespace NServiceBus; + +using System; +using Features; + +/// +/// Provides extensions to register envelope handlers. +/// +public static class EnvelopeConfigExtensions +{ + extension(FeatureConfigurationContext context) + { + /// + /// Adds the envelope handler type. + /// + public void AddEnvelopeHandler() where THandler : class, IEnvelopeHandler + { + ArgumentNullException.ThrowIfNull(context); + + context.Settings.Get().AddEnvelopeHandler(); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Envelopes/EnvelopeUnwrapper.cs b/src/NServiceBus.Core/Envelopes/EnvelopeUnwrapper.cs new file mode 100644 index 00000000000..74e47b26544 --- /dev/null +++ b/src/NServiceBus.Core/Envelopes/EnvelopeUnwrapper.cs @@ -0,0 +1,89 @@ +#nullable enable + +namespace NServiceBus; + +using System; +using Logging; +using Transport; + +class EnvelopeUnwrapper(IEnvelopeHandler[] envelopeHandlers, IncomingPipelineMetrics metrics) +{ + static IncomingMessage GetDefaultIncomingMessage(MessageContext messageContext) => new(messageContext.NativeMessageId, messageContext.Headers, messageContext.Body); + + internal IncomingMessageHandle UnwrapEnvelope(MessageContext messageContext) + { + var bufferWriter = new LazyArrayPoolBufferWriter(messageContext.Body.Length); + foreach (var envelopeHandler in envelopeHandlers) + { + try + { + if (Log.IsDebugEnabled) + { + Log.DebugFormat( + "Unwrapping the current message (NativeID: {0} using {1}", messageContext.NativeMessageId, envelopeHandler.GetType().Name); + } + + var headers = envelopeHandler.UnwrapEnvelope(messageContext.NativeMessageId, messageContext.Headers, + messageContext.Body.Span, messageContext.Extensions, bufferWriter); + + metrics.EnvelopeUnwrappingSucceeded(messageContext, envelopeHandler); + + if (headers is not null) + { + if (Log.IsDebugEnabled) + { + Log.DebugFormat( + "Unwrapped the message (NativeID: {0} using {1}", messageContext.NativeMessageId, envelopeHandler.GetType().Name); + } + + return new IncomingMessageHandle(new IncomingMessage(messageContext.NativeMessageId, headers, bufferWriter.WrittenMemory), bufferWriter); + } + + if (Log.IsDebugEnabled) + { + Log.DebugFormat( + "Did not unwrap the message (NativeID: {0} using {1}", messageContext.NativeMessageId, envelopeHandler.GetType().Name); + } + } + catch (Exception e) + { + metrics.EnvelopeUnwrappingFailed(messageContext, envelopeHandler, e); + if (Log.IsWarnEnabled) + { + Log.WarnFormat( + "Unwrapper {0} failed to unwrap the message {1}: {2}", envelopeHandler, messageContext.NativeMessageId, e); + } + } + + // Always clear the buffer before trying the following unwrapper + bufferWriter.Clear(); + } + + if (Log.IsDebugEnabled) + { + if (envelopeHandlers.Length > 0) + { + Log.DebugFormat( + "No envelope handler unwrapped the current message (NativeID: {0}, assuming the default NServiceBus format", messageContext.NativeMessageId); + } + else + { + Log.DebugFormat( + "No envelope handler found for the current message (NativeID: {0}, assuming the default NServiceBus format", messageContext.NativeMessageId); + } + } + + return new IncomingMessageHandle(GetDefaultIncomingMessage(messageContext), bufferWriter); + } + + internal readonly struct IncomingMessageHandle(IncomingMessage message, IDisposable disposable) : IDisposable + { + public IncomingMessage Message { get; } = message; + + public void Dispose() => disposable.Dispose(); + + public static implicit operator IncomingMessage(IncomingMessageHandle handle) => handle.Message; + } + + static readonly ILog Log = LogManager.GetLogger(); +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Envelopes/IEnvelopeHandler.cs b/src/NServiceBus.Core/Envelopes/IEnvelopeHandler.cs new file mode 100644 index 00000000000..cf2e3f97bb9 --- /dev/null +++ b/src/NServiceBus.Core/Envelopes/IEnvelopeHandler.cs @@ -0,0 +1,25 @@ +#nullable enable + +namespace NServiceBus; + +using System; +using System.Buffers; +using System.Collections.Generic; +using Extensibility; + +/// +/// Handler for unwrapping incoming message envelope formats. +/// +public interface IEnvelopeHandler +{ + /// + /// Unwraps the incoming message envelope received by the transport prior to passing the incoming message to the pipeline. + /// + /// The native message id provided by the transport. This is included for reference purposes, and should be considered readonly. + /// Headers provided by the transport. + /// The raw body provided by the transport. + /// ContextBag of extension values provided by the transport. + /// The body writer to write the unwrapped body into. + /// Dictionary of headers if the message can be unwrapped. Null or exception otherwise. + Dictionary? UnwrapEnvelope(string nativeMessageId, IDictionary incomingHeaders, ReadOnlySpan incomingBody, ContextBag extensions, IBufferWriter bodyWriter); +} \ No newline at end of file diff --git a/src/NServiceBus.Core/OpenTelemetry/Metrics/MeterTags.cs b/src/NServiceBus.Core/OpenTelemetry/Metrics/MeterTags.cs index b62f01c4a3e..9eb8795c87f 100644 --- a/src/NServiceBus.Core/OpenTelemetry/Metrics/MeterTags.cs +++ b/src/NServiceBus.Core/OpenTelemetry/Metrics/MeterTags.cs @@ -11,4 +11,5 @@ static class MeterTags public const string MessageHandlerType = "nservicebus.message_handler_type"; public const string ExecutionResult = "execution.result"; public const string ErrorType = "error.type"; + public const string EnvelopeUnwrapperType = "nservicebus.envelope.unwrapper_type"; } \ No newline at end of file diff --git a/src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetrics.cs b/src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetrics.cs index 7d29856461f..b842e912a3c 100644 --- a/src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetrics.cs +++ b/src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetrics.cs @@ -6,6 +6,7 @@ namespace NServiceBus; using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.Metrics; +using Transport; using Pipeline; class IncomingPipelineMetrics @@ -19,6 +20,7 @@ class IncomingPipelineMetrics const string RecoverabilityImmediate = "nservicebus.recoverability.immediate"; const string RecoverabilityDelayed = "nservicebus.recoverability.delayed"; const string RecoverabilityError = "nservicebus.recoverability.error"; + const string EnvelopeUnwrapping = "nservicebus.envelope.unwrapped"; public IncomingPipelineMetrics(IMeterFactory meterFactory, string queueName, string discriminator) { @@ -41,6 +43,8 @@ public IncomingPipelineMetrics(IMeterFactory meterFactory, string queueName, str description: "Total number of delayed retries requested."); totalSentToErrorQueue = meter.CreateCounter(RecoverabilityError, description: "Total number of messages sent to the error queue."); + totalEnvelopeUnwrapping = meter.CreateCounter(EnvelopeUnwrapping, + description: "Total number of unwrapping attempts by the endpoint."); queueNameBase = queueName; endpointDiscriminator = discriminator; @@ -239,6 +243,29 @@ public void RecordSendToErrorQueue(IRecoverabilityContext recoverabilityContext) totalSentToErrorQueue.Add(1, meterTags); } + public void EnvelopeUnwrappingSucceeded(MessageContext messageContext, IEnvelopeHandler type) => RecordEnvelopeUnwrapping(messageContext, type, true, null); + public void EnvelopeUnwrappingFailed(MessageContext messageContext, IEnvelopeHandler type, Exception? exception) => RecordEnvelopeUnwrapping(messageContext, type, false, exception); + void RecordEnvelopeUnwrapping(MessageContext messageContext, IEnvelopeHandler type, bool succeeded, Exception? exception) + { + if (!totalEnvelopeUnwrapping.Enabled) + { + return; + } + + var incomingPipelineMetricTags = messageContext.Extensions.Get(); + TagList meterTags; + incomingPipelineMetricTags.ApplyTags(ref meterTags, [ + MeterTags.QueueName, + MeterTags.EndpointDiscriminator]); + meterTags.Add(new KeyValuePair(MeterTags.EnvelopeUnwrapperType, type.GetType().FullName)); + if (exception != null) + { + meterTags.Add(new KeyValuePair(MeterTags.ErrorType, exception.GetType().FullName)); + } + + totalEnvelopeUnwrapping.Add(succeeded ? 0 : 1, meterTags); + } + readonly Counter totalProcessedSuccessfully; readonly Counter totalFetched; readonly Counter totalFailures; @@ -248,6 +275,7 @@ public void RecordSendToErrorQueue(IRecoverabilityContext recoverabilityContext) readonly Counter totalImmediateRetries; readonly Counter totalDelayedRetries; readonly Counter totalSentToErrorQueue; + readonly Counter totalEnvelopeUnwrapping; string queueNameBase; string endpointDiscriminator; } \ No newline at end of file diff --git a/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs b/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs index f9e2ef5de18..07560a36c13 100644 --- a/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs +++ b/src/NServiceBus.Core/Pipeline/MainPipelineExecutor.cs @@ -16,7 +16,8 @@ class MainPipelineExecutor( INotificationSubscriptions receivePipelineNotification, IPipeline receivePipeline, IActivityFactory activityFactory, - IncomingPipelineMetrics incomingPipelineMetrics) + IncomingPipelineMetrics incomingPipelineMetrics, + EnvelopeUnwrapper envelopeUnwrapper) : IPipelineExecutor { public async Task Invoke(MessageContext messageContext, CancellationToken cancellationToken = default) @@ -31,7 +32,8 @@ public async Task Invoke(MessageContext messageContext, CancellationToken cancel var childScope = rootBuilder.CreateAsyncScope(); await using (childScope.ConfigureAwait(false)) { - var message = new IncomingMessage(messageContext.NativeMessageId, messageContext.Headers, messageContext.Body); + using var incomingMessageHandle = envelopeUnwrapper.UnwrapEnvelope(messageContext); + IncomingMessage message = incomingMessageHandle; var transportReceiveContext = new TransportReceiveContext( childScope.ServiceProvider, diff --git a/src/NServiceBus.Core/Receiving/ReceiveComponent.cs b/src/NServiceBus.Core/Receiving/ReceiveComponent.cs index 9788c3deac7..51db4ebc0ba 100644 --- a/src/NServiceBus.Core/Receiving/ReceiveComponent.cs +++ b/src/NServiceBus.Core/Receiving/ReceiveComponent.cs @@ -135,6 +135,7 @@ public static ReceiveComponent Configure( public async Task Initialize( IServiceProvider builder, RecoverabilityComponent recoverabilityComponent, + EnvelopeComponent envelopeComponent, MessageOperations messageOperations, PipelineComponent pipelineComponent, IPipelineCache pipelineCache, @@ -152,7 +153,8 @@ public async Task Initialize( var receivePipeline = pipelineComponent.CreatePipeline(builder); var pipelineMetrics = builder.GetRequiredService(); - var mainPipelineExecutor = new MainPipelineExecutor(builder, pipelineCache, messageOperations, configuration.PipelineCompletedSubscribers, receivePipeline, activityFactory, pipelineMetrics); + var envelopeUnwrapper = envelopeComponent.CreateUnwrapper(builder); + var mainPipelineExecutor = new MainPipelineExecutor(builder, pipelineCache, messageOperations, configuration.PipelineCompletedSubscribers, receivePipeline, activityFactory, pipelineMetrics, envelopeUnwrapper); var recoverabilityPipelineExecutor = recoverabilityComponent.CreateRecoverabilityPipelineExecutor( builder, diff --git a/src/NServiceBus.Core/StartableEndpoint.cs b/src/NServiceBus.Core/StartableEndpoint.cs index 7e331f32270..36a414b47e5 100644 --- a/src/NServiceBus.Core/StartableEndpoint.cs +++ b/src/NServiceBus.Core/StartableEndpoint.cs @@ -12,6 +12,7 @@ namespace NServiceBus; class StartableEndpoint( SettingsHolder settings, FeatureComponent featureComponent, + EnvelopeComponent envelopeComponent, ReceiveComponent receiveComponent, TransportSeam transportSeam, PipelineComponent pipelineComponent, @@ -35,7 +36,7 @@ public async Task Setup(CancellationToken cancellationToken = default) var consecutiveFailuresConfig = settings.Get(); - await receiveComponent.Initialize(serviceProvider, recoverabilityComponent, messageOperations, pipelineComponent, pipelineCache, transportInfrastructure, consecutiveFailuresConfig, cancellationToken).ConfigureAwait(false); + await receiveComponent.Initialize(serviceProvider, recoverabilityComponent, envelopeComponent, messageOperations, pipelineComponent, pipelineCache, transportInfrastructure, consecutiveFailuresConfig, cancellationToken).ConfigureAwait(false); AddSendingQueueManifest(); } diff --git a/src/NServiceBus.Core/Utils/ArrayPoolBufferWriter.cs b/src/NServiceBus.Core/Utils/ArrayPoolBufferWriter.cs new file mode 100644 index 00000000000..3f83dfc16e2 --- /dev/null +++ b/src/NServiceBus.Core/Utils/ArrayPoolBufferWriter.cs @@ -0,0 +1,199 @@ +#nullable enable + +namespace NServiceBus; + +using System; +using System.Buffers; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.InteropServices; + +sealed class ArrayPoolBufferWriter( + ArrayPool pool, + int initialCapacity = ArrayPoolBufferWriter.DefaultInitialBufferSize) + : IBufferWriter, IMemoryOwner +{ + const int DefaultInitialBufferSize = 256; + + T[]? buffer = pool.Rent(initialCapacity); + + public ArrayPoolBufferWriter() + : this(ArrayPool.Shared) + { + } + + public ArrayPoolBufferWriter(int initialCapacity) + : this(ArrayPool.Shared, initialCapacity) + { + } + + public ReadOnlyMemory WrittenMemory + { + get + { + T[]? array = buffer; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return array.AsMemory(0, WrittenCount); + } + } + + public ReadOnlySpan WrittenSpan + { + get + { + T[]? array = buffer; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return array.AsSpan(0, WrittenCount); + } + } + + public int WrittenCount + { + get; + private set; + } + + public int Capacity + { + get + { + T[]? array = buffer; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return array.Length; + } + } + + public int FreeCapacity + { + get + { + T[]? array = buffer; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return array.Length - WrittenCount; + } + } + + public void Advance(int count) + { + T[]? array = buffer; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + ArgumentOutOfRangeException.ThrowIfNegative(count); + + if (WrittenCount > array.Length - count) + { + ThrowArgumentExceptionForAdvancedTooFar(); + } + + WrittenCount += count; + } + + public Memory GetMemory(int sizeHint = 0) + { + CheckBufferAndEnsureCapacity(sizeHint); + + return buffer.AsMemory(WrittenCount); + } + + public Span GetSpan(int sizeHint = 0) + { + CheckBufferAndEnsureCapacity(sizeHint); + + return buffer.AsSpan(WrittenCount); + } + + + Memory IMemoryOwner.Memory => MemoryMarshal.AsMemory(WrittenMemory); + + public void Dispose() + { + T[]? array = buffer; + + if (array is null) + { + return; + } + + buffer = null; + + pool.Return(array); + } + + public void Clear() + { + T[]? array = buffer; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + array.AsSpan(0, WrittenCount).Clear(); + + WrittenCount = 0; + } + + void CheckBufferAndEnsureCapacity(int sizeHint) + { + T[]? array = buffer; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + ArgumentOutOfRangeException.ThrowIfNegative(sizeHint); + + if (sizeHint == 0) + { + sizeHint = 1; + } + + if (sizeHint > array!.Length - WrittenCount) + { + ResizeBuffer(sizeHint); + } + } + + void ResizeBuffer(int sizeHint) + { + int minimumSize = WrittenCount + sizeHint; + + var oldArray = buffer; + buffer = pool.Rent(minimumSize); + oldArray.CopyTo(buffer); + if (oldArray is not null) + { + pool.Return(oldArray); + } + } + + [DoesNotReturn] + static void ThrowArgumentExceptionForAdvancedTooFar() => throw new ArgumentException("The buffer writer has advanced too far"); + + [DoesNotReturn] + static void ThrowObjectDisposedException() => throw new ObjectDisposedException("The current buffer has already been disposed"); +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Utils/LazyArrayPoolBufferWriter.cs b/src/NServiceBus.Core/Utils/LazyArrayPoolBufferWriter.cs new file mode 100644 index 00000000000..8cd0715741f --- /dev/null +++ b/src/NServiceBus.Core/Utils/LazyArrayPoolBufferWriter.cs @@ -0,0 +1,62 @@ +#nullable enable +namespace NServiceBus; + +using System; +using System.Buffers; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.InteropServices; + +sealed class LazyArrayPoolBufferWriter(int? capacity = null) : IBufferWriter, IMemoryOwner +{ + ArrayPoolBufferWriter? innerWriter; + + public ReadOnlyMemory WrittenMemory => innerWriter?.WrittenMemory ?? ReadOnlyMemory.Empty; + + public ReadOnlySpan WrittenSpan => innerWriter is not null ? innerWriter.WrittenSpan : []; + + public int WrittenCount => innerWriter?.WrittenCount ?? 0; + + public int Capacity + { + get + { + EnsureInnerWriter(); + return innerWriter.Capacity; + } + } + + public int FreeCapacity + { + get + { + EnsureInnerWriter(); + return innerWriter.FreeCapacity; + } + } + + public void Advance(int count) + { + EnsureInnerWriter(); + innerWriter.Advance(count); + } + + public Memory GetMemory(int sizeHint = 0) + { + EnsureInnerWriter(); + return innerWriter.GetMemory(sizeHint); + } + + public Span GetSpan(int sizeHint = 0) + { + EnsureInnerWriter(); + return innerWriter.GetSpan(sizeHint); + } + + public void Clear() => innerWriter?.Clear(); + public void Dispose() => innerWriter?.Dispose(); + + Memory IMemoryOwner.Memory => MemoryMarshal.AsMemory(WrittenMemory); + + [MemberNotNull(nameof(innerWriter))] + void EnsureInnerWriter() => innerWriter ??= capacity is null ? new ArrayPoolBufferWriter() : new ArrayPoolBufferWriter(capacity.Value); +} \ No newline at end of file