Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
007720a
Introducing API for envelope handlers
afprtclr Dec 12, 2025
795c96a
Removing CanUnwrap API
afprtclr Dec 15, 2025
f7fc349
Tweak "no envelope handler found" message
hazel-bohon Dec 15, 2025
4dad761
Fix formatting
hazel-bohon Dec 15, 2025
9a29660
Adding metrics to envelopes
afprtclr Dec 18, 2025
0c5826d
Adding logs and metric tags
afprtclr Dec 18, 2025
307622b
Removing redundant project entry
afprtclr Dec 18, 2025
d491d83
Remove comment and trailing space
mauroservienti Dec 18, 2025
ef460a4
Emitting metric on success
afprtclr Dec 19, 2025
e417ec8
Removing redundant comment
afprtclr Dec 19, 2025
ff0adef
Renamed test class
hazel-bohon Jan 7, 2026
d9c0408
Rename `SuccessfulHandler` back to `ThrowingHandler` to see if this f…
hazel-bohon Jan 7, 2026
599e2f6
Updated name again
hazel-bohon Jan 7, 2026
1ebcd6f
Small fixes
hazel-bohon Jan 7, 2026
9c5f295
Trying to fix by adding import
afprtclr Jan 8, 2026
2cbebc7
Importing explicitly
afprtclr Jan 8, 2026
8515ce5
Rebasing and fixing imports
afprtclr Jan 8, 2026
cf4937c
Honor tests conventions
mauroservienti Jan 8, 2026
e153b6b
Delete src/NServiceBus.Testing.Fakes/TestableMessageContext.cs
mauroservienti Jan 9, 2026
5aa64aa
Update to new style of acceptance tests for cloud events (#7583)
andreasohlund Jan 9, 2026
91aa40b
Use new syntax for done conditions (#7584)
andreasohlund Jan 9, 2026
0183239
Use a multiple methods to report the envelope unwrapping metric counter
mauroservienti Jan 10, 2026
ca98bad
Improve method and metric name
mauroservienti Jan 10, 2026
1d65958
Merge pull request #7587 from Particular/cloud-events-metrics-multipl…
afprtclr Jan 12, 2026
84eddf5
Switching to formatted logging
afprtclr Jan 12, 2026
143d9ed
Review suggestion for the cloud event unwrapper (#7585)
danielmarbach Jan 15, 2026
39e8cad
Bringing new test style to OTel tests
afprtclr Jan 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry.Metrics;

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.Core.OpenTelemetry;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Extensibility;
using NServiceBus.Features;
using NUnit.Framework;
using Conventions = AcceptanceTesting.Customization.Conventions;

public class When_envelope_handler_fails : OpenTelemetryAcceptanceTest
{
[Test]
public async Task Should_report_failed_unwrapping_metric()
{
using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener();

_ = await Scenario.Define<Context>()
.WithEndpoint<EndpointWithMetrics>(b => b.CustomConfig(x =>
{
x.MakeInstanceUniquelyAddressable("discriminator");
x.EnableFeature<TestEnvelopeFeature>();
})
.When(session => session.SendLocal(new OutgoingMessage())))
.Run();

metricsListener.AssertMetric("nservicebus.envelope.unwrapped", 1);

metricsListener.AssertTags("nservicebus.envelope.unwrapped",
new Dictionary<string, object>
{
["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)),
["nservicebus.discriminator"] = "discriminator",
["nservicebus.envelope.unwrapper_type"] = typeof(ThrowingHandler).FullName,
["error.type"] = typeof(InvalidOperationException).FullName
});
}

class ThrowingHandler : IEnvelopeHandler
{
public Dictionary<string, string> UnwrapEnvelope(string nativeMessageId, IDictionary<string, string> incomingHeaders, ReadOnlySpan<byte> incomingBody, ContextBag extensions, IBufferWriter<byte> bodyWriter)
=> throw new InvalidOperationException("Some exception");
}

class TestEnvelopeFeature : Feature
{
protected override void Setup(FeatureConfigurationContext context) => context.AddEnvelopeHandler<ThrowingHandler>();
}

class Context : ScenarioContext;

class EndpointWithMetrics : EndpointConfigurationBuilder
{
public EndpointWithMetrics() => EndpointSetup<DefaultServer>();

public class MessageHandler(Context testContext) : IHandleMessages<OutgoingMessage>
{
public Task Handle(OutgoingMessage message, IMessageHandlerContext context)
{
testContext.MarkAsCompleted();
return Task.CompletedTask;
}
}
}

public class OutgoingMessage : IMessage;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry.Metrics;

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.Core.OpenTelemetry;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Extensibility;
using NServiceBus.Features;
using NUnit.Framework;
using Conventions = AcceptanceTesting.Customization.Conventions;

public class When_envelope_handler_succeeds : OpenTelemetryAcceptanceTest
{
[Test]
public async Task Should_report_successful_unwrapping_metric()
{
using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener();

_ = await Scenario.Define<Context>()
.WithEndpoint<EndpointWithMetrics>(b => b.CustomConfig(x =>
{
x.MakeInstanceUniquelyAddressable("discriminator");
x.EnableFeature<TestEnvelopeFeature>();
})
.When(session => session.SendLocal(new OutgoingMessage())))
.Run();

// The metric should be explicitly emitted with a value of 0 to indicate no errors occurred
Assert.That(metricsListener.ReportedMeters["nservicebus.envelope.unwrapped"], Is.EqualTo(0));

metricsListener.AssertTags("nservicebus.envelope.unwrapped",
new Dictionary<string, object>
{
["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)),
["nservicebus.discriminator"] = "discriminator",
["nservicebus.envelope.unwrapper_type"] = typeof(SuccessfulCloudEventHandler).FullName
});
}

class SuccessfulCloudEventHandler : IEnvelopeHandler
{
public Dictionary<string, string> UnwrapEnvelope(string nativeMessageId, IDictionary<string, string> incomingHeaders, ReadOnlySpan<byte> incomingBody, ContextBag extensions, IBufferWriter<byte> bodyWriter)
{
bodyWriter.Write(incomingBody);
return incomingHeaders.ToDictionary();
}
}

class TestEnvelopeFeature : Feature
{
protected override void Setup(FeatureConfigurationContext context) => context.AddEnvelopeHandler<SuccessfulCloudEventHandler>();
}

class Context : ScenarioContext;

class EndpointWithMetrics : EndpointConfigurationBuilder
{
public EndpointWithMetrics() => EndpointSetup<DefaultServer>();

public class MessageHandler(Context testContext) : IHandleMessages<OutgoingMessage>
{
public Task Handle(OutgoingMessage message, IMessageHandlerContext context)
{
testContext.MarkAsCompleted();
return Task.CompletedTask;
}
}
}

public class OutgoingMessage : IMessage;
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ static async Task<TestingMetricListener> WhenMessagesHandled(Func<IMessage> mess
}
}
}))
.Done(c => c.TotalHandledMessages == 5)
.Run();
return metricsListener;
}
Expand Down Expand Up @@ -120,7 +119,7 @@ class MyMessageHandler(Context testContext) : IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, IMessageHandlerContext context)
{
Interlocked.Increment(ref testContext.TotalHandledMessages);
testContext.MarkAsCompleted(Interlocked.Increment(ref testContext.TotalHandledMessages) == 5);
return Task.CompletedTask;
}
}
Expand All @@ -129,7 +128,8 @@ class MyExceptionalHandler(Context testContext) : IHandleMessages<MyExceptionalM
{
public Task Handle(MyExceptionalMessage message, IMessageHandlerContext context)
{
Interlocked.Increment(ref testContext.TotalHandledMessages);
var count = Interlocked.Increment(ref testContext.TotalHandledMessages);
testContext.MarkAsCompleted(count == 5);
throw new Exception();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public async Task Should_record_handling_time()
await session.SendLocal(new MyMessage());
}
}))
.Done(c => c.TotalHandledMessages == 5)
.Run();

