Commit 3d56187

fix: address adversarial review findings in manual instrumentation guide

- Add Agent365.Observability.OtelWrite auth scope requirement
- Add agent-ID-must-match-token constraint documentation
- Add output_messages operation type and span section
- Add server.port and gen_ai.output.messages to attribute tables
- Fix max payload: document 1MB server limit (900KB SDK buffer)
- Add payload chunking helper and span truncation guidance
- Fix token resolver signature to str | None, handle None case
- Add links mapping to DIY exporter (was hardcoded None)
- Add _chunk_by_size method to exporter for large batches

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent c82a5c8 commit 3d56187

1 file changed

Lines changed: 131 additions & 24 deletions

docs/manual-a365-span-instrumentation.md

@@ -42,6 +42,7 @@ Only spans with one of these values pass the backend's ingest filter:
 | `TextCompletion` | Inference (text completion) |
 | `GenerateContent` | Inference (content generation) |
 | `execute_tool` | Tool execution |
+| `output_messages` | Output message recording (agent response to user) |
 
 ### `invoke_agent` span
 
@@ -69,6 +70,8 @@ The top-level span representing one user turn / agent invocation.
 | Optional | `microsoft.channel.name` | Channel (e.g. `"Teams"`, `"Webchat"`) | |
 | Optional | `microsoft.channel.link` | Channel URL | |
 | Optional | `gen_ai.input.messages` | JSON-serialized input messages | Can be large; may be truncated |
+| Optional | `gen_ai.output.messages` | JSON-serialized output messages | Agent's response; may be truncated |
+| Optional | `server.port` | Server port number | Omit if 443 |
 | Optional | `microsoft.a365.caller.agent.name` | Calling agent name | For agent-to-agent calls |
 | Optional | `microsoft.a365.caller.agent.id` | Calling agent GUID | For agent-to-agent calls |
 | Optional | `microsoft.a365.caller.agent.blueprint.id` | Calling agent blueprint | For agent-to-agent calls |
@@ -111,6 +114,20 @@ Child of `invoke_agent`. One per tool invocation.
 | Optional | `gen_ai.tool.type` | `"function"` | |
 | Optional | `gen_ai.tool.description` | Tool description | |
 | Optional | `server.address` | Server hostname | |
+| Optional | `server.port` | Server port number | Omit if 443 |
+
+### `output_messages` span
+
+Child of `invoke_agent`. Records the agent's final response to the user.
+
+| Tier | Attribute | Expected value | Notes |
+|------|-----------|----------------|-------|
+| **Required** | `gen_ai.operation.name` | `"output_messages"` | Must match exactly |
+| **Required** | `microsoft.tenant.id` | Tenant GUID | Same as parent |
+| **Required** | `gen_ai.agent.id` | Agent GUID | Same as parent |
+| Recommended | `gen_ai.output.messages` | JSON-serialized output messages | The agent's response |
+| Recommended | `gen_ai.conversation.id` | Conversation identifier | |
+| Optional | `gen_ai.agent.name` | Agent name | Same as parent |
 
 ### Resource attributes
 
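Reviewer sketch (not part of the commit): the `output_messages` attribute table added in the hunk above could be assembled roughly as follows. The helper name `build_output_messages_attributes` and the shape of the `messages` list are assumptions made for illustration; only the attribute keys come from the table.

```python
from __future__ import annotations

import json


def build_output_messages_attributes(
    tenant_id: str,
    agent_id: str,
    messages: list[dict],
    conversation_id: str | None = None,
    agent_name: str | None = None,
) -> dict[str, str]:
    """Assemble attributes for an output_messages span (hypothetical helper)."""
    attrs = {
        # Required tier: the operation name must match exactly.
        "gen_ai.operation.name": "output_messages",
        "microsoft.tenant.id": tenant_id,
        "gen_ai.agent.id": agent_id,
        # Recommended tier: the response is stored as a JSON string.
        "gen_ai.output.messages": json.dumps(messages, ensure_ascii=False),
    }
    if conversation_id is not None:
        attrs["gen_ai.conversation.id"] = conversation_id
    if agent_name is not None:
        attrs["gen_ai.agent.name"] = agent_name
    return attrs
```

The resulting dict could then be applied to a span created as a child of `invoke_agent`, with the tenant and agent IDs copied from the parent.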
@@ -308,15 +325,19 @@ Authorization: Bearer <token>
 Content-Type: application/json
 ```
 
+The token must be issued for an app registration that has the **`Agent365.Observability.OtelWrite`** application role (scope). Without this role, the backend returns `403 Forbidden`.
+
+> **Important:** The `gen_ai.agent.id` value in your span attributes **must match** the application identity in the Bearer token. The backend validates that the agent ID in the payload corresponds to the authenticated app. Mismatches result in `403 Forbidden`.
+
 The token is obtained from a **token resolver** — a function with signature:
 
 ```python
-def resolve_token(agent_id: str, tenant_id: str) -> str:
-    """Return a valid Bearer token for the given agent and tenant."""
+def resolve_token(agent_id: str, tenant_id: str) -> str | None:
+    """Return a valid Bearer token for the given agent and tenant, or None if unavailable."""
     ...
 ```
 
-How you implement this depends on your environment (MSAL client credentials, managed identity, etc.). The A365 SDK uses this same interface internally.
+If the token resolver returns `None`, the exporter should skip that batch and log a warning. How you implement this depends on your environment (MSAL client credentials, managed identity, etc.). The A365 SDK uses this same interface internally.
 
 ### Payload format
 
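Reviewer sketch (not part of the commit): one way to apply the "skip the batch and log a warning" rule from the hunk above is a small wrapper around the resolver. The name `resolve_or_skip` is hypothetical, and treating an empty string the same as `None` is an assumption the doc does not state.

```python
from __future__ import annotations

import logging
from typing import Callable, Optional

logger = logging.getLogger(__name__)

# Same shape as the documented resolver: (agent_id, tenant_id) -> token or None.
TokenResolver = Callable[[str, str], Optional[str]]


def resolve_or_skip(resolver: TokenResolver, agent_id: str, tenant_id: str) -> str | None:
    """Return a token, or None when the caller should skip this batch (hypothetical helper)."""
    try:
        token = resolver(agent_id, tenant_id)
    except Exception:
        # A resolver that raises is handled like one that has no token.
        logger.exception("Token resolver raised for agent=%s, tenant=%s", agent_id, tenant_id)
        return None
    if not token:  # assumption: an empty string is as unusable as None
        logger.warning("No token for agent=%s, tenant=%s; skipping batch", agent_id, tenant_id)
        return None
    return token
```

Folding both failure modes into a single `None` result keeps the export loop to one check per group.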
@@ -396,12 +417,43 @@ The body is JSON with this structure:
 
 | Constraint | Value | Behavior |
 |------------|-------|----------|
-| Max payload size | ~900,000 bytes | Split spans across multiple POST requests |
-| Max individual span | 250,000 bytes | Largest attributes are replaced with `"TRUNCATED"` |
+| Max payload size (server limit) | 1,000,000 bytes | Requests exceeding 1 MB are rejected |
+| Recommended max payload | ~900,000 bytes | Use as conservative buffer below the 1 MB limit |
+| Max individual span | 250,000 bytes | Truncate largest attributes (see below) |
 | Retry on | 408, 429, 5xx | Exponential backoff; respect `Retry-After` header for 429 |
 | Fail on | Other 4xx | Non-retryable; check auth and payload format |
 | Timeout | 30 seconds | Per-request HTTP timeout |
 
+#### Payload chunking
+
+If a serialized batch exceeds ~900,000 bytes, split it into multiple POST requests. Each request must still respect the grouping requirement (same tenant + agent). A simple approach:
+
+```python
+def chunk_spans(spans: list[dict], max_bytes: int = 900_000) -> list[list[dict]]:
+    """Split serialized spans into chunks that fit within the payload limit."""
+    chunks = []
+    current_chunk = []
+    current_size = 0
+    overhead = 200  # approximate envelope overhead
+
+    for span in spans:
+        span_size = len(json.dumps(span, separators=(",", ":"), ensure_ascii=False).encode())
+        if current_chunk and current_size + span_size + overhead > max_bytes:
+            chunks.append(current_chunk)
+            current_chunk = []
+            current_size = 0
+        current_chunk.append(span)
+        current_size += span_size
+
+    if current_chunk:
+        chunks.append(current_chunk)
+    return chunks
+```
+
+#### Span truncation
+
+If a single span exceeds 250,000 bytes (typically due to large `gen_ai.input.messages` or `gen_ai.output.messages`), truncate the largest attribute values by replacing them with `"TRUNCATED"`. Prioritize keeping structural attributes intact and truncating message content first.
+
 ### Grouping requirement
 
 All spans in a single POST must share the same `microsoft.tenant.id` and `gen_ai.agent.id`. If your batch contains spans for multiple tenants or agents, partition them into separate requests.
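Reviewer sketch (not part of the commit): the partitioning step described above might look like the following for already-mapped span dicts. The name `partition_spans` is hypothetical, and dropping spans that lack either ID is an assumption. The exporter's own grouping code is outside this diff and may behave differently.

```python
from __future__ import annotations

from collections import defaultdict


def partition_spans(spans: list[dict]) -> dict[tuple[str, str], list[dict]]:
    """Group span dicts by (tenant ID, agent ID); spans missing either ID are dropped."""
    groups: dict[tuple[str, str], list[dict]] = defaultdict(list)
    for span in spans:
        attrs = span.get("attributes") or {}
        tenant_id = attrs.get("microsoft.tenant.id")
        agent_id = attrs.get("gen_ai.agent.id")
        if tenant_id and agent_id:
            groups[(tenant_id, agent_id)].append(span)
    return dict(groups)
```

Each resulting group maps to one endpoint URL and one token, and can then be chunked by size independently.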
@@ -425,10 +477,12 @@ logger = logging.getLogger(__name__)
 
 # Accepted operation names — spans with other values are filtered out
 ACCEPTED_OPERATIONS = frozenset({
-    "invoke_agent", "execute_tool", "chat", "Chat", "TextCompletion", "GenerateContent",
+    "invoke_agent", "execute_tool", "output_messages",
+    "chat", "Chat", "TextCompletion", "GenerateContent",
 })
 
 A365_ENDPOINT = "https://agent365.svc.cloud.microsoft"
+MAX_PAYLOAD_BYTES = 900_000
 MAX_RETRIES = 3
 HTTP_TIMEOUT = 30.0
 
@@ -439,7 +493,7 @@ class Agent365ManualExporter(SpanExporter):
     def __init__(self, token_resolver):
         """
         Args:
-            token_resolver: Callable(agent_id, tenant_id) -> bearer_token string.
+            token_resolver: Callable(agent_id, tenant_id) -> bearer_token string or None.
         """
         self._token_resolver = token_resolver
         self._session = requests.Session()
@@ -456,8 +510,6 @@ class Agent365ManualExporter(SpanExporter):
                 f"{A365_ENDPOINT}/observability/tenants/{tenant_id}"
                 f"/otlp/agents/{agent_id}/traces?api-version=1"
             )
-            payload = self._build_payload(group_spans)
-            body = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
 
             # Resolve auth token
             try:
@@ -467,13 +519,28 @@ class Agent365ManualExporter(SpanExporter):
                 any_failure = True
                 continue
 
+            if token is None:
+                logger.warning(
+                    f"Token resolver returned None for agent={agent_id}, "
+                    f"tenant={tenant_id}; skipping batch"
+                )
+                any_failure = True
+                continue
+
             headers = {
                 "content-type": "application/json",
                 "authorization": f"Bearer {token}",
             }
 
-            if not self._post_with_retries(url, body, headers):
-                any_failure = True
+            # Build payload and chunk if necessary
+            mapped_spans = [self._map_span(sp) for sp in group_spans]
+            chunks = self._chunk_by_size(mapped_spans)
+
+            for chunk in chunks:
+                payload = self._build_payload_from_mapped(group_spans[0], chunk)
+                body = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
+                if not self._post_with_retries(url, body, headers):
+                    any_failure = True
 
         return SpanExportResult.FAILURE if any_failure else SpanExportResult.SUCCESS
 
@@ -499,20 +566,22 @@ class Agent365ManualExporter(SpanExporter):
 
     def _build_payload(self, spans: Sequence[ReadableSpan]) -> dict:
         """Build the OTLP-like JSON envelope."""
-        # Get resource attributes from the first span
+        mapped = [self._map_span(sp) for sp in spans]
+        return self._build_payload_from_mapped(spans[0], mapped)
+
+    def _build_payload_from_mapped(
+        self, reference_span: ReadableSpan, mapped_spans: list[dict]
+    ) -> dict:
+        """Build the OTLP-like JSON envelope from pre-mapped span dicts."""
         resource_attrs = {}
-        if spans and spans[0].resource:
-            resource_attrs = dict(spans[0].resource.attributes)
+        if reference_span.resource:
+            resource_attrs = dict(reference_span.resource.attributes)
 
         # Group spans by instrumentation scope
         scope_map: dict[tuple[str, str | None], list[dict]] = {}
-        for sp in spans:
-            scope = sp.instrumentation_scope
-            scope_name = scope.name if scope else "unknown"
-            scope_version = scope.version if scope else None
-            scope_map.setdefault((scope_name, scope_version), []).append(
-                self._map_span(sp)
-            )
+        for sp_dict in mapped_spans:
+            # Use a default scope since mapped dicts don't carry scope info
+            scope_map.setdefault(("manual", None), []).append(sp_dict)
 
         scope_spans = [
             {"scope": {"name": name, "version": version}, "spans": mapped}
@@ -528,6 +597,31 @@ class Agent365ManualExporter(SpanExporter):
             ]
         }
 
+    @staticmethod
+    def _chunk_by_size(
+        mapped_spans: list[dict], max_bytes: int = MAX_PAYLOAD_BYTES
+    ) -> list[list[dict]]:
+        """Split mapped spans into chunks that fit within the payload limit."""
+        chunks: list[list[dict]] = []
+        current_chunk: list[dict] = []
+        current_size = 0
+        overhead = 200  # approximate envelope overhead
+
+        for span in mapped_spans:
+            span_size = len(
+                json.dumps(span, separators=(",", ":"), ensure_ascii=False).encode()
+            )
+            if current_chunk and current_size + span_size + overhead > max_bytes:
+                chunks.append(current_chunk)
+                current_chunk = []
+                current_size = 0
+            current_chunk.append(span)
+            current_size += span_size
+
+        if current_chunk:
+            chunks.append(current_chunk)
+        return chunks if chunks else [[]]
+
     @staticmethod
     def _map_span(sp: ReadableSpan) -> dict:
         """Convert a ReadableSpan to the A365 JSON format."""
@@ -552,6 +646,18 @@ class Agent365ManualExporter(SpanExporter):
                 for ev in sp.events
             ]
 
+        # Map links
+        links = None
+        if sp.links:
+            links = [
+                {
+                    "traceId": f"{link.context.trace_id:032x}",
+                    "spanId": f"{link.context.span_id:016x}",
+                    "attributes": dict(link.attributes) if link.attributes else None,
+                }
+                for link in sp.links
+            ]
+
         # Map status
         status_code = sp.status.status_code if sp.status else StatusCode.UNSET
         status = {
@@ -569,7 +675,7 @@ class Agent365ManualExporter(SpanExporter):
             "endTimeUnixNano": sp.end_time,
             "attributes": attrs or None,
             "events": events,
-            "links": None,
+            "links": links,
             "status": status,
         }
 
@@ -607,8 +713,9 @@ class Agent365ManualExporter(SpanExporter):
 ```python
 from opentelemetry.sdk.trace.export import BatchSpanProcessor
 
-def my_token_resolver(agent_id: str, tenant_id: str) -> str:
+def my_token_resolver(agent_id: str, tenant_id: str) -> str | None:
     # Your token acquisition logic here (MSAL, managed identity, etc.)
+    # Return None if token cannot be acquired
     return "your-bearer-token"
 
 exporter = Agent365ManualExporter(token_resolver=my_token_resolver)
@@ -648,7 +755,7 @@ PROVIDER_NAME = "azure"
 SERVER_ADDRESS = "my-resource.openai.azure.com"
 
 
-def my_token_resolver(agent_id: str, tenant_id: str) -> str:
+def my_token_resolver(agent_id: str, tenant_id: str) -> str | None:
     """Replace with your actual token acquisition logic."""
     raise NotImplementedError("Implement your token resolver")
 