Skip to content

Commit 9f8ed47

Browse files
feat: Add Spectra Collector exporter integration (#197)
* docs: Add design for Spectra Collector exporter integration Design document and brainstorm outputs for adding SpectraExporterOptions to observability-core, enabling Weave/Copilot Cowork to export traces via OTLP to Spectra Collector sidecars instead of the A365 API. Key decisions: - New SpectraExporterOptions class with K8s sidecar defaults - Union type dispatch on configure() exporter_options parameter - Move suppress_invoke_agent_input to enrichment layer for exporter-agnostic suppression - Protocol validation, module-level gRPC import, export surface symmetry Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: Add SpectraExporterOptions class for Spectra Collector sidecar export Introduces SpectraExporterOptions with fields for endpoint, protocol (grpc/http), insecure flag, and batch settings. Defaults are tuned for K8s sidecar topology (localhost:4317, gRPC, insecure). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: Add excluded_attribute_keys to EnrichedReadableSpan Extends EnrichedReadableSpan with an optional excluded_attribute_keys parameter that removes specified keys after merging extras. Existing callers are unaffected (defaults to empty set). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor: Move suppress_invoke_agent_input from exporter to batch processor Moves the InvokeAgent input message suppression from _Agent365Exporter._map_span() to _EnrichingBatchSpanProcessor.on_end() so it works with any exporter (A365, Spectra, or OTLP bolt-on). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: Add Spectra exporter dispatch to configure() Adds isinstance-based dispatch in _configure_internal() to create GrpcOTLPSpanExporter or OTLPSpanExporter when SpectraExporterOptions is provided. Updates exporter_options type to union on all three configure() signatures. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: Export SpectraExporterOptions and Agent365ExporterOptions from public API Adds both exporter options classes to the core package's public API for consistent import ergonomics. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test: Add Spectra exporter tests and update A365 regression test Adds test_spectra_exporter.py with 13 tests covering: - SpectraExporterOptions defaults and validation - configure() with Spectra options (gRPC, HTTP, custom endpoint) - A365 env var ignored when Spectra options provided - Batch settings extraction, OTLP bolt-on interaction - suppress_invoke_agent_input in batch processor - EnrichedReadableSpan excluded_attribute_keys Updates test_agent365.py to reflect suppress_invoke_agent_input being moved from _Agent365Exporter to _EnrichingBatchSpanProcessor. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Update prompt suppression tests for processor-based suppression Updates test_prompt_suppression.py to test suppress_invoke_agent_input on _EnrichingBatchSpanProcessor instead of _Agent365Exporter, reflecting the refactor that moved suppression to the processor layer. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Address PR review feedback from Copilot - Make default endpoint protocol-aware: 4317 for gRPC, 4318 for HTTP - Add suppress_invoke_agent_input as explicit param on public configure() - Fix insecure default from False to True in brainstorm docs (BRAINSTORM.md, TLDR.md, ARCHITECTURE.md) to match implementation - Add tests for HTTP default endpoint and explicit endpoint override Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Update remaining insecure=False reference in brainstorm changelog Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * chore: Remove brainstorm and design documents for Spectra exporter Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 51801b3 commit 9f8ed47

10 files changed

Lines changed: 456 additions & 50 deletions

File tree

libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
)
1313
from .execute_tool_scope import ExecuteToolScope
1414
from .execution_type import ExecutionType
15+
from .exporters.agent365_exporter_options import Agent365ExporterOptions
1516
from .exporters.enriched_span import EnrichedReadableSpan
1617
from .exporters.enriching_span_processor import (
1718
get_span_enricher,
1819
register_span_enricher,
1920
unregister_span_enricher,
2021
)
22+
from .exporters.spectra_exporter_options import SpectraExporterOptions
2123
from .inference_call_details import InferenceCallDetails, ServiceEndpoint
2224
from .inference_operation_type import InferenceOperationType
2325
from .inference_scope import InferenceScope
@@ -38,6 +40,9 @@
3840
"is_configured",
3941
"get_tracer",
4042
"get_tracer_provider",
43+
# Exporter options
44+
"Agent365ExporterOptions",
45+
"SpectraExporterOptions",
4146
# Span enrichment
4247
"register_span_enricher",
4348
"unregister_span_enricher",

libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/config.py

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
from typing import Any, Optional
99

1010
from opentelemetry import trace
11+
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
12+
OTLPSpanExporter as GrpcOTLPSpanExporter,
13+
)
1114
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
1215
from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_NAMESPACE, Resource
1316
from opentelemetry.sdk.trace import TracerProvider
@@ -18,6 +21,7 @@
1821
from .exporters.enriching_span_processor import (
1922
_EnrichingBatchSpanProcessor,
2023
)
24+
from .exporters.spectra_exporter_options import SpectraExporterOptions
2125
from .exporters.utils import is_agent365_exporter_enabled
2226
from .trace_processor.span_processor import SpanProcessor
2327

@@ -58,7 +62,7 @@ def configure(
5862
logger_name: str = DEFAULT_LOGGER_NAME,
5963
token_resolver: Callable[[str, str], str | None] | None = None,
6064
cluster_category: str = "prod",
61-
exporter_options: Optional[Agent365ExporterOptions] = None,
65+
exporter_options: Agent365ExporterOptions | SpectraExporterOptions | None = None,
6266
suppress_invoke_agent_input: bool = False,
6367
**kwargs: Any,
6468
) -> bool:
@@ -72,9 +76,10 @@ def configure(
7276
Use exporter_options instead.
7377
:param cluster_category: (Deprecated) Environment / cluster category (e.g. "prod").
7478
Use exporter_options instead.
75-
:param exporter_options: Agent365ExporterOptions instance for configuring the exporter.
76-
If provided, exporter_options takes precedence. If exporter_options is None, the token_resolver and cluster_category parameters are used as fallback/legacy support to construct a default Agent365ExporterOptions instance.
77-
:param suppress_invoke_agent_input: If True, suppress input messages for spans that are children of InvokeAgent spans.
79+
:param exporter_options: Exporter configuration. Pass Agent365ExporterOptions for A365 API
80+
export, SpectraExporterOptions for Spectra Collector sidecar export, or None (default)
81+
to construct Agent365ExporterOptions from legacy parameters.
82+
:param suppress_invoke_agent_input: If True, suppress input messages for InvokeAgent spans.
7883
:return: True if configuration succeeded, False otherwise.
7984
"""
8085
try:
@@ -100,7 +105,7 @@ def _configure_internal(
100105
logger_name: str,
101106
token_resolver: Callable[[str, str], str | None] | None = None,
102107
cluster_category: str = "prod",
103-
exporter_options: Optional[Agent365ExporterOptions] = None,
108+
exporter_options: Agent365ExporterOptions | SpectraExporterOptions | None = None,
104109
suppress_invoke_agent_input: bool = False,
105110
**kwargs: Any,
106111
) -> bool:
@@ -156,25 +161,43 @@ def _configure_internal(
156161
"max_export_batch_size": exporter_options.max_export_batch_size,
157162
}
158163

159-
if is_agent365_exporter_enabled() and exporter_options.token_resolver is not None:
164+
# Type-based exporter dispatch
165+
if isinstance(exporter_options, SpectraExporterOptions):
166+
# Spectra path — OTLP exporter to sidecar
167+
# ENABLE_A365_OBSERVABILITY_EXPORTER is intentionally ignored.
168+
if exporter_options.protocol == "grpc":
169+
exporter = GrpcOTLPSpanExporter(
170+
endpoint=exporter_options.endpoint,
171+
insecure=exporter_options.insecure,
172+
)
173+
else:
174+
exporter = OTLPSpanExporter(
175+
endpoint=exporter_options.endpoint,
176+
)
177+
178+
elif is_agent365_exporter_enabled() and exporter_options.token_resolver is not None:
160179
exporter = _Agent365Exporter(
161180
token_resolver=exporter_options.token_resolver,
162181
cluster_category=exporter_options.cluster_category,
163182
use_s2s_endpoint=exporter_options.use_s2s_endpoint,
164-
suppress_invoke_agent_input=suppress_invoke_agent_input,
165183
)
166184

167185
else:
168186
exporter = ConsoleSpanExporter()
169187
self._logger.warning(
170-
"is_agent365_exporter_enabled() not enabled or token_resolver not set. Falling back to console exporter."
188+
"is_agent365_exporter_enabled() not enabled or token_resolver not set."
189+
" Falling back to console exporter."
171190
)
172191

173192
# Add span processors
174193

175194
# Create _EnrichingBatchSpanProcessor with optimized settings
176195
# This allows extensions to enrich spans before export
177-
batch_processor = _EnrichingBatchSpanProcessor(exporter, **batch_processor_kwargs)
196+
batch_processor = _EnrichingBatchSpanProcessor(
197+
exporter,
198+
suppress_invoke_agent_input=suppress_invoke_agent_input,
199+
**batch_processor_kwargs,
200+
)
178201
agent_processor = SpanProcessor()
179202

180203
tracer_provider.add_span_processor(batch_processor)
@@ -248,7 +271,8 @@ def configure(
248271
logger_name: str = DEFAULT_LOGGER_NAME,
249272
token_resolver: Callable[[str, str], str | None] | None = None,
250273
cluster_category: str = "prod",
251-
exporter_options: Optional[Agent365ExporterOptions] = None,
274+
exporter_options: Agent365ExporterOptions | SpectraExporterOptions | None = None,
275+
suppress_invoke_agent_input: bool = False,
252276
**kwargs: Any,
253277
) -> bool:
254278
"""
@@ -261,8 +285,10 @@ def configure(
261285
Use exporter_options instead.
262286
:param cluster_category: (Deprecated) Environment / cluster category (e.g. "prod").
263287
Use exporter_options instead.
264-
:param exporter_options: Agent365ExporterOptions instance for configuring the exporter.
265-
If provided, exporter_options takes precedence. If exporter_options is None, the token_resolver and cluster_category parameters are used as fallback/legacy support to construct a default Agent365ExporterOptions instance.
288+
:param exporter_options: Exporter configuration. Pass Agent365ExporterOptions for A365 API
289+
export, SpectraExporterOptions for Spectra Collector sidecar export, or None (default)
290+
to construct Agent365ExporterOptions from legacy parameters.
291+
:param suppress_invoke_agent_input: If True, suppress input messages for InvokeAgent spans.
266292
:return: True if configuration succeeded, False otherwise.
267293
"""
268294
return _telemetry_manager.configure(
@@ -272,6 +298,7 @@ def configure(
272298
token_resolver,
273299
cluster_category,
274300
exporter_options,
301+
suppress_invoke_agent_input,
275302
**kwargs,
276303
)
277304

libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
# Licensed under the MIT License.
33

44
from .agent365_exporter_options import Agent365ExporterOptions
5+
from .spectra_exporter_options import SpectraExporterOptions
56

67
# Agent365Exporter is not exported intentionally.
78
# It should only be used internally by the observability core module.
8-
__all__ = ["Agent365ExporterOptions"]
9+
__all__ = ["Agent365ExporterOptions", "SpectraExporterOptions"]

libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,6 @@
1717
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
1818
from opentelemetry.trace import StatusCode
1919

20-
from ..constants import (
21-
GEN_AI_INPUT_MESSAGES_KEY,
22-
GEN_AI_OPERATION_NAME_KEY,
23-
INVOKE_AGENT_OPERATION_NAME,
24-
)
2520
from .utils import (
2621
build_export_url,
2722
get_validated_domain_override,
@@ -59,7 +54,6 @@ def __init__(
5954
token_resolver: Callable[[str, str], str | None],
6055
cluster_category: str = "prod",
6156
use_s2s_endpoint: bool = False,
62-
suppress_invoke_agent_input: bool = False,
6357
):
6458
if token_resolver is None:
6559
raise ValueError("token_resolver must be provided.")
@@ -69,7 +63,6 @@ def __init__(
6963
self._token_resolver = token_resolver
7064
self._cluster_category = cluster_category
7165
self._use_s2s_endpoint = use_s2s_endpoint
72-
self._suppress_invoke_agent_input = suppress_invoke_agent_input
7366
# Read domain override once at initialization
7467
self._domain_override = get_validated_domain_override()
7568

@@ -270,19 +263,6 @@ def _map_span(self, sp: ReadableSpan) -> dict[str, Any]:
270263
# attributes
271264
attrs = dict(sp.attributes or {})
272265

273-
# Suppress input messages if configured and current span is an InvokeAgent span
274-
if self._suppress_invoke_agent_input:
275-
# Check if current span is an InvokeAgent span by:
276-
# 1. Span name starts with "invoke_agent"
277-
# 2. Has attribute gen_ai.operation.name set to INVOKE_AGENT_OPERATION_NAME
278-
operation_name = attrs.get(GEN_AI_OPERATION_NAME_KEY)
279-
if (
280-
sp.name.startswith(INVOKE_AGENT_OPERATION_NAME)
281-
and operation_name == INVOKE_AGENT_OPERATION_NAME
282-
):
283-
# Remove input messages attribute
284-
attrs.pop(GEN_AI_INPUT_MESSAGES_KEY, None)
285-
286266
# events
287267
events = []
288268
for ev in sp.events:

libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/enriched_span.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,31 @@ class EnrichedReadableSpan(ReadableSpan):
1919
the original span.
2020
"""
2121

22-
def __init__(self, span: ReadableSpan, extra_attributes: dict):
22+
def __init__(
23+
self,
24+
span: ReadableSpan,
25+
extra_attributes: dict,
26+
excluded_attribute_keys: set[str] | None = None,
27+
):
2328
"""
2429
Initialize the enriched span wrapper.
2530
2631
Args:
2732
span: The original ReadableSpan to wrap.
2833
extra_attributes: Additional attributes to merge with the original.
34+
excluded_attribute_keys: Attribute keys to remove after merging.
2935
"""
3036
self._span = span
3137
self._extra_attributes = extra_attributes
38+
self._excluded_attribute_keys = excluded_attribute_keys or set()
3239

3340
@property
3441
def attributes(self) -> types.Attributes:
3542
"""Return merged attributes from original span and extra attributes."""
3643
original = dict(self._span.attributes or {})
3744
original.update(self._extra_attributes)
45+
for key in self._excluded_attribute_keys:
46+
original.pop(key, None)
3847
return original
3948

4049
@property

libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/enriching_span_processor.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,13 @@
1010
from opentelemetry.sdk.trace import ReadableSpan
1111
from opentelemetry.sdk.trace.export import BatchSpanProcessor
1212

13+
from ..constants import (
14+
GEN_AI_INPUT_MESSAGES_KEY,
15+
GEN_AI_OPERATION_NAME_KEY,
16+
INVOKE_AGENT_OPERATION_NAME,
17+
)
18+
from .enriched_span import EnrichedReadableSpan
19+
1320
logger = logging.getLogger(__name__)
1421

1522
# Single span enricher - only one platform instrumentor should be active at a time
@@ -65,6 +72,15 @@ def get_span_enricher() -> Callable[[ReadableSpan], ReadableSpan] | None:
6572
class _EnrichingBatchSpanProcessor(BatchSpanProcessor):
6673
"""BatchSpanProcessor that applies the registered enricher before batching."""
6774

75+
def __init__(
76+
self,
77+
*args: object,
78+
suppress_invoke_agent_input: bool = False,
79+
**kwargs: object,
80+
):
81+
super().__init__(*args, **kwargs)
82+
self._suppress_invoke_agent_input = suppress_invoke_agent_input
83+
6884
def on_end(self, span: ReadableSpan) -> None:
6985
"""Apply the span enricher and pass to parent for batching.
7086
@@ -83,4 +99,18 @@ def on_end(self, span: ReadableSpan) -> None:
8399
enricher.__name__,
84100
)
85101

102+
# Apply input message suppression for InvokeAgent spans
103+
if self._suppress_invoke_agent_input:
104+
attrs = enriched_span.attributes or {}
105+
operation_name = attrs.get(GEN_AI_OPERATION_NAME_KEY)
106+
if (
107+
enriched_span.name.startswith(INVOKE_AGENT_OPERATION_NAME)
108+
and operation_name == INVOKE_AGENT_OPERATION_NAME
109+
):
110+
enriched_span = EnrichedReadableSpan(
111+
enriched_span,
112+
extra_attributes={},
113+
excluded_attribute_keys={GEN_AI_INPUT_MESSAGES_KEY},
114+
)
115+
86116
super().on_end(enriched_span)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT License.
3+
4+
from typing import Literal
5+
6+
7+
class SpectraExporterOptions:
8+
"""
9+
Configuration for exporting traces to a Spectra Collector sidecar via OTLP.
10+
11+
Spectra Collector is deployed as a Kubernetes sidecar that accepts
12+
standard OTLP telemetry on localhost. Defaults are tuned for this
13+
deployment topology — most consumers should not need to override them.
14+
15+
Note: Batch processor fields (max_queue_size, scheduled_delay_ms, etc.)
16+
are duplicated from Agent365ExporterOptions intentionally — these two
17+
options classes have no shared base class per design decision C4.
18+
"""
19+
20+
_DEFAULT_GRPC_ENDPOINT = "http://localhost:4317"
21+
_DEFAULT_HTTP_ENDPOINT = "http://localhost:4318"
22+
23+
def __init__(
24+
self,
25+
endpoint: str | None = None,
26+
protocol: Literal["grpc", "http"] = "grpc",
27+
insecure: bool = True,
28+
max_queue_size: int = 2048,
29+
scheduled_delay_ms: int = 5000,
30+
exporter_timeout_ms: int = 30000,
31+
max_export_batch_size: int = 512,
32+
):
33+
"""
34+
Args:
35+
endpoint: Spectra sidecar OTLP endpoint. Defaults to
36+
http://localhost:4317 for gRPC or http://localhost:4318 for HTTP.
37+
protocol: OTLP protocol — "grpc" or "http". Default: grpc.
38+
insecure: Use insecure (no TLS) connection. Default: True (localhost sidecar).
39+
max_queue_size: Batch processor queue size. Default: 2048.
40+
scheduled_delay_ms: Export interval in milliseconds. Default: 5000.
41+
exporter_timeout_ms: Export timeout in milliseconds. Default: 30000.
42+
max_export_batch_size: Max spans per export batch. Default: 512.
43+
"""
44+
if protocol not in ("grpc", "http"):
45+
raise ValueError(f"protocol must be 'grpc' or 'http', got '{protocol}'")
46+
if endpoint is None:
47+
endpoint = (
48+
self._DEFAULT_GRPC_ENDPOINT if protocol == "grpc" else self._DEFAULT_HTTP_ENDPOINT
49+
)
50+
self.endpoint = endpoint
51+
self.protocol = protocol
52+
self.insecure = insecure
53+
self.max_queue_size = max_queue_size
54+
self.scheduled_delay_ms = scheduled_delay_ms
55+
self.exporter_timeout_ms = exporter_timeout_ms
56+
self.max_export_batch_size = max_export_batch_size

tests/observability/core/test_agent365.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,11 @@ def test_batch_span_processor_and_exporter_called_with_correct_values(
115115
self.assertTrue(result, "configure() should return True")
116116

117117
# Verify Agent365Exporter was called with correct parameters
118+
# (suppress_invoke_agent_input is now handled by _EnrichingBatchSpanProcessor)
118119
mock_exporter.assert_called_once_with(
119120
token_resolver=self.mock_token_resolver,
120121
cluster_category="staging",
121122
use_s2s_endpoint=True,
122-
suppress_invoke_agent_input=False,
123123
)
124124

125125
# Verify BatchSpanProcessor was called with correct parameters from exporter_options

0 commit comments

Comments
 (0)