string handlingTime = "nservicebus.messaging.handler_time";
Expand All @@ -52,7 +51,8 @@ class MyMessageHandler(Context testContext) : IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, IMessageHandlerContext context)
{
Interlocked.Increment(ref testContext.TotalHandledMessages);
var count = Interlocked.Increment(ref testContext.TotalHandledMessages);
testContext.MarkAsCompleted(count == 5);
return Task.CompletedTask;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,14 @@ namespace NServiceBus
{
public static NServiceBus.IStartableEndpointWithExternallyManagedContainer Create(NServiceBus.EndpointConfiguration configuration, Microsoft.Extensions.DependencyInjection.IServiceCollection serviceCollection) { }
}
public static class EnvelopeConfigExtensions
{
extension(NServiceBus.Features.FeatureConfigurationContext context)
{
public void AddEnvelopeHandler<THandler>()
where THandler : class, NServiceBus.IEnvelopeHandler { }
}
}
public static class ErrorQueueSettings
{
public const string SettingsKey = "errorQueue";
Expand Down Expand Up @@ -456,6 +464,10 @@ namespace NServiceBus
{
System.Threading.Tasks.Task Stop(System.Threading.CancellationToken cancellationToken = default);
}
public interface IEnvelopeHandler
{
System.Collections.Generic.Dictionary<string, string>? UnwrapEnvelope(string nativeMessageId, System.Collections.Generic.IDictionary<string, string> incomingHeaders, System.ReadOnlySpan<byte> incomingBody, NServiceBus.Extensibility.ContextBag extensions, System.Buffers.IBufferWriter<byte> bodyWriter);
}
public interface IHandleMessages { }
public interface IHandleMessages<T> : NServiceBus.IHandleMessages
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
"error.type",
"execution.result",
"nservicebus.discriminator",
"nservicebus.envelope.unwrapper_type",
"nservicebus.message_handler_type",
"nservicebus.message_handler_types",
"nservicebus.message_type",
"nservicebus.queue"
],
"Metrics": [
"nservicebus.envelope.unwrapped => Counter",
"nservicebus.messaging.critical_time => Histogram, Unit: s",
"nservicebus.messaging.failures => Counter",
"nservicebus.messaging.fetches => Counter",
Expand Down
Loading