From 219aa6166a7812e41d930548e7cbf4763a74ed04 Mon Sep 17 00:00:00 2001 From: Hector Hernandez <39923391+hectorhdzg@users.noreply.github.com> Date: Mon, 11 May 2026 16:37:26 -0700 Subject: [PATCH 1/2] Add A365-specific OpenAI Agents SDK instrumentor When enable_a365=True, the distro now uses a bundled A365OpenAIAgentsInstrumentor instead of the upstream opentelemetry-instrumentation-openai-agents-v2. This produces spans with the A365 versioned envelope format including custom.parent.span.id, per-message indexed attributes, detailed token counts, and graph_node_parent_id for handoffs. - Port trace processor, message mapper, utils, and constants from Agent365-python - Wire A365 instrumentor in _distro.py (skip upstream when A365 enabled) - Add openai-agents as optional dependency - Add unit tests (70 tests) and integration test scaffolding - Update A365_DOCUMENTATION.md, MIGRATION_A365.md, README.md, CHANGELOG.md --- A365_DOCUMENTATION.md | 15 +- CHANGELOG.md | 5 + MIGRATION_A365.md | 6 +- README.md | 4 +- pyproject.toml | 3 + src/microsoft/opentelemetry/_distro.py | 28 +- .../_genai/_openai_agents/__init__.py | 2 + .../_genai/_openai_agents/_constants.py | 35 + .../_genai/_openai_agents/_message_mapper.py | 376 +++++++++++ .../_openai_agents/_trace_instrumentor.py | 98 +++ .../_genai/_openai_agents/_trace_processor.py | 292 +++++++++ .../_genai/_openai_agents/_utils.py | 614 ++++++++++++++++++ tests/a365/test_span_processor.py | 1 + .../test_trace_instrumentor.py | 1 + tests/genai/main_agent/test_span_processor.py | 1 + tests/openai_agents/__init__.py | 2 + tests/openai_agents/integration/__init__.py | 2 + tests/openai_agents/integration/conftest.py | 53 ++ .../test_openai_agents_trace_processor.py | 383 +++++++++++ tests/openai_agents/test_message_mapper.py | 307 +++++++++ .../openai_agents/test_trace_instrumentor.py | 103 +++ tests/openai_agents/test_trace_processor.py | 293 +++++++++ tests/openai_agents/test_utils.py | 248 +++++++ .../test_trace_instrumentor.py | 
1 + tests/test_distro.py | 8 +- 25 files changed, 2870 insertions(+), 11 deletions(-) create mode 100644 src/microsoft/opentelemetry/_genai/_openai_agents/__init__.py create mode 100644 src/microsoft/opentelemetry/_genai/_openai_agents/_constants.py create mode 100644 src/microsoft/opentelemetry/_genai/_openai_agents/_message_mapper.py create mode 100644 src/microsoft/opentelemetry/_genai/_openai_agents/_trace_instrumentor.py create mode 100644 src/microsoft/opentelemetry/_genai/_openai_agents/_trace_processor.py create mode 100644 src/microsoft/opentelemetry/_genai/_openai_agents/_utils.py create mode 100644 tests/openai_agents/__init__.py create mode 100644 tests/openai_agents/integration/__init__.py create mode 100644 tests/openai_agents/integration/conftest.py create mode 100644 tests/openai_agents/integration/test_openai_agents_trace_processor.py create mode 100644 tests/openai_agents/test_message_mapper.py create mode 100644 tests/openai_agents/test_trace_instrumentor.py create mode 100644 tests/openai_agents/test_trace_processor.py create mode 100644 tests/openai_agents/test_utils.py diff --git a/A365_DOCUMENTATION.md b/A365_DOCUMENTATION.md index 9eed348a..3dd5c22e 100644 --- a/A365_DOCUMENTATION.md +++ b/A365_DOCUMENTATION.md @@ -92,10 +92,23 @@ Supported instrumentors: | Library | Instrumentor | Package | |---|---|---| | Semantic Kernel | `SemanticKernelInstrumentor` | Bundled in distro | -| OpenAI Agents SDK | Via `opentelemetry-instrumentation-openai-agents-v2` | Dependency of distro | +| OpenAI Agents SDK | `A365OpenAIAgentsInstrumentor` (A365) / `opentelemetry-instrumentation-openai-agents-v2` (non-A365) | Bundled / Dependency | | Agent Framework | `AgentFrameworkInstrumentor` | Bundled in distro | | LangChain | `LangChainInstrumentor` | Bundled in distro | +### OpenAI Agents SDK — Dual Instrumentation + +The distro ships **two** instrumentors for the OpenAI Agents SDK, selected automatically based on whether A365 is enabled: + +| Mode | Instrumentor 
| Span format | +|---|---|---| +| `enable_a365=True` | `A365OpenAIAgentsInstrumentor` (bundled) | A365 versioned envelope with `custom.parent.span.id`, per-message indexed attributes, detailed token counts, `graph_node_parent_id` for handoffs | +| `enable_a365=False` | `opentelemetry-instrumentation-openai-agents-v2` (upstream) | Standard OpenTelemetry GenAI semantic conventions | + +The selection is automatic — no manual `instrument()` calls are needed. When `enable_a365=True`, the upstream instrumentor for `openai_agents` is **skipped** and the A365 instrumentor is used instead. All other instrumentors (LangChain, Semantic Kernel, etc.) are unaffected. + +> **Important:** When both `enable_a365=True` and `enable_azure_monitor=True` are set, Azure Monitor and OTLP exporters will see spans in the A365 format (versioned envelope) rather than upstream OTel format. This is expected — the A365 instrumentor writes to the global `TracerProvider`, so all attached exporters receive the same spans. + ### Noisy Spans — A365-Only Mode When `enable_a365=True` (and `enable_azure_monitor` is **not** set), the distro **disables web-framework and HTTP-client instrumentations by default** so only GenAI-related spans appear: diff --git a/CHANGELOG.md b/CHANGELOG.md index 33059d57..7d2afb8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Release History +## Unreleased + +### Features Added +- Add A365-specific OpenAI Agents SDK instrumentor (`A365OpenAIAgentsInstrumentor`). When `enable_a365=True`, the distro uses this bundled instrumentor instead of the upstream `opentelemetry-instrumentation-openai-agents-v2`, producing spans with the A365 versioned envelope format, `custom.parent.span.id`, per-message indexed attributes, detailed token counts, and `graph_node_parent_id` for handoffs. 
+ ## 1.1.0 (2026-05-11) ### Features Added diff --git a/MIGRATION_A365.md b/MIGRATION_A365.md index e99c0d40..4b477f9a 100644 --- a/MIGRATION_A365.md +++ b/MIGRATION_A365.md @@ -197,7 +197,11 @@ OpenAIAgentsTraceInstrumentor().instrument() # ✅ NEW — auto-instrumented by distro, no manual setup needed # Set ENABLE_A365_OBSERVABILITY_EXPORTER=true in env use_microsoft_opentelemetry(enable_a365=True) -# OpenAI instrumentation is handled automatically +# When enable_a365=True, the distro uses a bundled A365-specific instrumentor +# (A365OpenAIAgentsInstrumentor) that produces the same A365 versioned envelope +# format as the old OpenAIAgentsTraceInstrumentor. The upstream OTel instrumentor +# (opentelemetry-instrumentation-openai-agents-v2) is skipped automatically. +# When enable_a365=False, the upstream instrumentor is used instead. ``` ### Extensions — Semantic Kernel (observability-extensions-semantic-kernel) diff --git a/README.md b/README.md index f86834b4..8692c822 100644 --- a/README.md +++ b/README.md @@ -211,12 +211,14 @@ Microsoft OpenTelemetry automatically instruments the following libraries when i | `urllib` | HTTP client | | `urllib3` | HTTP client | | `openai` | GenAI | -| `openai_agents` | GenAI | +| `openai_agents` | GenAI (see note below) | | `langchain` | GenAI | | `semantic_kernel` | GenAI | | `agent_framework` | GenAI | | `azure_sdk` | Azure (enabled when Azure Monitor is active) | +> **OpenAI Agents SDK:** The distro includes two instrumentors for `openai_agents`. When `enable_a365=True`, a bundled A365-specific instrumentor (`A365OpenAIAgentsInstrumentor`) is used, producing spans with the A365 versioned envelope format. When A365 is not enabled, the upstream `opentelemetry-instrumentation-openai-agents-v2` instrumentor is used with standard OTel semantic conventions. See [A365_DOCUMENTATION.md](A365_DOCUMENTATION.md) for details. 
+ Toggle individual instrumentations: ```python diff --git a/pyproject.toml b/pyproject.toml index 281cd20f..aa13b535 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,6 +56,9 @@ langchain = [ agent-framework = [ "agent-framework>=1.0.0", ] +openai-agents = [ + "openai-agents>=0.0.7", +] test = [ "pytest>=8.0", "pytest-cov>=5.0", diff --git a/src/microsoft/opentelemetry/_distro.py b/src/microsoft/opentelemetry/_distro.py index 9bb2ac8d..0da6a63b 100644 --- a/src/microsoft/opentelemetry/_distro.py +++ b/src/microsoft/opentelemetry/_distro.py @@ -248,7 +248,7 @@ def use_microsoft_opentelemetry(**kwargs: object) -> None: # pylint: disable=to # ---- GenAI main-agent attribute propagation ---- # Prepended to the processor lists so on_start/on_emit run BEFORE any # Batch* export processor appended below; this enriches once per - # span/log and is then visible to the Azure Monitor exporter. + # span/log and is then visible to the Azure Monitor exporter. if enable_azure_monitor: if not otel_kwargs.get(DISABLE_TRACING_ARG, False): otel_kwargs[SPAN_PROCESSORS_ARG] = [ @@ -337,7 +337,7 @@ def use_microsoft_opentelemetry(**kwargs: object) -> None: # pylint: disable=to set_logger_provider(logger_provider) # ---- Instrumentations (always, after providers are set) ---- - _setup_instrumentations(otel_kwargs, **{ENABLE_SENSITIVE_DATA_ARG: enable_sensitive_data}) + _setup_instrumentations(otel_kwargs, enable_a365=enable_a365, **{ENABLE_SENSITIVE_DATA_ARG: enable_sensitive_data}) # ---- SDKStats manager (after providers, before returning) ---- _initialize_sdkstats(enable_azure_monitor) @@ -698,6 +698,7 @@ def _is_instrumentation_enabled(otel_kwargs: Dict[str, Any], lib_name: str) -> b def _setup_instrumentations(otel_kwargs: Dict[str, Any], **kwargs: Any) -> None: """Discover and activate OTel instrumentations for supported libraries.""" + enable_a365: bool = kwargs.pop("enable_a365", False) entry_point_finder = _EntryPointDistFinder() for entry_point in 
entry_points(group="opentelemetry_instrumentor"): lib_name = entry_point.name @@ -706,6 +707,12 @@ def _setup_instrumentations(otel_kwargs: Dict[str, Any], **kwargs: Any) -> None: if not _is_instrumentation_enabled(otel_kwargs, lib_name): _logger.debug("Instrumentation skipped for library %s", lib_name) continue + # When A365 is enabled, use the A365-specific OpenAI Agents + # instrumentation instead of the upstream entry point so that + # spans carry the versioned message format A365 consumers expect. + if lib_name == "openai_agents" and enable_a365: + _setup_a365_openai_agents_instrumentation() + continue try: entry_point_dist = entry_point_finder.dist_for(entry_point) # type: ignore conflict = get_dist_dependency_conflicts(entry_point_dist) # type: ignore @@ -725,3 +732,20 @@ def _setup_instrumentations(otel_kwargs: Dict[str, Any], **kwargs: Any) -> None: lib_name, exc_info=ex, ) + + +def _setup_a365_openai_agents_instrumentation() -> None: + """Register the A365 OpenAI Agents trace processor.""" + try: + from microsoft.opentelemetry._genai._openai_agents._trace_instrumentor import ( + A365OpenAIAgentsInstrumentor, + ) + + A365OpenAIAgentsInstrumentor().instrument() + set_sdkstats_instrumentation_by_name("openai_agents") + _logger.debug("A365 OpenAI Agents instrumentation enabled.") + except Exception as ex: # pylint: disable=broad-except + _logger.warning( + "Failed to set up A365 OpenAI Agents instrumentation.", + exc_info=ex, + ) diff --git a/src/microsoft/opentelemetry/_genai/_openai_agents/__init__.py b/src/microsoft/opentelemetry/_genai/_openai_agents/__init__.py new file mode 100644 index 00000000..59e481eb --- /dev/null +++ b/src/microsoft/opentelemetry/_genai/_openai_agents/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
diff --git a/src/microsoft/opentelemetry/_genai/_openai_agents/_constants.py b/src/microsoft/opentelemetry/_genai/_openai_agents/_constants.py new file mode 100644 index 00000000..52da5751 --- /dev/null +++ b/src/microsoft/opentelemetry/_genai/_openai_agents/_constants.py @@ -0,0 +1,35 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +# Span Attribute Types +from microsoft.opentelemetry.a365.core.constants import ( + EXECUTE_TOOL_OPERATION_NAME, + INVOKE_AGENT_OPERATION_NAME, +) +from microsoft.opentelemetry.a365.core.inference_operation_type import InferenceOperationType + +GEN_AI_SPAN_KIND_AGENT_KEY = INVOKE_AGENT_OPERATION_NAME +GEN_AI_SPAN_KIND_TOOL_KEY = EXECUTE_TOOL_OPERATION_NAME +GEN_AI_SPAN_KIND_CHAIN_KEY = "chain" +GEN_AI_SPAN_KIND_LLM_KEY = InferenceOperationType.CHAT.value.lower() +GEN_AI_SPAN_KIND_RETRIEVER_KEY = "retriever" +GEN_AI_SPAN_KIND_EMBEDDING_KEY = "embedding" +GEN_AI_SPAN_KIND_RERANKER_KEY = "reranker" +GEN_AI_SPAN_KIND_GUARDRAIL_KEY = "guardrail" +GEN_AI_SPAN_KIND_EVALUATOR_KEY = "evaluator" +GEN_AI_SPAN_KIND_UNKNOWN_KEY = "unknown" + +# PREFIXES +GEN_AI_MESSAGE_ROLE = "message_role" +GEN_AI_MESSAGE_CONTENT = "message_content" +GEN_AI_MESSAGE_CONTENTS = "message_contents" +GEN_AI_MESSAGE_CONTENT_TYPE = "content_type" +GEN_AI_MESSAGE_TOOL_CALLS = "message_tool_calls" +GEN_AI_MESSAGE_TOOL_CALL_ID = "message_tool_id" +GEN_AI_MESSAGE_TOOL_CALL_NAME = "message_tool_name" +GEN_AI_TOOL_JSON_SCHEMA = "tool_json_schema" +GEN_AI_LLM_TOKEN_COUNT_TOTAL = "llm_token_count_total" +GEN_AI_LLM_TOKEN_COUNT_PROMPT_DETAILS_CACHED_READ = "llm_token_count_prompt_details_cached_read" +GEN_AI_LLM_TOKEN_COUNT_COMPLETION_DETAILS_REASONING = "llm_token_count_completion_details_reasoning" +GEN_AI_GRAPH_NODE_ID = "graph_node_id" +GEN_AI_GRAPH_NODE_PARENT_ID = "graph_node_parent_id" diff --git a/src/microsoft/opentelemetry/_genai/_openai_agents/_message_mapper.py b/src/microsoft/opentelemetry/_genai/_openai_agents/_message_mapper.py new 
file mode 100644 index 00000000..48d79e76 --- /dev/null +++ b/src/microsoft/opentelemetry/_genai/_openai_agents/_message_mapper.py @@ -0,0 +1,376 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# pylint: disable=too-many-nested-blocks + +"""Maps OpenAI span tag messages to A365 versioned message format. + +Handles three input shapes produced by the OpenAI trace processor: + +1. **Chat-completions format** (from ``GenerationSpanData``): + ``[{"role":"system","content":"..."}, ...]`` +2. **Response API format** (from ``ResponseSpanData``): + - Input: ``[{"type":"message","role":"user","content":"..."}, ...]`` + - Output: ``{"id":"...","model":"...","output":[...], ...}`` (full Response JSON) +3. **Plain string** (from ``AgentSpanData``): + A bare user/assistant message captured from child generation spans. +""" + +from __future__ import annotations + +import json +import logging +from collections.abc import Mapping +from typing import Any + +from microsoft.opentelemetry.a365.core.message_utils import serialize_messages +from microsoft.opentelemetry.a365.core.models.messages import ( + ChatMessage, + InputMessages, + MessagePart, + MessageRole, + OutputMessage, + OutputMessages, + TextPart, + ToolCallRequestPart, + ToolCallResponsePart, +) + +logger = logging.getLogger(__name__) + +_ROLE_MAP: dict[str, MessageRole] = { + "system": MessageRole.SYSTEM, + "user": MessageRole.USER, + "assistant": MessageRole.ASSISTANT, + "tool": MessageRole.TOOL, +} + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def map_input_messages(messages_json: str) -> str | None: + """Map a ``gen_ai.input.messages`` tag value to a serialized A365 JSON string. + + Args: + messages_json: The raw JSON string from the span attribute. 
+ + Returns: + Serialized :class:`InputMessages` JSON string, or ``None`` if the + input is empty or cannot be parsed. + """ + if not messages_json: + return None + + # Plain string (AgentSpanData captures bare user text) + try: + raw = json.loads(messages_json) + except (json.JSONDecodeError, TypeError): + return _wrap_plain_input(messages_json) + + if isinstance(raw, list): + return _map_input_list(raw) + + # Unexpected shape + return _wrap_plain_input(messages_json) + + +def map_output_messages(messages_json: str) -> str | None: + """Map a ``gen_ai.output.messages`` tag value to a serialized A365 JSON string. + + Args: + messages_json: The raw JSON string from the span attribute. + + Returns: + Serialized :class:`OutputMessages` JSON string, or ``None`` if the + input is empty or cannot be parsed. + """ + if not messages_json: + return None + + try: + raw = json.loads(messages_json) + except (json.JSONDecodeError, TypeError): + return _wrap_plain_output(messages_json) + + if isinstance(raw, list): + return _map_output_list(raw) + + if isinstance(raw, dict): + # Full Response JSON from ResponseSpanData (model_dump_json) + return _map_response_output(raw) + + return _wrap_plain_output(messages_json) + + +# --------------------------------------------------------------------------- +# Input mapping +# --------------------------------------------------------------------------- + + +def _map_input_list(items: list[Any]) -> str | None: + """Map a list of input items (chat completions or ResponseInputItemParam).""" + chat_messages: list[ChatMessage] = [] + + for item in items: + if not isinstance(item, dict): + continue + + item_type = item.get("type") + + if item_type == "function_call": + # ResponseInputItemParam: function_call -> assistant tool call request + name = item.get("name", "") + if name: + parts: list[MessagePart] = [ + ToolCallRequestPart( + name=name, + id=item.get("call_id"), + arguments=item.get("arguments"), + ) + ] + 
chat_messages.append(ChatMessage(role=MessageRole.ASSISTANT, parts=parts)) + + elif item_type == "function_call_output": + # ResponseInputItemParam: function_call_output -> tool response + parts = [ + ToolCallResponsePart( + id=item.get("call_id"), + response=item.get("output"), + ) + ] + chat_messages.append(ChatMessage(role=MessageRole.TOOL, parts=parts)) + + elif item_type == "custom_tool_call": + name = item.get("name", "") + if name: + input_data = item.get("input") + args = json.dumps({"input": input_data}) if input_data is not None else None + parts = [ToolCallRequestPart(name=name, id=item.get("call_id"), arguments=args)] + chat_messages.append(ChatMessage(role=MessageRole.ASSISTANT, parts=parts)) + + elif item_type == "custom_tool_call_output": + parts = [ + ToolCallResponsePart( + id=item.get("call_id"), + response=item.get("output"), + ) + ] + chat_messages.append(ChatMessage(role=MessageRole.TOOL, parts=parts)) + + elif item_type == "message" or "role" in item: + # Standard message (ResponseInputItemParam or chat completions) + mapped = _map_chat_completions_message(item) + if mapped is not None: + chat_messages.append(mapped) + + else: + # Unknown type, try as generic message + mapped = _map_chat_completions_message(item) + if mapped is not None: + chat_messages.append(mapped) + + if not chat_messages: + return None + return serialize_messages(InputMessages(messages=chat_messages)) + + +def _map_chat_completions_message(msg: dict[str, Any]) -> ChatMessage | None: + """Map a single chat-completions-style message dict.""" + role_str = msg.get("role", "") + role = _ROLE_MAP.get(str(role_str).lower(), MessageRole.USER) + parts: list[MessagePart] = [] + + # Tool response message + if role == MessageRole.TOOL: + content = msg.get("content", "") + tool_call_id = msg.get("tool_call_id") + response = str(content) if content else "" + if response or tool_call_id: + parts.append(ToolCallResponsePart(id=tool_call_id, response=response)) + return 
ChatMessage(role=role, parts=parts) if parts else None + + # Text content (string or list) + content = msg.get("content") + if isinstance(content, str) and content.strip(): + parts.append(TextPart(content=content)) + elif isinstance(content, list): + for item in content: + if isinstance(item, dict): + if item.get("type") in ("input_text", "text"): + text = item.get("text", "") + if text: + parts.append(TextPart(content=text)) + elif item.get("type") == "output_text": + text = item.get("text", "") + if text: + parts.append(TextPart(content=text)) + + # Tool calls + tool_calls = msg.get("tool_calls") + if isinstance(tool_calls, list): + for tc in tool_calls: + if not isinstance(tc, dict): + continue + func = tc.get("function", {}) + if isinstance(func, dict): + name = func.get("name") + if name: + parts.append( + ToolCallRequestPart( + name=name, + id=tc.get("id"), + arguments=func.get("arguments"), + ) + ) + + if not parts: + return None + return ChatMessage(role=role, parts=parts, name=msg.get("name")) + + +# --------------------------------------------------------------------------- +# Output mapping +# --------------------------------------------------------------------------- + + +def _map_output_list(items: list[Any]) -> str | None: + """Map a list of chat-completions-style output messages.""" + output_messages: list[OutputMessage] = [] + + for item in items: + if not isinstance(item, dict): + continue + role_str = item.get("role", "assistant") + role = _ROLE_MAP.get(str(role_str).lower(), MessageRole.ASSISTANT) + parts: list[MessagePart] = [] + + # Tool response + if role == MessageRole.TOOL: + content = item.get("content", "") + tool_call_id = item.get("tool_call_id") + response = str(content) if content else "" + if response or tool_call_id: + parts.append(ToolCallResponsePart(id=tool_call_id, response=response)) + else: + # Text content + content = item.get("content") + if isinstance(content, str) and content.strip(): + 
parts.append(TextPart(content=content)) + elif isinstance(content, list): + for c in content: + if isinstance(c, dict): + text = c.get("text", "") + if text: + parts.append(TextPart(content=text)) + + # Tool calls + tool_calls = item.get("tool_calls") + if isinstance(tool_calls, list): + for tc in tool_calls: + if not isinstance(tc, dict): + continue + func = tc.get("function", {}) + if isinstance(func, dict): + name = func.get("name") + if name: + parts.append( + ToolCallRequestPart( + name=name, + id=tc.get("id"), + arguments=func.get("arguments"), + ) + ) + + finish_reason = item.get("finish_reason") + if parts: + output_messages.append(OutputMessage(role=role, parts=parts, finish_reason=finish_reason)) + + if not output_messages: + return None + return serialize_messages(OutputMessages(messages=output_messages)) + + +def _map_response_output(response: dict[str, Any]) -> str | None: + """Map a full OpenAI Response JSON to A365 OutputMessages. + + The Response object has ``output: [...]`` containing items with + ``type`` of ``message`` or ``function_call``. 
+ """ + output_items = response.get("output") + if not isinstance(output_items, list): + return None + + output_messages: list[OutputMessage] = [] + + for item in output_items: + if not isinstance(item, Mapping): + continue + item_type = item.get("type") + + if item_type == "message": + parts: list[MessagePart] = [] + role_str = item.get("role", "assistant") + role = _ROLE_MAP.get(str(role_str).lower(), MessageRole.ASSISTANT) + + for content_item in item.get("content", []): + if isinstance(content_item, Mapping): + content_type = content_item.get("type") + if content_type == "output_text": + text = content_item.get("text", "") + if text: + parts.append(TextPart(content=text)) + elif content_type == "refusal": + text = content_item.get("refusal", "") + if text: + parts.append(TextPart(content=text)) + + if parts: + finish_reason = item.get("status") + output_messages.append(OutputMessage(role=role, parts=parts, finish_reason=finish_reason)) + + elif item_type == "function_call": + name = item.get("name", "") + if name: + parts = [ + ToolCallRequestPart( + name=name, + id=item.get("call_id"), + arguments=item.get("arguments"), + ) + ] + output_messages.append( + OutputMessage( + role=MessageRole.ASSISTANT, + parts=parts, + finish_reason="tool_call", + ) + ) + + if not output_messages: + return None + return serialize_messages(OutputMessages(messages=output_messages)) + + +# --------------------------------------------------------------------------- +# Plain-string wrappers +# --------------------------------------------------------------------------- + + +def _wrap_plain_input(text: str) -> str | None: + """Wrap a plain text string as a versioned InputMessages.""" + if not text or not text.strip(): + return None + return serialize_messages( + InputMessages(messages=[ChatMessage(role=MessageRole.USER, parts=[TextPart(content=text)])]) + ) + + +def _wrap_plain_output(text: str) -> str | None: + """Wrap a plain text string as a versioned OutputMessages.""" + if not text 
or not text.strip(): + return None + return serialize_messages( + OutputMessages(messages=[OutputMessage(role=MessageRole.ASSISTANT, parts=[TextPart(content=text)])]) + ) diff --git a/src/microsoft/opentelemetry/_genai/_openai_agents/_trace_instrumentor.py b/src/microsoft/opentelemetry/_genai/_openai_agents/_trace_instrumentor.py new file mode 100644 index 00000000..f7fc64bc --- /dev/null +++ b/src/microsoft/opentelemetry/_genai/_openai_agents/_trace_instrumentor.py @@ -0,0 +1,98 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""A365 instrumentor for OpenAI Agents SDK. + +This instrumentor registers the A365-specific ``OpenAIAgentsTraceProcessor`` +with the OpenAI Agents SDK tracing system. It produces spans with the A365 +versioned message format and additional attributes (``custom.parent.span.id``, +per-message indexed attributes, etc.) that A365 consumers rely on. + +When A365 is enabled, this instrumentor is used **instead of** the upstream +``opentelemetry-instrumentation-openai-agents-v2``; the upstream entry point +is skipped entirely, so Azure Monitor and OTLP exporters also receive spans +in the A365 format. +""" + +from __future__ import annotations + +import logging +from collections.abc import Collection +from typing import Any + +import opentelemetry.trace as trace_api +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore[attr-defined] + +from ._trace_processor import OpenAIAgentsTraceProcessor + +logger = logging.getLogger(__name__) + +_instruments = ("openai-agents >= 0.0.7",) + + +class A365OpenAIAgentsInstrumentor(BaseInstrumentor): + """Instruments the OpenAI Agents SDK with A365-compatible tracing. + + Registers an ``OpenAIAgentsTraceProcessor`` that emits spans in the + format expected by A365 dashboards, Spectra, and observability exporters. 
+ """ + + _processor: OpenAIAgentsTraceProcessor | None = None + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + + def _instrument(self, **kwargs: Any) -> None: + if self._processor is not None: + return + + tracer_provider = kwargs.get("tracer_provider") + tracer = trace_api.get_tracer( + __name__, + tracer_provider=tracer_provider, + ) + + self._processor = OpenAIAgentsTraceProcessor(tracer) + + try: + from agents.tracing import get_trace_provider + + provider = get_trace_provider() + # Get existing processors to avoid replacing them + multi = getattr(provider, "_multi_processor", None) + existing = list(getattr(multi, "_processors", ())) + provider.set_processors([*existing, self._processor]) + except Exception: # pylint: disable=broad-exception-caught + # Fallback: use set_trace_processors (replaces all processors) + try: + from agents import set_trace_processors + + set_trace_processors([self._processor]) + except ImportError: + logger.warning( + "Could not register A365 OpenAI Agents trace processor. " + "Neither agents.tracing.get_trace_provider nor " + "agents.set_trace_processors is available." 
+ ) + + def _uninstrument(self, **kwargs: Any) -> None: + if self._processor is None: + return + + try: + from agents.tracing import get_trace_provider + + provider = get_trace_provider() + multi = getattr(provider, "_multi_processor", None) + current = list(getattr(multi, "_processors", ())) + filtered = [p for p in current if p is not self._processor] + provider.set_processors(filtered) + except Exception: # pylint: disable=broad-exception-caught + try: + from agents import set_trace_processors + + set_trace_processors([]) + except ImportError: + pass + + self._processor = None diff --git a/src/microsoft/opentelemetry/_genai/_openai_agents/_trace_processor.py b/src/microsoft/opentelemetry/_genai/_openai_agents/_trace_processor.py new file mode 100644 index 00000000..ab1a91c4 --- /dev/null +++ b/src/microsoft/opentelemetry/_genai/_openai_agents/_trace_processor.py @@ -0,0 +1,292 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +# Processor for OpenAI Agents SDK + +from __future__ import annotations + +import logging +from collections import OrderedDict +from datetime import datetime +from typing import TYPE_CHECKING, Any + +from typing_extensions import assert_never + +from agents import MCPListToolsSpanData +from agents.tracing import Span, Trace, TracingProcessor +from agents.tracing.span_data import ( + AgentSpanData, + FunctionSpanData, + GenerationSpanData, + HandoffSpanData, + ResponseSpanData, +) +from microsoft.opentelemetry.a365.core.constants import ( + CHAT_OPERATION_NAME, + CUSTOM_PARENT_SPAN_ID_KEY, + EXECUTE_TOOL_OPERATION_NAME, + GEN_AI_INPUT_MESSAGES_KEY, + GEN_AI_OPERATION_NAME_KEY, + GEN_AI_OUTPUT_MESSAGES_KEY, + GEN_AI_PROVIDER_NAME_KEY, + GEN_AI_REQUEST_MODEL_KEY, + GEN_AI_TOOL_CALL_ID_KEY, + GEN_AI_TOOL_TYPE_KEY, + INVOKE_AGENT_OPERATION_NAME, +) +from microsoft.opentelemetry.a365.core.utils import as_utc_nano, safe_json_dumps +from opentelemetry import trace as ot_trace +from opentelemetry.context import attach, 
detach +from opentelemetry.trace import Span as OtelSpan +from opentelemetry.trace import ( + Status, + StatusCode, + Tracer, + set_span_in_context, +) + +from openai.types.responses import ( + Response, +) + +from ._constants import ( + GEN_AI_GRAPH_NODE_PARENT_ID, +) +from ._message_mapper import map_input_messages, map_output_messages +from ._utils import ( + capture_input_message, + capture_output_message, + capture_tool_call_ids, + find_ancestor_agent_span_id, + get_attributes_from_function_span_data, + get_attributes_from_generation_span_data, + get_attributes_from_input, + get_attributes_from_mcp_list_tool_span_data, + get_attributes_from_response, + get_span_kind, + get_span_name, + get_span_status, + get_tool_call_id, +) + +logger = logging.getLogger(__name__) + + +""" +Custom Trace Processor for OpenAI Agents SDK +""" + + +class OpenAIAgentsTraceProcessor(TracingProcessor): + _MAX_HANDOFFS_IN_FLIGHT = 1000 + _MAX_PENDING_TOOL_CALLS = 1000 + _MAX_TRACKED_SPANS = 10000 + + def __init__(self, tracer: Tracer) -> None: + self._tracer = tracer + self._root_spans: OrderedDict[str, OtelSpan] = OrderedDict() + self._otel_spans: OrderedDict[str, OtelSpan] = OrderedDict() + self._tokens: OrderedDict[str, object] = OrderedDict() + # This captures in flight handoff. Once the handoff is complete, the entry is deleted + # If the handoff does not complete, the entry stays in the dict. 
+ # Use an OrderedDict and _MAX_HANDOFFS_IN_FLIGHT to cap the size of the dict + # in case there are large numbers of orphaned handoffs + self._reverse_handoffs_dict: OrderedDict[str, str] = OrderedDict() + # Track input/output messages for agent spans (keyed by agent span_id) + self._agent_inputs: OrderedDict[str, str] = OrderedDict() + self._agent_outputs: OrderedDict[str, str] = OrderedDict() + # Track agent span IDs to find nearest ancestor + self._agent_span_ids: OrderedDict[str, None] = OrderedDict() + # Track parent-child relationships: child_span_id -> parent_span_id + self._span_parents: OrderedDict[str, str] = OrderedDict() + # Track tool_call_ids from GenerationSpan: (function_name, trace_id) -> call_id + # Use an OrderedDict and _MAX_PENDING_TOOL_CALLS to cap the size of the dict + # in case tool calls are captured but never consumed + self._pending_tool_calls: OrderedDict[str, str] = OrderedDict() + + # helper + def _stamp_custom_parent(self, otel_span: OtelSpan, trace_id: str) -> None: + root = self._root_spans.get(trace_id) + if not root: + return + sc = root.get_span_context() + pid_hex = "0x" + ot_trace.format_span_id(sc.span_id) + otel_span.set_attribute(CUSTOM_PARENT_SPAN_ID_KEY, pid_hex) + + @staticmethod + def _cap_ordered_dict(d: OrderedDict, max_size: int) -> None: + """Evict oldest entries from an OrderedDict to stay within max_size.""" + while len(d) > max_size: + d.popitem(last=False) + + def on_trace_start(self, trace: Trace) -> None: + """Called when a trace is started. + + Args: + trace: The trace that started. + """ + + def on_trace_end(self, trace: Trace) -> None: + """Called when a trace is finished. + + Args: + trace: The trace that finished. + """ + if root_span := self._root_spans.pop(trace.trace_id, None): + root_span.set_status(Status(StatusCode.OK)) + root_span.end() + + def on_span_start(self, span: Span[Any]) -> None: + """Called when a span is started. + + Args: + span: The span that started. 
+ """ + if not span.started_at: + return + start_time = datetime.fromisoformat(span.started_at) + parent_span = self._otel_spans.get(span.parent_id) if span.parent_id else self._root_spans.get(span.trace_id) + context = set_span_in_context(parent_span) if parent_span else None + span_name = get_span_name(span) + otel_span = self._tracer.start_span( + name=span_name, + context=context, + start_time=as_utc_nano(start_time), + attributes={ + GEN_AI_OPERATION_NAME_KEY: get_span_kind(span.span_data), + GEN_AI_PROVIDER_NAME_KEY: "openai", + }, + ) + self._otel_spans[span.span_id] = otel_span + self._cap_ordered_dict(self._otel_spans, self._MAX_TRACKED_SPANS) + self._tokens[span.span_id] = attach(set_span_in_context(otel_span)) + self._cap_ordered_dict(self._tokens, self._MAX_TRACKED_SPANS) + # Track parent-child relationship for ancestor lookup + if span.parent_id: + self._span_parents[span.span_id] = span.parent_id + self._cap_ordered_dict(self._span_parents, self._MAX_TRACKED_SPANS) + # Track AgentSpan IDs + if isinstance(span.span_data, AgentSpanData): + self._agent_span_ids[span.span_id] = None + self._cap_ordered_dict(self._agent_span_ids, self._MAX_TRACKED_SPANS) + + def on_span_end(self, span: Span[Any]) -> None: # pylint: disable=too-many-statements + """Called when a span is finished. Should not block or raise exceptions. + + Args: + span: The span that finished. 
+ """ + if token := self._tokens.pop(span.span_id, None): + detach(token) # type: ignore[arg-type] + # Clean up parent tracking + self._span_parents.pop(span.span_id, None) + if not (otel_span := self._otel_spans.pop(span.span_id, None)): + return + otel_span.update_name(get_span_name(span)) + + data = span.span_data + + # DATA TYPES AS PER OPENAI AGENTS SDK + if isinstance(data, ResponseSpanData): + if hasattr(data, "response") and isinstance(response := data.response, Response): + otel_span.set_attribute(GEN_AI_OUTPUT_MESSAGES_KEY, response.model_dump_json()) + for k, v in get_attributes_from_response(response): + otel_span.set_attribute(k, v) + if hasattr(data, "input") and (input_data := data.input): + if isinstance(input_data, str): + otel_span.set_attribute(GEN_AI_INPUT_MESSAGES_KEY, input_data) + elif isinstance(input_data, list): + otel_span.set_attribute(GEN_AI_INPUT_MESSAGES_KEY, safe_json_dumps(input_data)) + for k, v in get_attributes_from_input(input_data): + otel_span.set_attribute(k, v) + elif TYPE_CHECKING: + assert_never(input_data) + elif isinstance(data, GenerationSpanData): + for k, v in get_attributes_from_generation_span_data(data): + otel_span.set_attribute(k, v) + self._stamp_custom_parent(otel_span, span.trace_id) + # Capture input/output messages for nearest ancestor agent span + if agent_span_id := find_ancestor_agent_span_id(span.parent_id, self._agent_span_ids, self._span_parents): + if data.input: + capture_input_message(agent_span_id, data.input, self._agent_inputs) + self._cap_ordered_dict(self._agent_inputs, self._MAX_TRACKED_SPANS) + if data.output: + capture_output_message(agent_span_id, data.output, self._agent_outputs) + self._cap_ordered_dict(self._agent_outputs, self._MAX_TRACKED_SPANS) + # Capture tool_call_ids for later use by FunctionSpan + if data.output: + capture_tool_call_ids(data.output, self._pending_tool_calls, self._MAX_PENDING_TOOL_CALLS) + attrs = otel_span.attributes or {} # type: ignore[attr-defined] + 
otel_span.update_name(f"{attrs[GEN_AI_OPERATION_NAME_KEY]} {attrs[GEN_AI_REQUEST_MODEL_KEY]}") + elif isinstance(data, FunctionSpanData): + for k, v in get_attributes_from_function_span_data(data): + otel_span.set_attribute(k, v) + self._stamp_custom_parent(otel_span, span.trace_id) + otel_span.set_attribute(GEN_AI_TOOL_TYPE_KEY, data.type) + # Set tool_call_id if available from preceding GenerationSpan + func_args = data.input if data.input else "" + if tool_call_id := get_tool_call_id(data.name, func_args, self._pending_tool_calls): + otel_span.set_attribute(GEN_AI_TOOL_CALL_ID_KEY, tool_call_id) + otel_span.update_name(f"{EXECUTE_TOOL_OPERATION_NAME} {data.name}") + elif isinstance(data, MCPListToolsSpanData): + for k, v in get_attributes_from_mcp_list_tool_span_data(data): + otel_span.set_attribute(k, v) + elif isinstance(data, HandoffSpanData): + # Set this dict to find the parent node when the agent span starts + if data.to_agent and data.from_agent: + key = f"{data.to_agent}:{span.trace_id}" + self._reverse_handoffs_dict[key] = data.from_agent + # Cap the size of the dict + while len(self._reverse_handoffs_dict) > self._MAX_HANDOFFS_IN_FLIGHT: + self._reverse_handoffs_dict.popitem(last=False) + elif isinstance(data, AgentSpanData): + # Lookup the parent node if exists + key = f"{data.name}:{span.trace_id}" + if parent_node := self._reverse_handoffs_dict.pop(key, None): + otel_span.set_attribute(GEN_AI_GRAPH_NODE_PARENT_ID, parent_node) + # Apply captured input/output messages from child spans + if input_msg := self._agent_inputs.pop(span.span_id, None): + otel_span.set_attribute(GEN_AI_INPUT_MESSAGES_KEY, input_msg) + if output_msg := self._agent_outputs.pop(span.span_id, None): + otel_span.set_attribute(GEN_AI_OUTPUT_MESSAGES_KEY, output_msg) + otel_span.update_name(f"{INVOKE_AGENT_OPERATION_NAME} {get_span_name(span)}") + # Clean up tracking + self._agent_span_ids.pop(span.span_id, None) + + # Map raw messages to A365 versioned format before ending the 
span + self._apply_message_mapping(otel_span) + + end_time: int | None = None + if span.ended_at: + try: + end_time = as_utc_nano(datetime.fromisoformat(span.ended_at)) + except ValueError: + pass + otel_span.set_status(status=get_span_status(span)) + otel_span.end(end_time) + + @staticmethod + def _apply_message_mapping(otel_span: OtelSpan) -> None: + """Map raw ``gen_ai.input/output.messages`` to the A365 versioned format.""" + attrs = otel_span.attributes or {} # type: ignore[attr-defined] + operation = attrs.get(GEN_AI_OPERATION_NAME_KEY, "") + if operation not in (INVOKE_AGENT_OPERATION_NAME, CHAT_OPERATION_NAME): + return + + raw_input = attrs.get(GEN_AI_INPUT_MESSAGES_KEY) + if raw_input and isinstance(raw_input, str): + mapped = map_input_messages(raw_input) + if mapped is not None: + otel_span.set_attribute(GEN_AI_INPUT_MESSAGES_KEY, mapped) + + raw_output = attrs.get(GEN_AI_OUTPUT_MESSAGES_KEY) + if raw_output and isinstance(raw_output, str): + mapped = map_output_messages(raw_output) + if mapped is not None: + otel_span.set_attribute(GEN_AI_OUTPUT_MESSAGES_KEY, mapped) + + def force_flush(self) -> None: + """Forces an immediate flush of all queued spans/traces.""" + + def shutdown(self) -> None: + """Called when the application stops.""" diff --git a/src/microsoft/opentelemetry/_genai/_openai_agents/_utils.py b/src/microsoft/opentelemetry/_genai/_openai_agents/_utils.py new file mode 100644 index 00000000..c92fc8cb --- /dev/null +++ b/src/microsoft/opentelemetry/_genai/_openai_agents/_utils.py @@ -0,0 +1,614 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+# pylint: disable=too-many-nested-blocks + +# -------------------------------------------------- # +# HELPER FUNCTIONS ### +# -------------------------------------------------- # + +from collections.abc import Iterable, Iterator, Mapping +from typing import TYPE_CHECKING, Any + +from typing_extensions import assert_never + +from agents import MCPListToolsSpanData +from agents.tracing import Span +from agents.tracing.span_data import ( + AgentSpanData, + CustomSpanData, + FunctionSpanData, + GenerationSpanData, + GuardrailSpanData, + HandoffSpanData, + ResponseSpanData, + SpanData, +) +from microsoft.opentelemetry.a365.core.constants import ( + GEN_AI_CHOICE, + GEN_AI_EXECUTION_PAYLOAD_KEY, + GEN_AI_INPUT_MESSAGES_KEY, + GEN_AI_OUTPUT_MESSAGES_KEY, + GEN_AI_PROVIDER_NAME_KEY, + GEN_AI_REQUEST_MODEL_KEY, + GEN_AI_RESPONSE_FINISH_REASONS_KEY, + GEN_AI_TOOL_ARGS_KEY, + GEN_AI_TOOL_CALL_ID_KEY, + GEN_AI_TOOL_CALL_RESULT_KEY, + GEN_AI_TOOL_NAME_KEY, + GEN_AI_USAGE_INPUT_TOKENS_KEY, + GEN_AI_USAGE_OUTPUT_TOKENS_KEY, +) +from microsoft.opentelemetry.a365.core.utils import safe_json_dumps +from opentelemetry.trace import ( + Status, + StatusCode, +) +from opentelemetry.util.types import AttributeValue + +from openai.types.responses import ( + EasyInputMessageParam, + FunctionTool, + Response, + ResponseCustomToolCall, + ResponseCustomToolCallOutputParam, + ResponseCustomToolCallParam, + ResponseFunctionToolCall, + ResponseFunctionToolCallParam, + ResponseInputContentParam, + ResponseInputItemParam, + ResponseOutputItem, + ResponseOutputMessage, + ResponseOutputMessageParam, + ResponseOutputRefusal, + ResponseOutputText, + ResponseUsage, + Tool, +) +from openai.types.responses.response_input_item_param import FunctionCallOutput, Message +from openai.types.responses.response_output_message_param import Content + +from ._constants import ( + GEN_AI_LLM_TOKEN_COUNT_COMPLETION_DETAILS_REASONING, + GEN_AI_LLM_TOKEN_COUNT_PROMPT_DETAILS_CACHED_READ, + 
GEN_AI_LLM_TOKEN_COUNT_TOTAL, + GEN_AI_MESSAGE_CONTENT, + GEN_AI_MESSAGE_CONTENT_TYPE, + GEN_AI_MESSAGE_CONTENTS, + GEN_AI_MESSAGE_ROLE, + GEN_AI_MESSAGE_TOOL_CALL_ID, + GEN_AI_MESSAGE_TOOL_CALL_NAME, + GEN_AI_MESSAGE_TOOL_CALLS, + GEN_AI_SPAN_KIND_AGENT_KEY, + GEN_AI_SPAN_KIND_CHAIN_KEY, + GEN_AI_SPAN_KIND_LLM_KEY, + GEN_AI_SPAN_KIND_TOOL_KEY, + GEN_AI_TOOL_JSON_SCHEMA, +) + + +def get_span_name(obj: Span[Any]) -> str: + if hasattr(data := obj.span_data, "name") and isinstance(name := data.name, str): + return name + if isinstance(obj.span_data, HandoffSpanData) and obj.span_data.to_agent: + return f"handoff to {obj.span_data.to_agent}" + return obj.span_data.type # type: ignore[no-any-return] + + +def get_span_kind(obj: SpanData) -> str: # pylint: disable=too-many-return-statements + if isinstance(obj, AgentSpanData): + return GEN_AI_SPAN_KIND_AGENT_KEY + if isinstance(obj, FunctionSpanData): + return GEN_AI_SPAN_KIND_TOOL_KEY + if isinstance(obj, GenerationSpanData): + return GEN_AI_SPAN_KIND_LLM_KEY + if isinstance(obj, ResponseSpanData): + return GEN_AI_SPAN_KIND_LLM_KEY + if isinstance(obj, HandoffSpanData): + return GEN_AI_SPAN_KIND_TOOL_KEY + if isinstance(obj, CustomSpanData): + return GEN_AI_SPAN_KIND_CHAIN_KEY + if isinstance(obj, GuardrailSpanData): + return GEN_AI_SPAN_KIND_CHAIN_KEY + return GEN_AI_SPAN_KIND_CHAIN_KEY + + +def get_attributes_from_input( + obj: Iterable[ResponseInputItemParam], + msg_idx: int = 1, +) -> Iterator[tuple[str, AttributeValue]]: + for i, item in enumerate(obj, msg_idx): + prefix = f"{GEN_AI_INPUT_MESSAGES_KEY}.{i}." 
+ if "type" not in item: + if "role" in item and "content" in item: + yield from get_attributes_from_message_param( + { # type: ignore[misc, arg-type] + "type": "message", + "role": item["role"], # type: ignore[typeddict-item] + "content": item["content"], # type: ignore[typeddict-item] + }, + prefix, + ) + elif item["type"] == "message": + yield from get_attributes_from_message_param(item, prefix) + elif item["type"] == "function_call": + yield f"{prefix}{GEN_AI_MESSAGE_ROLE}", "assistant" + yield from get_attributes_from_response_function_tool_call_param( + item, + f"{prefix}{GEN_AI_MESSAGE_TOOL_CALLS}.0.", + ) + elif item["type"] == "function_call_output": + yield from get_attributes_from_function_call_output(item, prefix) + elif item["type"] == "custom_tool_call": + yield f"{prefix}{GEN_AI_MESSAGE_ROLE}", "assistant" + yield from get_attributes_from_response_custom_tool_call_param( + item, + f"{prefix}{GEN_AI_MESSAGE_TOOL_CALLS}.0.", + ) + elif item["type"] == "custom_tool_call_output": + yield from get_attributes_from_response_custom_tool_call_output_param(item, prefix) + elif TYPE_CHECKING and item["type"] is not None: + assert_never(item["type"]) # type: ignore[arg-type] + + +def get_attributes_from_message_param( + obj: EasyInputMessageParam | Message | ResponseOutputMessageParam, + prefix: str = "", +) -> Iterator[tuple[str, AttributeValue]]: + yield f"{prefix}{GEN_AI_MESSAGE_ROLE}", obj["role"] + if content := obj.get("content"): + if isinstance(content, str): + yield f"{prefix}{GEN_AI_MESSAGE_CONTENT}", content + elif isinstance(content, list): + yield from get_attributes_from_message_content_list(content, prefix) + + +def get_attributes_from_response_function_tool_call_param( + obj: ResponseFunctionToolCallParam, + prefix: str = "", +) -> Iterator[tuple[str, AttributeValue]]: + yield f"{prefix}{GEN_AI_MESSAGE_TOOL_CALL_ID}", obj["call_id"] + yield f"{prefix}{GEN_AI_MESSAGE_TOOL_CALL_NAME}", obj["name"] + if obj["arguments"] != "{}": + yield 
f"{prefix}{GEN_AI_TOOL_ARGS_KEY}", obj["arguments"] + + +def get_attributes_from_response_custom_tool_call_param( + obj: ResponseCustomToolCallParam, + prefix: str = "", +) -> Iterator[tuple[str, AttributeValue]]: + if (call_id := obj.get("call_id")) is not None: + yield f"{prefix}{GEN_AI_TOOL_CALL_ID_KEY}", call_id + if (name := obj.get("name")) is not None: + yield f"{prefix}{GEN_AI_TOOL_NAME_KEY}", name + if (input_data := obj.get("input")) is not None: + yield ( + f"{prefix}{GEN_AI_TOOL_ARGS_KEY}", + safe_json_dumps({"input": input_data}), + ) + + +def get_attributes_from_response_custom_tool_call_output_param( + obj: ResponseCustomToolCallOutputParam, + prefix: str = "", +) -> Iterator[tuple[str, AttributeValue]]: + yield f"{prefix}{GEN_AI_MESSAGE_ROLE}", "tool" + if (call_id := obj.get("call_id")) is not None: + yield f"{prefix}{GEN_AI_TOOL_CALL_ID_KEY}", call_id + if (output := obj.get("output")) is not None: + yield f"{prefix}{GEN_AI_TOOL_CALL_RESULT_KEY}", str(output) + + +def get_attributes_from_function_call_output( + obj: FunctionCallOutput, + prefix: str = "", +) -> Iterator[tuple[str, AttributeValue]]: + yield f"{prefix}{GEN_AI_MESSAGE_ROLE}", "tool" + yield f"{prefix}{GEN_AI_TOOL_CALL_ID_KEY}", obj["call_id"] + yield f"{prefix}{GEN_AI_TOOL_CALL_RESULT_KEY}", obj["output"] # type: ignore[misc] + + +def get_attributes_from_generation_span_data( + obj: GenerationSpanData, +) -> Iterator[tuple[str, AttributeValue]]: + yield GEN_AI_PROVIDER_NAME_KEY, "openai" + if isinstance(model := obj.model, str): + yield GEN_AI_REQUEST_MODEL_KEY, model + if isinstance(obj.model_config, dict) and (param := {k: v for k, v in obj.model_config.items() if v is not None}): + yield GEN_AI_EXECUTION_PAYLOAD_KEY, safe_json_dumps(param) + yield from _get_attributes_from_chat_completions_input(obj.input) + yield from _get_attributes_from_chat_completions_output(obj.output) + yield from _get_attributes_from_chat_completions_usage(obj.usage) + + +def 
get_attributes_from_mcp_list_tool_span_data( + obj: MCPListToolsSpanData, +) -> Iterator[tuple[str, AttributeValue]]: + yield GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(obj.result) + + +def _get_attributes_from_chat_completions_input( + obj: Iterable[Mapping[str, Any]] | None, +) -> Iterator[tuple[str, AttributeValue]]: + if not obj: + return + try: + yield GEN_AI_INPUT_MESSAGES_KEY, safe_json_dumps(obj) + except Exception: # pylint: disable=broad-exception-caught + pass + yield from get_attributes_from_chat_completions_message_dicts( + obj, + f"{GEN_AI_INPUT_MESSAGES_KEY}.", + ) + + +def _get_attributes_from_chat_completions_output( + obj: Iterable[Mapping[str, Any]] | None, +) -> Iterator[tuple[str, AttributeValue]]: + if not obj: + return + try: + yield GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(obj) + except Exception: # pylint: disable=broad-exception-caught + pass + + # Collect all finish_reason values + finish_reasons = [str(message.get("finish_reason")) for message in obj if message.get("finish_reason") is not None] + if finish_reasons: + yield GEN_AI_RESPONSE_FINISH_REASONS_KEY, ",".join(finish_reasons) + + yield from get_attributes_from_chat_completions_message_dicts( + obj, + f"{GEN_AI_OUTPUT_MESSAGES_KEY}.", + ) + + +def get_attributes_from_chat_completions_message_dicts( + obj: Iterable[Mapping[str, Any]], + prefix: str = "", + msg_idx: int = 0, + tool_call_idx: int = 0, +) -> Iterator[tuple[str, AttributeValue]]: + if not isinstance(obj, Iterable): + return + for msg in obj: + if isinstance(role := msg.get("role"), str): + yield f"{prefix}{msg_idx}.{GEN_AI_MESSAGE_ROLE}", role + if content := msg.get("content"): + yield from get_attributes_from_chat_completions_message_content( + content, + f"{prefix}{msg_idx}.", + ) + if isinstance(tool_call_id := msg.get("tool_call_id"), str): + yield f"{prefix}{msg_idx}.{GEN_AI_MESSAGE_TOOL_CALL_ID}", tool_call_id + if isinstance(tool_calls := msg.get("tool_calls"), Iterable): + for tc in tool_calls: + yield from 
_get_attributes_from_chat_completions_tool_call_dict( + tc, + f"{prefix}{msg_idx}.{GEN_AI_MESSAGE_TOOL_CALLS}.{tool_call_idx}.", + ) + tool_call_idx += 1 + msg_idx += 1 + + +def get_attributes_from_chat_completions_message_content( + obj: str | Iterable[Mapping[str, Any]], + prefix: str = "", +) -> Iterator[tuple[str, AttributeValue]]: + if isinstance(obj, str): + yield f"{prefix}{GEN_AI_MESSAGE_CONTENT}", obj + elif isinstance(obj, Iterable): + for i, item in enumerate(obj): + if not isinstance(item, Mapping): + continue + yield from _get_attributes_from_chat_completions_message_content_item( + item, + f"{prefix}{GEN_AI_MESSAGE_CONTENTS}.{i}.", + ) + + +def _get_attributes_from_chat_completions_message_content_item( + obj: Mapping[str, Any], + prefix: str = "", +) -> Iterator[tuple[str, AttributeValue]]: + if obj.get("type") == "text" and (text := obj.get("text")): + yield f"{prefix}{GEN_AI_OUTPUT_MESSAGES_KEY}", text + + +def _get_attributes_from_chat_completions_tool_call_dict( + obj: Mapping[str, Any], + prefix: str = "", +) -> Iterator[tuple[str, AttributeValue]]: + if id_ := obj.get("id"): + yield f"{prefix}{GEN_AI_TOOL_CALL_ID_KEY}", id_ + if function := obj.get("function"): + if name := function.get("name"): + yield f"{prefix}{GEN_AI_TOOL_NAME_KEY}", name + if arguments := function.get("arguments"): + if arguments != "{}": + yield f"{prefix}{GEN_AI_TOOL_ARGS_KEY}", arguments + + +def _get_attributes_from_chat_completions_usage( + obj: Mapping[str, Any] | None, +) -> Iterator[tuple[str, AttributeValue]]: + if not obj: + return + if input_tokens := obj.get("input_tokens"): + yield GEN_AI_USAGE_INPUT_TOKENS_KEY, input_tokens + if output_tokens := obj.get("output_tokens"): + yield GEN_AI_USAGE_OUTPUT_TOKENS_KEY, output_tokens + + +def _convert_to_primitive(value: Any) -> bool | str | bytes | int | float: + if isinstance(value, (bool, str, bytes, int, float)): + return value + if isinstance(value, (list, tuple)): + return safe_json_dumps(value) + if 
isinstance(value, dict): + return safe_json_dumps(value) + return str(value) + + +def get_attributes_from_function_span_data( + obj: FunctionSpanData, +) -> Iterator[tuple[str, AttributeValue]]: + yield GEN_AI_TOOL_NAME_KEY, obj.name + if obj.input: + yield GEN_AI_TOOL_ARGS_KEY, obj.input + if obj.output is not None: + yield GEN_AI_TOOL_CALL_RESULT_KEY, _convert_to_primitive(obj.output) + + +def get_attributes_from_message_content_list( + obj: Iterable[ResponseInputContentParam | Content], + prefix: str = "", +) -> Iterator[tuple[str, AttributeValue]]: + for i, item in enumerate(obj): + if item["type"] == "input_text" or item["type"] == "output_text": + yield f"{prefix}{GEN_AI_INPUT_MESSAGES_KEY}.{i}.{GEN_AI_MESSAGE_CONTENT_TYPE}", "text" + yield ( + f"{prefix}{GEN_AI_INPUT_MESSAGES_KEY}.{i}.{GEN_AI_OUTPUT_MESSAGES_KEY}", + item["text"], + ) + elif item["type"] == "refusal": + yield f"{prefix}{GEN_AI_INPUT_MESSAGES_KEY}.{i}.{GEN_AI_MESSAGE_CONTENT_TYPE}", "text" + yield ( + f"{prefix}{GEN_AI_INPUT_MESSAGES_KEY}.{i}.{GEN_AI_OUTPUT_MESSAGES_KEY}", + item["refusal"], + ) + elif TYPE_CHECKING: + assert_never(item["type"]) # type: ignore[arg-type] + + +def get_attributes_from_response(obj: Response) -> Iterator[tuple[str, AttributeValue]]: + yield from get_attributes_from_tools(obj.tools) + yield from get_attributes_from_usage(obj.usage) + yield from get_attributes_from_response_output(obj.output) + if isinstance(obj.instructions, str): + yield from _get_attributes_from_response_instruction(obj.instructions) + else: + pass # TODO: handle list instructions + yield GEN_AI_REQUEST_MODEL_KEY, obj.model + param = obj.model_dump( + exclude_none=True, + exclude={"object", "tools", "usage", "output", "error", "status"}, + ) + yield GEN_AI_EXECUTION_PAYLOAD_KEY, safe_json_dumps(param) + + +def get_attributes_from_tools( + tools: Iterable[Tool] | None, +) -> Iterator[tuple[str, AttributeValue]]: + if not tools: + return + for i, tool in enumerate(tools): + if isinstance(tool, 
FunctionTool): + yield ( + f"{GEN_AI_CHOICE}.{i}.{GEN_AI_TOOL_JSON_SCHEMA}", + safe_json_dumps( + { + "type": "function", + "function": { + "name": tool.name, + "description": tool.description, + "parameters": tool.parameters, + "strict": tool.strict, + }, + } + ), + ) + else: + pass + + +def get_attributes_from_response_output( + obj: Iterable[ResponseOutputItem], + msg_idx: int = 0, +) -> Iterator[tuple[str, AttributeValue]]: + tool_call_idx = 0 + for _i, item in enumerate(obj): + if item.type == "message": + prefix = f"{GEN_AI_OUTPUT_MESSAGES_KEY}.{msg_idx}." + yield from _get_attributes_from_message(item, prefix) + msg_idx += 1 + elif item.type == "function_call": + yield f"{GEN_AI_OUTPUT_MESSAGES_KEY}.{msg_idx}.{GEN_AI_MESSAGE_ROLE}", "assistant" + prefix = f"{GEN_AI_OUTPUT_MESSAGES_KEY}.{msg_idx}.{GEN_AI_MESSAGE_TOOL_CALLS}.{tool_call_idx}." + yield from _get_attributes_from_function_tool_call(item, prefix) + tool_call_idx += 1 + elif item.type == "custom_tool_call": + yield f"{prefix}{GEN_AI_MESSAGE_ROLE}", "assistant" + yield from _get_attributes_from_response_custom_tool_call( + item, + f"{prefix}{GEN_AI_MESSAGE_TOOL_CALLS}.0.", + ) + elif TYPE_CHECKING: + assert_never(item) # type: ignore[arg-type] + + +def _get_attributes_from_response_instruction( + instructions: str | None, +) -> Iterator[tuple[str, AttributeValue]]: + if not instructions: + return + yield f"{GEN_AI_INPUT_MESSAGES_KEY}.0.{GEN_AI_MESSAGE_ROLE}", "system" + yield f"{GEN_AI_INPUT_MESSAGES_KEY}.0.{GEN_AI_OUTPUT_MESSAGES_KEY}", instructions + + +def _get_attributes_from_function_tool_call( + obj: ResponseFunctionToolCall, + prefix: str = "", +) -> Iterator[tuple[str, AttributeValue]]: + yield f"{prefix}{GEN_AI_TOOL_CALL_ID_KEY}", obj.call_id + yield f"{prefix}{GEN_AI_TOOL_NAME_KEY}", obj.name + if obj.arguments != "{}": + yield f"{prefix}{GEN_AI_TOOL_ARGS_KEY}", obj.arguments + + +def _get_attributes_from_response_custom_tool_call( + obj: ResponseCustomToolCall, + prefix: str = "", +) -> 
Iterator[tuple[str, AttributeValue]]: + if (call_id := obj.call_id) is not None: + yield f"{prefix}{GEN_AI_TOOL_CALL_ID_KEY}", call_id + if (name := obj.name) is not None: + yield f"{prefix}{GEN_AI_TOOL_NAME_KEY}", name + if (input_data := obj.input) is not None: + yield ( + f"{prefix}{GEN_AI_TOOL_ARGS_KEY}", + safe_json_dumps({"input": input_data}), + ) + + +def _get_attributes_from_message( + obj: ResponseOutputMessage, + prefix: str = "", +) -> Iterator[tuple[str, AttributeValue]]: + yield f"{prefix}{GEN_AI_MESSAGE_ROLE}", obj.role + for i, item in enumerate(obj.content): + if isinstance(item, ResponseOutputText): + yield f"{prefix}{GEN_AI_OUTPUT_MESSAGES_KEY}.{i}.{GEN_AI_MESSAGE_CONTENT_TYPE}", "text" + yield ( + f"{prefix}{GEN_AI_OUTPUT_MESSAGES_KEY}.{i}.{GEN_AI_OUTPUT_MESSAGES_KEY}", + item.text, + ) + elif isinstance(item, ResponseOutputRefusal): + yield f"{prefix}{GEN_AI_OUTPUT_MESSAGES_KEY}.{i}.{GEN_AI_MESSAGE_CONTENT_TYPE}", "text" + yield ( + f"{prefix}{GEN_AI_OUTPUT_MESSAGES_KEY}.{i}.{GEN_AI_OUTPUT_MESSAGES_KEY}", + item.refusal, + ) + elif TYPE_CHECKING: + assert_never(item) # type: ignore[arg-type] + + +def get_attributes_from_usage( + obj: ResponseUsage | None, +) -> Iterator[tuple[str, AttributeValue]]: + if not obj: + return + yield GEN_AI_USAGE_OUTPUT_TOKENS_KEY, obj.output_tokens + yield GEN_AI_USAGE_INPUT_TOKENS_KEY, obj.input_tokens + yield GEN_AI_LLM_TOKEN_COUNT_TOTAL, obj.total_tokens + yield GEN_AI_LLM_TOKEN_COUNT_PROMPT_DETAILS_CACHED_READ, obj.input_tokens_details.cached_tokens + yield ( + GEN_AI_LLM_TOKEN_COUNT_COMPLETION_DETAILS_REASONING, + obj.output_tokens_details.reasoning_tokens, + ) + + +def get_span_status(obj: Span[Any]) -> Status: + if error := getattr(obj, "error", None): + return Status(status_code=StatusCode.ERROR, description=f"{error.get('message')}: {error.get('data')}") + return Status(StatusCode.OK) + + +def capture_tool_call_ids(output_list: Any, pending_tool_calls: dict[str, str], max_size: int = 1000) -> None: + 
"""Extract and store tool_call_ids from generation output for later use by FunctionSpan. + + Args: + output_list: The generation output containing tool calls + pending_tool_calls: OrderedDict to store pending tool calls + max_size: Maximum number of pending tool calls to keep in memory + """ + if not output_list: + return + try: + for msg in output_list: + if isinstance(msg, dict) and msg.get("role") == "assistant": + tool_calls = msg.get("tool_calls") + if tool_calls: + for tc in tool_calls: + if isinstance(tc, dict): + call_id = tc.get("id") + func = tc.get("function", {}) + func_name = func.get("name") if isinstance(func, dict) else None + func_args = func.get("arguments", "") if isinstance(func, dict) else "" + if call_id and func_name: + # Key by (function_name, arguments) to uniquely identify each call + key = f"{func_name}:{func_args}" + pending_tool_calls[key] = call_id + # Cap the size of the dict to prevent unbounded growth + while len(pending_tool_calls) > max_size: + pending_tool_calls.popitem(last=False) # type: ignore[call-arg] + except Exception: # pylint: disable=broad-exception-caught + pass + + +def get_tool_call_id(function_name: str, function_args: str, pending_tool_calls: dict[str, str]) -> str | None: + """Get and remove the tool_call_id for a function with specific arguments.""" + key = f"{function_name}:{function_args}" + return pending_tool_calls.pop(key, None) + + +def capture_input_message(parent_span_id: str, input_list: Any, agent_inputs: dict[str, str]) -> None: + """Extract and store the first user message from input list for parent agent span.""" + if parent_span_id in agent_inputs: + return # Already captured + if not input_list: + return + try: + for msg in input_list: + if isinstance(msg, dict) and msg.get("role") == "user": + content = msg.get("content", "") + if content: + agent_inputs[parent_span_id] = str(content) + return + except Exception: # pylint: disable=broad-exception-caught + pass + + +def 
capture_output_message(parent_span_id: str, output_list: Any, agent_outputs: dict[str, str]) -> None: + """Extract and store the last assistant message with actual content (no tool calls) for parent agent span.""" + if not output_list: + return + try: + # Iterate in reverse to get the last assistant message with content (not a tool call) + output_items = list(output_list) if not isinstance(output_list, list) else output_list + for msg in reversed(output_items): + if isinstance(msg, dict) and msg.get("role") == "assistant": + content = msg.get("content") + tool_calls = msg.get("tool_calls") + # Only capture if there's actual content and no tool_calls + # (tool_calls means this is an intermediate step, not the final response) + if content and not tool_calls: + agent_outputs[parent_span_id] = str(content) + return + except Exception: # pylint: disable=broad-exception-caught + pass + + +def find_ancestor_agent_span_id( + span_id: str | None, + agent_span_ids: set[str] | Mapping[str, object], + span_parents: Mapping[str, str], +) -> str | None: + """Walk up the parent chain to find the nearest ancestor AgentSpan.""" + current = span_id + visited: set[str] = set() # Prevent infinite loops + while current and current not in visited: + if current in agent_span_ids: + return current + visited.add(current) + current = span_parents.get(current) + return None diff --git a/tests/a365/test_span_processor.py b/tests/a365/test_span_processor.py index 9743b2ce..e9c21aa9 100644 --- a/tests/a365/test_span_processor.py +++ b/tests/a365/test_span_processor.py @@ -1,5 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. 
+# pylint: disable=no-member import unittest from unittest.mock import MagicMock diff --git a/tests/agent_framework/test_trace_instrumentor.py b/tests/agent_framework/test_trace_instrumentor.py index af95175f..07ddcc14 100644 --- a/tests/agent_framework/test_trace_instrumentor.py +++ b/tests/agent_framework/test_trace_instrumentor.py @@ -1,5 +1,6 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +# pylint: disable=no-member """Tests for AgentFrameworkInstrumentor and AgentFrameworkSpanProcessor.""" diff --git a/tests/genai/main_agent/test_span_processor.py b/tests/genai/main_agent/test_span_processor.py index bf557c86..e9d37c2a 100644 --- a/tests/genai/main_agent/test_span_processor.py +++ b/tests/genai/main_agent/test_span_processor.py @@ -1,5 +1,6 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +# pylint: disable=no-member """Tests for GenAIMainAgentSpanProcessor.""" diff --git a/tests/openai_agents/__init__.py b/tests/openai_agents/__init__.py new file mode 100644 index 00000000..59e481eb --- /dev/null +++ b/tests/openai_agents/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. diff --git a/tests/openai_agents/integration/__init__.py b/tests/openai_agents/integration/__init__.py new file mode 100644 index 00000000..59e481eb --- /dev/null +++ b/tests/openai_agents/integration/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. diff --git a/tests/openai_agents/integration/conftest.py b/tests/openai_agents/integration/conftest.py new file mode 100644 index 00000000..68a09342 --- /dev/null +++ b/tests/openai_agents/integration/conftest.py @@ -0,0 +1,53 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ +import os +from pathlib import Path +from typing import Any + +import pytest + +try: + from dotenv import load_dotenv + + tests_dir = Path(__file__).parent.parent.parent.parent + env_file = tests_dir / ".env" + if env_file.exists(): + load_dotenv(env_file) +except ImportError: + pass + + +def pytest_configure(config): + config.addinivalue_line("markers", "integration: marks tests as integration tests") + + +@pytest.fixture(scope="session") +def azure_openai_config() -> dict[str, Any]: + """Azure OpenAI configuration for integration tests.""" + endpoint = os.getenv("AZURE_OPENAI_ENDPOINT") + deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4") + api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-08-01-preview") + + if not endpoint: + pytest.skip("Integration tests require AZURE_OPENAI_ENDPOINT") + + return { + "endpoint": endpoint, + "deployment": deployment, + "api_version": api_version, + } + + +@pytest.fixture(scope="session") +def openai_config() -> dict[str, Any]: + """OpenAI configuration for integration tests.""" + api_key = os.getenv("OPENAI_API_KEY") + + if not api_key: + pytest.skip("Integration tests require OPENAI_API_KEY") + + return { + "api_key": api_key, + "model": os.getenv("OPENAI_MODEL", "gpt-4o-mini"), + } diff --git a/tests/openai_agents/integration/test_openai_agents_trace_processor.py b/tests/openai_agents/integration/test_openai_agents_trace_processor.py new file mode 100644 index 00000000..f57a1aac --- /dev/null +++ b/tests/openai_agents/integration/test_openai_agents_trace_processor.py @@ -0,0 +1,383 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ +"""Integration tests for A365 OpenAI Agents trace processor with real OpenAI / Azure OpenAI.""" + +import logging +import os +import time + +import pytest +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.trace import get_tracer_provider, set_tracer_provider + +try: + from agents import Agent, Runner, function_tool, set_default_openai_api, set_default_openai_client + from openai import AsyncAzureOpenAI +except ImportError: + pytest.skip( + "openai-agents and openai packages required for integration tests", + allow_module_level=True, + ) + +from microsoft.opentelemetry._genai._openai_agents._trace_instrumentor import A365OpenAIAgentsInstrumentor + +# A365 attribute keys used in assertions +from microsoft.opentelemetry.a365.core.constants import ( + CUSTOM_PARENT_SPAN_ID_KEY, + EXECUTE_TOOL_OPERATION_NAME, + GEN_AI_INPUT_MESSAGES_KEY, + GEN_AI_OPERATION_NAME_KEY, + GEN_AI_OUTPUT_MESSAGES_KEY, + GEN_AI_PROVIDER_NAME_KEY, + GEN_AI_REQUEST_MODEL_KEY, + GEN_AI_TOOL_CALL_ID_KEY, + INVOKE_AGENT_OPERATION_NAME, +) + + +@function_tool +def get_weather(city: str) -> str: + """Return a mock weather forecast for a city.""" + return f"The weather in {city} is sunny, 25 °C." 
+ + +@function_tool +def add_numbers(a: float, b: float) -> float: + """Add two numbers together and return the result.""" + return a + b + + +class _MockSpanProcessor: + """Captures finished spans in-memory for assertions.""" + + def __init__(self): + self.captured_spans = [] + + def on_start(self, span, parent_context=None): + pass + + def _on_ending(self, span): + pass + + def on_end(self, span): + self.captured_spans.append(span) + + def shutdown(self): + pass + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True + + +def _setup_provider_and_processor(): + """Create a TracerProvider with a MockSpanProcessor and install it.""" + processor = _MockSpanProcessor() + current = get_tracer_provider() + if isinstance(current, TracerProvider): + current.add_span_processor(processor) + else: + tp = TracerProvider() + tp.add_span_processor(processor) + set_tracer_provider(tp) + return processor + + +def _spans_by_name(spans): + """Return dict of span name -> span for easy lookup.""" + return {s.name: s for s in spans} + + +def _spans_with_attr(spans, attr_key): + """Return list of spans that have a given attribute.""" + return [s for s in spans if attr_key in dict(s.attributes or {})] + + +# --------------------------------------------------------------------------- +# Tests using OpenAI (api key based) +# --------------------------------------------------------------------------- + + +@pytest.mark.integration +class TestOpenAIAgentsTraceProcessorOpenAI: + """Integration tests using OpenAI API directly.""" + + _processor: _MockSpanProcessor + + def setup_method(self): + self._processor = _setup_provider_and_processor() + + def test_simple_agent_produces_spans(self, openai_config): + """A simple agent run should produce at least agent + generation spans.""" + instrumentor = A365OpenAIAgentsInstrumentor() + instrumentor.instrument() + + try: + agent = Agent( + name="Greeter", + instructions="You are a concise assistant. 
Reply in one sentence.", + model=openai_config["model"], + ) + result = Runner.run_sync(agent, "Say hello.") + time.sleep(1) + + assert result is not None + assert len(result.final_output) > 0 + + spans = self._processor.captured_spans + assert len(spans) >= 2, f"Expected >=2 spans, got {len(spans)}" + + # Should have an agent (invoke_agent) span + agent_spans = [ + s for s in spans if (s.attributes or {}).get(GEN_AI_OPERATION_NAME_KEY) == INVOKE_AGENT_OPERATION_NAME + ] + assert len(agent_spans) >= 1, "No invoke_agent span found" + + # All spans should have provider = openai + for s in spans: + attrs = dict(s.attributes or {}) + assert attrs.get(GEN_AI_PROVIDER_NAME_KEY) == "openai" + + finally: + instrumentor.uninstrument() + + def test_agent_with_tool_call(self, openai_config): + """An agent with a tool should produce function/execute_tool spans.""" + instrumentor = A365OpenAIAgentsInstrumentor() + instrumentor.instrument() + + try: + agent = Agent( + name="Calculator", + instructions="You are a calculator. Use the add_numbers tool to compute the answer. 
Be concise.", + tools=[add_numbers], + model=openai_config["model"], + ) + result = Runner.run_sync(agent, "What is 15 + 27?") + time.sleep(1) + + assert result is not None + assert "42" in result.final_output + + spans = self._processor.captured_spans + # Should have execute_tool spans + tool_spans = [ + s for s in spans if (s.attributes or {}).get(GEN_AI_OPERATION_NAME_KEY) == EXECUTE_TOOL_OPERATION_NAME + ] + assert len(tool_spans) >= 1, "No execute_tool span found" + + # Tool span should have tool_call_id + tool_attrs = dict(tool_spans[0].attributes or {}) + assert GEN_AI_TOOL_CALL_ID_KEY in tool_attrs, "Tool span missing tool_call_id" + + # Generation spans should have custom.parent.span.id + gen_spans = _spans_with_attr(spans, CUSTOM_PARENT_SPAN_ID_KEY) + assert len(gen_spans) >= 1, "No spans with custom.parent.span.id" + + finally: + instrumentor.uninstrument() + + def test_agent_captures_messages(self, openai_config): + """Agent span should have input/output messages from child generation spans.""" + instrumentor = A365OpenAIAgentsInstrumentor() + instrumentor.instrument() + + try: + agent = Agent( + name="EchoBot", + instructions="Repeat what the user says, word for word.", + model=openai_config["model"], + ) + result = Runner.run_sync(agent, "The quick brown fox") + time.sleep(1) + + assert result is not None + spans = self._processor.captured_spans + + # Agent span should have captured messages + agent_spans = [ + s for s in spans if (s.attributes or {}).get(GEN_AI_OPERATION_NAME_KEY) == INVOKE_AGENT_OPERATION_NAME + ] + assert len(agent_spans) >= 1 + + agent_attrs = dict(agent_spans[0].attributes or {}) + # Input and output messages should be present on the agent span + assert GEN_AI_INPUT_MESSAGES_KEY in agent_attrs, "Agent span missing input messages" + assert GEN_AI_OUTPUT_MESSAGES_KEY in agent_attrs, "Agent span missing output messages" + + finally: + instrumentor.uninstrument() + + def test_generation_span_has_model(self, openai_config): + 
"""Generation (chat) spans should have gen_ai.request.model.""" + instrumentor = A365OpenAIAgentsInstrumentor() + instrumentor.instrument() + + try: + agent = Agent( + name="ModelCheck", + instructions="Reply with one word: OK", + model=openai_config["model"], + ) + Runner.run_sync(agent, "Ping") + time.sleep(1) + + spans = self._processor.captured_spans + gen_spans = _spans_with_attr(spans, GEN_AI_REQUEST_MODEL_KEY) + assert len(gen_spans) >= 1, "No generation span with model attribute" + + model_val = dict(gen_spans[0].attributes or {})[GEN_AI_REQUEST_MODEL_KEY] + assert isinstance(model_val, str) + assert len(model_val) > 0 + + finally: + instrumentor.uninstrument() + + +# --------------------------------------------------------------------------- +# Tests using Azure OpenAI +# --------------------------------------------------------------------------- + + +@pytest.mark.integration +class TestOpenAIAgentsTraceProcessorAzure: + """Integration tests using Azure OpenAI.""" + + _processor: _MockSpanProcessor + + def setup_method(self): + self._processor = _setup_provider_and_processor() + + def test_azure_agent_produces_spans(self, azure_openai_config): + """An agent using Azure OpenAI should produce A365-format spans.""" + instrumentor = A365OpenAIAgentsInstrumentor() + instrumentor.instrument() + + try: + azure_client = AsyncAzureOpenAI( + azure_endpoint=azure_openai_config["endpoint"], + api_key=os.environ.get("AZURE_OPENAI_API_KEY", ""), + api_version=azure_openai_config["api_version"], + ) + set_default_openai_client(azure_client, use_for_tracing=False) + set_default_openai_api("chat_completions") + + agent = Agent( + name="AzureBot", + instructions="You are a concise assistant. 
Reply in one sentence.", + model=azure_openai_config["deployment"], + ) + result = Runner.run_sync(agent, "What is Python?") + time.sleep(1) + + assert result is not None + assert len(result.final_output) > 0 + + spans = self._processor.captured_spans + assert len(spans) >= 2 + + agent_spans = [ + s for s in spans if (s.attributes or {}).get(GEN_AI_OPERATION_NAME_KEY) == INVOKE_AGENT_OPERATION_NAME + ] + assert len(agent_spans) >= 1 + + finally: + instrumentor.uninstrument() + + def test_azure_agent_with_tool(self, azure_openai_config): + """An Azure agent with tools should produce tool spans with A365 attributes.""" + instrumentor = A365OpenAIAgentsInstrumentor() + instrumentor.instrument() + + try: + azure_client = AsyncAzureOpenAI( + azure_endpoint=azure_openai_config["endpoint"], + api_key=os.environ.get("AZURE_OPENAI_API_KEY", ""), + api_version=azure_openai_config["api_version"], + ) + set_default_openai_client(azure_client, use_for_tracing=False) + set_default_openai_api("chat_completions") + + agent = Agent( + name="AzureCalc", + instructions="Use the add_numbers tool to answer math questions. 
Be concise.", + tools=[add_numbers], + model=azure_openai_config["deployment"], + ) + result = Runner.run_sync(agent, "What is 100 + 200?") + time.sleep(1) + + assert result is not None + assert "300" in result.final_output + + spans = self._processor.captured_spans + tool_spans = [ + s for s in spans if (s.attributes or {}).get(GEN_AI_OPERATION_NAME_KEY) == EXECUTE_TOOL_OPERATION_NAME + ] + assert len(tool_spans) >= 1 + + finally: + instrumentor.uninstrument() + + +# --------------------------------------------------------------------------- +# Manual runner +# --------------------------------------------------------------------------- + + +def _run_manual_tests(): + logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") + logger = logging.getLogger(__name__) + + logger.info("=== Starting OpenAI Agents A365 Integration Tests ===") + + processor = _setup_provider_and_processor() + + instrumentor = A365OpenAIAgentsInstrumentor() + instrumentor.instrument() + + try: + logger.info("--- test: simple agent ---") + agent = Agent( + name="Greeter", + instructions="You are a concise assistant. Reply in one sentence.", + ) + result = Runner.run_sync(agent, "Say hello.") + time.sleep(1) + + logger.info(" Result: %s", result.final_output[:100] if result else "None") + logger.info(" Captured spans: %d", len(processor.captured_spans)) + for s in processor.captured_spans: + attrs = dict(s.attributes or {}) + logger.info(" [%s] %s", attrs.get(GEN_AI_OPERATION_NAME_KEY, "?"), s.name) + + processor.captured_spans.clear() + + logger.info("--- test: agent with tool ---") + calc_agent = Agent( + name="Calculator", + instructions="Use add_numbers to compute. 
Be concise.", + tools=[add_numbers], + ) + result = Runner.run_sync(calc_agent, "What is 7 + 8?") + time.sleep(1) + + logger.info(" Result: %s", result.final_output[:100] if result else "None") + logger.info(" Captured spans: %d", len(processor.captured_spans)) + for s in processor.captured_spans: + attrs = dict(s.attributes or {}) + logger.info( + " [%s] %s tool_call_id=%s", + attrs.get(GEN_AI_OPERATION_NAME_KEY, "?"), + s.name, + attrs.get(GEN_AI_TOOL_CALL_ID_KEY, "n/a"), + ) + + finally: + instrumentor.uninstrument() + + logger.info("=== Done ===") + + +if __name__ == "__main__": + _run_manual_tests() diff --git a/tests/openai_agents/test_message_mapper.py b/tests/openai_agents/test_message_mapper.py new file mode 100644 index 00000000..73808871 --- /dev/null +++ b/tests/openai_agents/test_message_mapper.py @@ -0,0 +1,307 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Unit tests for the OpenAI Agents message mapper.""" + +import json +from unittest import TestCase + +import pytest + +pytest.importorskip("agents") + +from microsoft.opentelemetry._genai._openai_agents._message_mapper import ( # noqa: E402 # pylint: disable=wrong-import-position + map_input_messages, + map_output_messages, +) + + +class TestMapInputMessages(TestCase): + """Tests for map_input_messages.""" + + def test_empty_string_returns_none(self) -> None: + self.assertIsNone(map_input_messages("")) + + def test_whitespace_only_returns_none(self) -> None: + self.assertIsNone(map_input_messages(" ")) + + def test_plain_string_wraps_as_user_message(self) -> None: + result = map_input_messages("Hello world") + self.assertIsNotNone(result) + data = json.loads(result) + self.assertEqual(data["version"], "0.1.0") + self.assertEqual(len(data["messages"]), 1) + self.assertEqual(data["messages"][0]["role"], "user") + self.assertEqual(data["messages"][0]["parts"][0]["type"], "text") + self.assertEqual(data["messages"][0]["parts"][0]["content"], "Hello world") + + def 
test_chat_completions_format(self) -> None: + """Standard chat completions format with system + user messages.""" + raw = json.dumps( + [ + {"role": "system", "content": "You are helpful."}, + {"role": "user", "content": "Hi there"}, + ] + ) + result = map_input_messages(raw) + self.assertIsNotNone(result) + data = json.loads(result) + self.assertEqual(data["version"], "0.1.0") + self.assertEqual(len(data["messages"]), 2) + self.assertEqual(data["messages"][0]["role"], "system") + self.assertEqual(data["messages"][0]["parts"][0]["content"], "You are helpful.") + self.assertEqual(data["messages"][1]["role"], "user") + self.assertEqual(data["messages"][1]["parts"][0]["content"], "Hi there") + + def test_chat_completions_with_tool_calls(self) -> None: + """Messages with assistant tool_calls and tool response.""" + raw = json.dumps( + [ + {"role": "user", "content": "What is 2+2?"}, + { + "role": "assistant", + "content": None, + "tool_calls": [ + { + "id": "call_123", + "function": {"name": "add", "arguments": '{"a":2,"b":2}'}, + } + ], + }, + {"role": "tool", "content": "4", "tool_call_id": "call_123"}, + ] + ) + result = map_input_messages(raw) + self.assertIsNotNone(result) + data = json.loads(result) + self.assertEqual(data["version"], "0.1.0") + self.assertEqual(len(data["messages"]), 3) + + # User message + self.assertEqual(data["messages"][0]["role"], "user") + self.assertEqual(data["messages"][0]["parts"][0]["type"], "text") + + # Assistant with tool call + self.assertEqual(data["messages"][1]["role"], "assistant") + self.assertEqual(data["messages"][1]["parts"][0]["type"], "tool_call") + self.assertEqual(data["messages"][1]["parts"][0]["name"], "add") + self.assertEqual(data["messages"][1]["parts"][0]["id"], "call_123") + + # Tool response + self.assertEqual(data["messages"][2]["role"], "tool") + self.assertEqual(data["messages"][2]["parts"][0]["type"], "tool_call_response") + self.assertEqual(data["messages"][2]["parts"][0]["id"], "call_123") + 
self.assertEqual(data["messages"][2]["parts"][0]["response"], "4") + + def test_response_input_item_param_format(self) -> None: + """ResponseInputItemParam format with typed items.""" + raw = json.dumps( + [ + { + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": "Hello"}], + }, + { + "type": "function_call", + "name": "get_weather", + "call_id": "fc_1", + "arguments": '{"city":"Seattle"}', + }, + {"type": "function_call_output", "call_id": "fc_1", "output": "Sunny, 22C"}, + ] + ) + result = map_input_messages(raw) + self.assertIsNotNone(result) + data = json.loads(result) + self.assertEqual(data["version"], "0.1.0") + self.assertEqual(len(data["messages"]), 3) + + # Message + self.assertEqual(data["messages"][0]["role"], "user") + self.assertEqual(data["messages"][0]["parts"][0]["type"], "text") + + # Function call + self.assertEqual(data["messages"][1]["role"], "assistant") + self.assertEqual(data["messages"][1]["parts"][0]["type"], "tool_call") + self.assertEqual(data["messages"][1]["parts"][0]["name"], "get_weather") + + # Function call output + self.assertEqual(data["messages"][2]["role"], "tool") + self.assertEqual(data["messages"][2]["parts"][0]["type"], "tool_call_response") + self.assertEqual(data["messages"][2]["parts"][0]["response"], "Sunny, 22C") + + def test_message_without_type_field(self) -> None: + """Messages without explicit 'type' field (EasyInputMessageParam).""" + raw = json.dumps( + [ + {"role": "user", "content": "Hello"}, + ] + ) + result = map_input_messages(raw) + self.assertIsNotNone(result) + data = json.loads(result) + self.assertEqual(data["messages"][0]["role"], "user") + + def test_invalid_json_wraps_as_plain_text(self) -> None: + result = map_input_messages("not json {") + self.assertIsNotNone(result) + data = json.loads(result) + self.assertEqual(data["version"], "0.1.0") + self.assertEqual(data["messages"][0]["parts"][0]["content"], "not json {") + + def test_empty_list_returns_none(self) -> None: 
+ self.assertIsNone(map_input_messages("[]")) + + def test_custom_tool_call_input(self) -> None: + """Custom tool call input items.""" + raw = json.dumps( + [ + { + "type": "custom_tool_call", + "name": "my_tool", + "call_id": "ct_1", + "input": {"key": "value"}, + }, + { + "type": "custom_tool_call_output", + "call_id": "ct_1", + "output": "result", + }, + ] + ) + result = map_input_messages(raw) + self.assertIsNotNone(result) + data = json.loads(result) + self.assertEqual(len(data["messages"]), 2) + self.assertEqual(data["messages"][0]["parts"][0]["type"], "tool_call") + self.assertEqual(data["messages"][0]["parts"][0]["name"], "my_tool") + self.assertEqual(data["messages"][1]["parts"][0]["type"], "tool_call_response") + + +class TestMapOutputMessages(TestCase): + """Tests for map_output_messages.""" + + def test_empty_string_returns_none(self) -> None: + self.assertIsNone(map_output_messages("")) + + def test_plain_string_wraps_as_assistant(self) -> None: + result = map_output_messages("The answer is 42.") + self.assertIsNotNone(result) + data = json.loads(result) + self.assertEqual(data["version"], "0.1.0") + self.assertEqual(data["messages"][0]["role"], "assistant") + self.assertEqual(data["messages"][0]["parts"][0]["content"], "The answer is 42.") + + def test_chat_completions_output(self) -> None: + """Standard chat completions output with finish_reason.""" + raw = json.dumps( + [ + { + "role": "assistant", + "content": "Paris is the capital.", + "finish_reason": "stop", + } + ] + ) + result = map_output_messages(raw) + self.assertIsNotNone(result) + data = json.loads(result) + self.assertEqual(data["version"], "0.1.0") + self.assertEqual(len(data["messages"]), 1) + msg = data["messages"][0] + self.assertEqual(msg["role"], "assistant") + self.assertEqual(msg["parts"][0]["type"], "text") + self.assertEqual(msg["parts"][0]["content"], "Paris is the capital.") + self.assertEqual(msg["finish_reason"], "stop") + + def test_chat_completions_with_tool_calls(self) 
-> None: + """Output with tool_calls.""" + raw = json.dumps( + [ + { + "role": "assistant", + "content": None, + "tool_calls": [ + { + "id": "call_abc", + "function": {"name": "search", "arguments": '{"q":"test"}'}, + } + ], + "finish_reason": "tool_calls", + } + ] + ) + result = map_output_messages(raw) + self.assertIsNotNone(result) + data = json.loads(result) + msg = data["messages"][0] + self.assertEqual(msg["role"], "assistant") + self.assertEqual(msg["parts"][0]["type"], "tool_call") + self.assertEqual(msg["parts"][0]["name"], "search") + self.assertEqual(msg["finish_reason"], "tool_calls") + + def test_response_json_format(self) -> None: + """Full OpenAI Response JSON (from model_dump_json).""" + raw = json.dumps( + { + "id": "resp_123", + "model": "gpt-4o", + "output": [ + { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "Hello!"}], + "status": "completed", + } + ], + } + ) + result = map_output_messages(raw) + self.assertIsNotNone(result) + data = json.loads(result) + self.assertEqual(data["version"], "0.1.0") + msg = data["messages"][0] + self.assertEqual(msg["role"], "assistant") + self.assertEqual(msg["parts"][0]["type"], "text") + self.assertEqual(msg["parts"][0]["content"], "Hello!") + + def test_response_json_with_function_call(self) -> None: + """Response JSON with function_call output item.""" + raw = json.dumps( + { + "id": "resp_456", + "model": "gpt-4o", + "output": [ + { + "type": "function_call", + "name": "get_weather", + "call_id": "fc_1", + "arguments": '{"city":"NYC"}', + } + ], + } + ) + result = map_output_messages(raw) + self.assertIsNotNone(result) + data = json.loads(result) + msg = data["messages"][0] + self.assertEqual(msg["role"], "assistant") + self.assertEqual(msg["parts"][0]["type"], "tool_call") + self.assertEqual(msg["parts"][0]["name"], "get_weather") + self.assertEqual(msg["finish_reason"], "tool_call") + + def test_response_json_without_output_returns_none(self) -> None: + 
"""Response JSON without output field.""" + raw = json.dumps({"id": "resp_789", "model": "gpt-4o"}) + self.assertIsNone(map_output_messages(raw)) + + def test_empty_list_returns_none(self) -> None: + self.assertIsNone(map_output_messages("[]")) + + def test_invalid_json_wraps_as_plain_text(self) -> None: + result = map_output_messages("bad json") + self.assertIsNotNone(result) + data = json.loads(result) + self.assertEqual(data["version"], "0.1.0") + self.assertEqual(data["messages"][0]["role"], "assistant") diff --git a/tests/openai_agents/test_trace_instrumentor.py b/tests/openai_agents/test_trace_instrumentor.py new file mode 100644 index 00000000..e22a0526 --- /dev/null +++ b/tests/openai_agents/test_trace_instrumentor.py @@ -0,0 +1,103 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Tests for A365OpenAIAgentsInstrumentor.""" + +import unittest +from unittest.mock import MagicMock, patch + +import pytest + +pytest.importorskip("agents") + +from microsoft.opentelemetry._genai._openai_agents._trace_instrumentor import ( # noqa: E402 # pylint: disable=wrong-import-position + A365OpenAIAgentsInstrumentor, +) + + +class TestA365OpenAIAgentsInstrumentor(unittest.TestCase): + """Unit tests for A365OpenAIAgentsInstrumentor class.""" + + def setUp(self): + # Reset singleton state between tests + A365OpenAIAgentsInstrumentor._processor = None + A365OpenAIAgentsInstrumentor._is_instrumented_by_opentelemetry = False + + def tearDown(self): + A365OpenAIAgentsInstrumentor._processor = None + A365OpenAIAgentsInstrumentor._is_instrumented_by_opentelemetry = False + + def test_instrumentor_initialization(self): + instrumentor = A365OpenAIAgentsInstrumentor() + self.assertIsNotNone(instrumentor) + + def test_instrumentation_dependencies(self): + instrumentor = A365OpenAIAgentsInstrumentor() + deps = instrumentor.instrumentation_dependencies() + self.assertIn("openai-agents >= 0.0.7", deps) + + 
@patch("microsoft.opentelemetry._genai._openai_agents._trace_instrumentor.trace_api") + def test_instrument_creates_processor(self, mock_trace_api): + mock_tracer = MagicMock() + mock_trace_api.get_tracer.return_value = mock_tracer + + # Mock the agents.tracing module + mock_provider = MagicMock() + mock_multi = MagicMock() + mock_multi._processors = [] + mock_provider._multi_processor = mock_multi + + instrumentor = A365OpenAIAgentsInstrumentor() + with patch( + "microsoft.opentelemetry._genai._openai_agents._trace_instrumentor.A365OpenAIAgentsInstrumentor._instrument" + ): + # Call directly + pass + + # Test the actual _instrument logic + with patch.dict("sys.modules", {"agents.tracing": MagicMock()}): + with patch( + "microsoft.opentelemetry._genai._openai_agents._trace_instrumentor.OpenAIAgentsTraceProcessor" + ) as MockProc: + mock_proc_instance = MagicMock() + MockProc.return_value = mock_proc_instance + + instrumentor._instrument() + + mock_trace_api.get_tracer.assert_called_once() + MockProc.assert_called_once_with(mock_tracer) + self.assertIs(instrumentor._processor, mock_proc_instance) + + def test_instrument_idempotent(self): + """Calling _instrument twice should not create a second processor.""" + instrumentor = A365OpenAIAgentsInstrumentor() + + # Simulate that a processor is already set (first _instrument succeeded) + existing_processor = MagicMock() + instrumentor._processor = existing_processor + + with patch( + "microsoft.opentelemetry._genai._openai_agents._trace_instrumentor.OpenAIAgentsTraceProcessor" + ) as MockProc: + instrumentor._instrument() + + # Should not have created a new processor + MockProc.assert_not_called() + # Original processor should still be in place + self.assertIs(instrumentor._processor, existing_processor) + + def test_uninstrument_clears_processor(self): + """_uninstrument should clear the processor reference.""" + instrumentor = A365OpenAIAgentsInstrumentor() + mock_proc = MagicMock() + instrumentor._processor = 
mock_proc + + with patch.dict("sys.modules", {"agents.tracing": MagicMock()}): + instrumentor._uninstrument() + + self.assertIsNone(instrumentor._processor) + + def test_uninstrument_noop_without_processor(self): + """_uninstrument should not raise when no processor is set.""" + instrumentor = A365OpenAIAgentsInstrumentor() + instrumentor._uninstrument() # should not raise diff --git a/tests/openai_agents/test_trace_processor.py b/tests/openai_agents/test_trace_processor.py new file mode 100644 index 00000000..8622e749 --- /dev/null +++ b/tests/openai_agents/test_trace_processor.py @@ -0,0 +1,293 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Tests for the OpenAI Agents A365 trace processor.""" + +from collections import OrderedDict +from unittest import TestCase +from unittest.mock import MagicMock, patch + +import pytest + +pytest.importorskip("agents") + +from agents.tracing.span_data import ( # noqa: E402 # pylint: disable=wrong-import-position + AgentSpanData, + FunctionSpanData, + HandoffSpanData, +) + +from microsoft.opentelemetry._genai._openai_agents._trace_processor import ( # noqa: E402 # pylint: disable=wrong-import-position + OpenAIAgentsTraceProcessor, +) +from microsoft.opentelemetry.a365.core.constants import ( # noqa: E402 # pylint: disable=wrong-import-position + CUSTOM_PARENT_SPAN_ID_KEY, + GEN_AI_INPUT_MESSAGES_KEY, + GEN_AI_OUTPUT_MESSAGES_KEY, + GEN_AI_PROVIDER_NAME_KEY, + GEN_AI_TOOL_CALL_ID_KEY, + GEN_AI_TOOL_TYPE_KEY, + INVOKE_AGENT_OPERATION_NAME, +) +from microsoft.opentelemetry._genai._openai_agents._constants import ( # noqa: E402 # pylint: disable=wrong-import-position + GEN_AI_GRAPH_NODE_PARENT_ID, +) + +_NOW = "2024-06-01T12:00:00+00:00" +_NOW_END = "2024-06-01T12:00:01+00:00" + + +def _make_span( + span_data, + span_id="span-1", + parent_id=None, + trace_id="trace-1", + started_at=_NOW, + ended_at=_NOW_END, + error=None, +): + span = MagicMock() + span.span_id = span_id + span.parent_id = 
parent_id + span.trace_id = trace_id + span.started_at = started_at + span.ended_at = ended_at + span.span_data = span_data + span.error = error + return span + + +def _make_trace(trace_id="trace-1"): + trace = MagicMock() + trace.trace_id = trace_id + return trace + + +def _make_otel_span(): + otel_span = MagicMock() + otel_span.attributes = {} + + def set_attribute(k, v): + otel_span.attributes[k] = v + + otel_span.set_attribute = set_attribute + ctx = MagicMock() + ctx.span_id = 12345 + otel_span.get_span_context.return_value = ctx + return otel_span + + +class TestOpenAIAgentsTraceProcessor(TestCase): + """Unit tests for OpenAIAgentsTraceProcessor.""" + + def setUp(self): + self.mock_tracer = MagicMock() + self.mock_otel_span = _make_otel_span() + self.mock_tracer.start_span.return_value = self.mock_otel_span + self.processor = OpenAIAgentsTraceProcessor(self.mock_tracer) + + def test_on_trace_end_ends_root_span(self): + """Root span should be ended with OK status when trace ends.""" + root = _make_otel_span() + self.processor._root_spans["trace-1"] = root + + trace = _make_trace("trace-1") + self.processor.on_trace_end(trace) + + root.set_status.assert_called_once() + root.end.assert_called_once() + self.assertNotIn("trace-1", self.processor._root_spans) + + def test_on_trace_end_noop_without_root(self): + """on_trace_end should not raise if no root span exists.""" + trace = _make_trace("trace-nonexistent") + self.processor.on_trace_end(trace) # should not raise + + @patch("microsoft.opentelemetry._genai._openai_agents._trace_processor.attach") + @patch("microsoft.opentelemetry._genai._openai_agents._trace_processor.set_span_in_context") + def test_on_span_start_creates_otel_span(self, mock_set_ctx, mock_attach): + """on_span_start should create an OTel span with correct attributes.""" + agent_data = AgentSpanData(name="TestAgent") + span = _make_span(agent_data) + + self.processor.on_span_start(span) + + self.mock_tracer.start_span.assert_called_once() + 
call_kwargs = self.mock_tracer.start_span.call_args + attrs = call_kwargs.kwargs.get("attributes") or call_kwargs[1].get("attributes") + self.assertEqual(attrs[GEN_AI_PROVIDER_NAME_KEY], "openai") + self.assertIn("span-1", self.processor._otel_spans) + + def test_on_span_start_skips_without_started_at(self): + """on_span_start should skip if started_at is None.""" + span = _make_span(AgentSpanData(name="Agent"), started_at=None) + self.processor.on_span_start(span) + self.mock_tracer.start_span.assert_not_called() + + def test_on_span_start_tracks_agent_span_ids(self): + """Agent spans should be tracked for ancestor lookup.""" + span = _make_span(AgentSpanData(name="Agent"), span_id="agent-1") + with patch("microsoft.opentelemetry._genai._openai_agents._trace_processor.attach"): + self.processor.on_span_start(span) + self.assertIn("agent-1", self.processor._agent_span_ids) + + def test_on_span_start_tracks_parent_child(self): + """Parent-child relationships should be tracked.""" + span = _make_span(AgentSpanData(name="Agent"), span_id="child-1", parent_id="parent-1") + with patch("microsoft.opentelemetry._genai._openai_agents._trace_processor.attach"): + self.processor.on_span_start(span) + self.assertEqual(self.processor._span_parents["child-1"], "parent-1") + + @patch("microsoft.opentelemetry._genai._openai_agents._trace_processor.detach") + def test_on_span_end_agent_span_sets_name(self, mock_detach): + """Agent span should update name to 'invoke_agent '.""" + agent_data = AgentSpanData(name="MyAgent") + span = _make_span(agent_data, span_id="agent-1") + + # Pre-populate the otel span + otel_span = _make_otel_span() + self.processor._otel_spans["agent-1"] = otel_span + self.processor._tokens["agent-1"] = MagicMock() + + self.processor.on_span_end(span) + + otel_span.update_name.assert_called() + # Last call should be the invoke_agent name + last_name = otel_span.update_name.call_args_list[-1][0][0] + 
self.assertTrue(last_name.startswith(INVOKE_AGENT_OPERATION_NAME)) + self.assertIn("MyAgent", last_name) + + @patch("microsoft.opentelemetry._genai._openai_agents._trace_processor.detach") + def test_on_span_end_function_span_sets_tool_attrs(self, mock_detach): + """Function span should set tool name, type, and update name.""" + func_data = FunctionSpanData(name="add_numbers", input='{"a":1}', output="2") + span = _make_span(func_data, span_id="func-1") + + otel_span = _make_otel_span() + self.processor._otel_spans["func-1"] = otel_span + self.processor._tokens["func-1"] = MagicMock() + + self.processor.on_span_end(span) + + self.assertEqual(otel_span.attributes[GEN_AI_TOOL_TYPE_KEY], "function") + otel_span.update_name.assert_called() + last_name = otel_span.update_name.call_args_list[-1][0][0] + self.assertIn("add_numbers", last_name) + + @patch("microsoft.opentelemetry._genai._openai_agents._trace_processor.detach") + def test_on_span_end_function_span_sets_tool_call_id(self, mock_detach): + """Function span should set tool_call_id from pending_tool_calls.""" + func_data = FunctionSpanData(name="add", input='{"a":1}', output="2") + span = _make_span(func_data, span_id="func-1") + + otel_span = _make_otel_span() + self.processor._otel_spans["func-1"] = otel_span + self.processor._tokens["func-1"] = MagicMock() + # Pre-populate pending tool calls + self.processor._pending_tool_calls['add:{"a":1}'] = "call_abc" + + self.processor.on_span_end(span) + + self.assertEqual(otel_span.attributes.get(GEN_AI_TOOL_CALL_ID_KEY), "call_abc") + + @patch("microsoft.opentelemetry._genai._openai_agents._trace_processor.detach") + def test_handoff_sets_graph_node_parent_id(self, mock_detach): + """Handoff → AgentSpan should set graph_node_parent_id.""" + # 1) Process HandoffSpan: from AgentA to AgentB + handoff_data = HandoffSpanData(from_agent="AgentA", to_agent="AgentB") + handoff_span = _make_span(handoff_data, span_id="handoff-1", trace_id="t1") + otel_handoff = 
_make_otel_span() + self.processor._otel_spans["handoff-1"] = otel_handoff + self.processor._tokens["handoff-1"] = MagicMock() + self.processor.on_span_end(handoff_span) + + # 2) Now process AgentB's AgentSpan + agent_data = AgentSpanData(name="AgentB") + agent_span = _make_span(agent_data, span_id="agent-b", trace_id="t1") + otel_agent = _make_otel_span() + self.processor._otel_spans["agent-b"] = otel_agent + self.processor._tokens["agent-b"] = MagicMock() + self.processor.on_span_end(agent_span) + + self.assertEqual(otel_agent.attributes.get(GEN_AI_GRAPH_NODE_PARENT_ID), "AgentA") + + @patch("microsoft.opentelemetry._genai._openai_agents._trace_processor.detach") + def test_agent_span_gets_input_output_from_child_generation(self, mock_detach): + """Agent span should receive input/output messages captured from child GenerationSpan.""" + # Set up agent span in tracking + self.processor._agent_span_ids["agent-1"] = None + + # Pre-populate captured messages + self.processor._agent_inputs["agent-1"] = "Hello user input" + self.processor._agent_outputs["agent-1"] = "Hello output" + + agent_data = AgentSpanData(name="TestAgent") + span = _make_span(agent_data, span_id="agent-1") + + otel_span = _make_otel_span() + self.processor._otel_spans["agent-1"] = otel_span + self.processor._tokens["agent-1"] = MagicMock() + + self.processor.on_span_end(span) + + self.assertEqual(otel_span.attributes.get(GEN_AI_INPUT_MESSAGES_KEY), "Hello user input") + self.assertEqual(otel_span.attributes.get(GEN_AI_OUTPUT_MESSAGES_KEY), "Hello output") + + def test_stamp_custom_parent_sets_attribute(self): + """_stamp_custom_parent should set custom.parent.span.id from root span.""" + root = _make_otel_span() + self.processor._root_spans["trace-1"] = root + + otel_span = _make_otel_span() + self.processor._stamp_custom_parent(otel_span, "trace-1") + + self.assertIn(CUSTOM_PARENT_SPAN_ID_KEY, otel_span.attributes) + + def test_stamp_custom_parent_noop_without_root(self): + 
"""_stamp_custom_parent should not set attribute when no root span.""" + otel_span = _make_otel_span() + self.processor._stamp_custom_parent(otel_span, "nonexistent") + self.assertNotIn(CUSTOM_PARENT_SPAN_ID_KEY, otel_span.attributes) + + def test_cap_ordered_dict(self): + """_cap_ordered_dict should evict oldest entries.""" + d = OrderedDict() + for i in range(15): + d[f"key-{i}"] = i + OpenAIAgentsTraceProcessor._cap_ordered_dict(d, 10) + self.assertEqual(len(d), 10) + # Oldest keys should be removed + self.assertNotIn("key-0", d) + self.assertIn("key-14", d) + + @patch("microsoft.opentelemetry._genai._openai_agents._trace_processor.detach") + def test_on_span_end_noop_without_otel_span(self, mock_detach): + """on_span_end should not raise if OTel span was not created.""" + span = _make_span(AgentSpanData(name="Agent"), span_id="missing") + self.processor.on_span_end(span) # should not raise + + @patch("microsoft.opentelemetry._genai._openai_agents._trace_processor.detach") + def test_on_span_end_sets_error_status(self, mock_detach): + """Spans with errors should get ERROR status.""" + agent_data = AgentSpanData(name="Agent") + span = _make_span( + agent_data, + span_id="err-1", + error={"message": "bad", "data": "detail"}, + ) + otel_span = _make_otel_span() + self.processor._otel_spans["err-1"] = otel_span + self.processor._tokens["err-1"] = MagicMock() + + self.processor.on_span_end(span) + + status_call = otel_span.set_status.call_args + status = status_call.kwargs.get("status") or status_call[0][0] + from opentelemetry.trace import StatusCode + + self.assertEqual(status.status_code, StatusCode.ERROR) + + def test_force_flush_and_shutdown(self): + """force_flush and shutdown should not raise.""" + self.processor.force_flush() + self.processor.shutdown() diff --git a/tests/openai_agents/test_utils.py b/tests/openai_agents/test_utils.py new file mode 100644 index 00000000..55d97554 --- /dev/null +++ b/tests/openai_agents/test_utils.py @@ -0,0 +1,248 @@ +# 
Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Tests for OpenAI Agents A365 utility functions.""" + +from collections import OrderedDict +from unittest import TestCase +from unittest.mock import MagicMock + +import pytest + +pytest.importorskip("agents") + +from agents.tracing.span_data import ( # noqa: E402 # pylint: disable=wrong-import-position + AgentSpanData, + FunctionSpanData, + GenerationSpanData, + HandoffSpanData, +) + +from microsoft.opentelemetry._genai._openai_agents._utils import ( # noqa: E402 # pylint: disable=wrong-import-position + capture_input_message, + capture_output_message, + capture_tool_call_ids, + find_ancestor_agent_span_id, + get_attributes_from_function_span_data, + get_span_kind, + get_span_name, + get_span_status, + get_tool_call_id, +) +from microsoft.opentelemetry._genai._openai_agents._constants import ( # noqa: E402 # pylint: disable=wrong-import-position + GEN_AI_SPAN_KIND_AGENT_KEY, + GEN_AI_SPAN_KIND_LLM_KEY, + GEN_AI_SPAN_KIND_TOOL_KEY, +) +from microsoft.opentelemetry.a365.core.constants import ( # noqa: E402 # pylint: disable=wrong-import-position + GEN_AI_TOOL_ARGS_KEY, + GEN_AI_TOOL_CALL_RESULT_KEY, + GEN_AI_TOOL_NAME_KEY, +) + + +class TestGetSpanName(TestCase): + def test_agent_span_data_name(self): + span = MagicMock() + span.span_data = AgentSpanData(name="MyAgent") + self.assertEqual(get_span_name(span), "MyAgent") + + def test_function_span_data_name(self): + span = MagicMock() + span.span_data = FunctionSpanData(name="my_tool", input="", output="") + self.assertEqual(get_span_name(span), "my_tool") + + def test_handoff_span_data_name(self): + span = MagicMock() + span.span_data = HandoffSpanData(from_agent="A", to_agent="B") + self.assertEqual(get_span_name(span), "handoff to B") + + def test_handoff_span_data_no_to_agent_uses_type(self): + span = MagicMock() + data = HandoffSpanData(from_agent="A", to_agent="") + span.span_data = data + # Falls through to type + result = 
get_span_name(span) + self.assertIsNotNone(result) + + +class TestGetSpanKind(TestCase): + def test_agent_span(self): + self.assertEqual(get_span_kind(AgentSpanData(name="A")), GEN_AI_SPAN_KIND_AGENT_KEY) + + def test_function_span(self): + self.assertEqual( + get_span_kind(FunctionSpanData(name="f", input="", output="")), + GEN_AI_SPAN_KIND_TOOL_KEY, + ) + + def test_generation_span(self): + data = GenerationSpanData(model="gpt-4", model_config={}, input=[], output=[], usage={}) + self.assertEqual(get_span_kind(data), GEN_AI_SPAN_KIND_LLM_KEY) + + def test_handoff_span(self): + data = HandoffSpanData(from_agent="A", to_agent="B") + self.assertEqual(get_span_kind(data), GEN_AI_SPAN_KIND_TOOL_KEY) + + +class TestGetSpanStatus(TestCase): + def test_ok_status_without_error(self): + span = MagicMock() + span.error = None + status = get_span_status(span) + from opentelemetry.trace import StatusCode + + self.assertEqual(status.status_code, StatusCode.OK) + + def test_error_status_with_error(self): + span = MagicMock() + span.error = {"message": "something failed", "data": "details"} + status = get_span_status(span) + from opentelemetry.trace import StatusCode + + self.assertEqual(status.status_code, StatusCode.ERROR) + self.assertIn("something failed", status.description) + + +class TestGetAttributesFromFunctionSpanData(TestCase): + def test_basic_function_span(self): + data = FunctionSpanData(name="add", input='{"a":1}', output="2") + attrs = dict(get_attributes_from_function_span_data(data)) + self.assertEqual(attrs[GEN_AI_TOOL_NAME_KEY], "add") + self.assertEqual(attrs[GEN_AI_TOOL_ARGS_KEY], '{"a":1}') + self.assertEqual(attrs[GEN_AI_TOOL_CALL_RESULT_KEY], "2") + + def test_function_span_no_input(self): + data = FunctionSpanData(name="noop", input="", output=None) + attrs = dict(get_attributes_from_function_span_data(data)) + self.assertEqual(attrs[GEN_AI_TOOL_NAME_KEY], "noop") + self.assertNotIn(GEN_AI_TOOL_ARGS_KEY, attrs) + + +class 
TestCaptureToolCallIds(TestCase): + def test_captures_tool_call_ids(self): + output = [ + { + "role": "assistant", + "tool_calls": [ + { + "id": "call_123", + "function": {"name": "add", "arguments": '{"a":1}'}, + } + ], + } + ] + pending = OrderedDict() + capture_tool_call_ids(output, pending) + self.assertEqual(pending['add:{"a":1}'], "call_123") + + def test_caps_size(self): + pending = OrderedDict() + for i in range(15): + output = [ + { + "role": "assistant", + "tool_calls": [ + { + "id": f"call_{i}", + "function": {"name": f"fn_{i}", "arguments": "{}"}, + } + ], + } + ] + capture_tool_call_ids(output, pending, max_size=10) + self.assertLessEqual(len(pending), 10) + + def test_empty_output(self): + pending = OrderedDict() + capture_tool_call_ids(None, pending) + capture_tool_call_ids([], pending) + self.assertEqual(len(pending), 0) + + +class TestGetToolCallId(TestCase): + def test_pops_matching_entry(self): + pending = OrderedDict() + pending['add:{"a":1}'] = "call_abc" + result = get_tool_call_id("add", '{"a":1}', pending) + self.assertEqual(result, "call_abc") + self.assertNotIn('add:{"a":1}', pending) + + def test_returns_none_for_missing(self): + result = get_tool_call_id("unknown", "", OrderedDict()) + self.assertIsNone(result) + + +class TestCaptureInputMessage(TestCase): + def test_captures_first_user_message(self): + inputs = OrderedDict() + data = [ + {"role": "system", "content": "You are helpful"}, + {"role": "user", "content": "Hello"}, + ] + capture_input_message("span-1", data, inputs) + self.assertEqual(inputs["span-1"], "Hello") + + def test_does_not_overwrite_existing(self): + inputs = OrderedDict() + inputs["span-1"] = "First" + data = [{"role": "user", "content": "Second"}] + capture_input_message("span-1", data, inputs) + self.assertEqual(inputs["span-1"], "First") + + def test_empty_input(self): + inputs = OrderedDict() + capture_input_message("span-1", None, inputs) + capture_input_message("span-1", [], inputs) + 
self.assertNotIn("span-1", inputs) + + +class TestCaptureOutputMessage(TestCase): + def test_captures_last_assistant_content(self): + outputs = OrderedDict() + data = [ + {"role": "assistant", "content": None, "tool_calls": [{"id": "c1"}]}, + {"role": "assistant", "content": "Final answer"}, + ] + capture_output_message("span-1", data, outputs) + self.assertEqual(outputs["span-1"], "Final answer") + + def test_skips_tool_call_messages(self): + outputs = OrderedDict() + data = [ + {"role": "assistant", "content": "call tools", "tool_calls": [{"id": "c1"}]}, + ] + capture_output_message("span-1", data, outputs) + self.assertNotIn("span-1", outputs) + + def test_empty_output(self): + outputs = OrderedDict() + capture_output_message("span-1", None, outputs) + self.assertNotIn("span-1", outputs) + + +class TestFindAncestorAgentSpanId(TestCase): + def test_finds_direct_parent(self): + agent_ids = {"agent-1": None} + parents = {"child-1": "agent-1"} + result = find_ancestor_agent_span_id("child-1", agent_ids, parents) + self.assertEqual(result, "agent-1") + + def test_finds_grandparent(self): + agent_ids = {"agent-1": None} + parents = {"child-1": "mid-1", "mid-1": "agent-1"} + result = find_ancestor_agent_span_id("child-1", agent_ids, parents) + self.assertEqual(result, "agent-1") + + def test_returns_none_when_no_agent(self): + result = find_ancestor_agent_span_id("child-1", {}, {"child-1": "parent-1"}) + self.assertIsNone(result) + + def test_handles_cycles(self): + parents = {"a": "b", "b": "a"} + result = find_ancestor_agent_span_id("a", {}, parents) + self.assertIsNone(result) + + def test_returns_none_for_none_input(self): + result = find_ancestor_agent_span_id(None, {"agent-1": None}, {}) + self.assertIsNone(result) diff --git a/tests/semantic_kernel/test_trace_instrumentor.py b/tests/semantic_kernel/test_trace_instrumentor.py index de497dc3..3aeb911c 100644 --- a/tests/semantic_kernel/test_trace_instrumentor.py +++ 
b/tests/semantic_kernel/test_trace_instrumentor.py @@ -1,5 +1,6 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +# pylint: disable=no-member """Tests for SemanticKernelInstrumentor and SemanticKernelSpanProcessor.""" diff --git a/tests/test_distro.py b/tests/test_distro.py index fcba8a4a..47e27ea8 100644 --- a/tests/test_distro.py +++ b/tests/test_distro.py @@ -1110,9 +1110,7 @@ def test_processors_registered_when_azure_monitor_enabled(self, append_mock): @patch("microsoft.opentelemetry._distro._setup_logging") @patch("microsoft.opentelemetry._distro._setup_metrics") @patch("microsoft.opentelemetry._distro._setup_tracing") - def test_processors_not_registered_when_azure_monitor_disabled( - self, tracing_mock, metrics_mock, logging_mock - ): + def test_processors_not_registered_when_azure_monitor_disabled(self, tracing_mock, metrics_mock, logging_mock): from microsoft.opentelemetry._genai.main_agent import ( GenAIMainAgentLogRecordProcessor, GenAIMainAgentSpanProcessor, @@ -1130,9 +1128,7 @@ def test_processors_not_registered_when_azure_monitor_disabled( @patch("microsoft.opentelemetry._distro._setup_logging") @patch("microsoft.opentelemetry._distro._setup_metrics") @patch("microsoft.opentelemetry._distro._setup_tracing") - def test_processors_not_registered_for_a365_only( - self, tracing_mock, metrics_mock, logging_mock, a365_mock - ): + def test_processors_not_registered_for_a365_only(self, tracing_mock, metrics_mock, logging_mock, a365_mock): from microsoft.opentelemetry._genai.main_agent import ( GenAIMainAgentLogRecordProcessor, GenAIMainAgentSpanProcessor, From 54a44c43a300186dfdabb914fde672f25c80c786 Mon Sep 17 00:00:00 2001 From: Hector Hernandez <39923391+hectorhdzg@users.noreply.github.com> Date: Tue, 12 May 2026 13:43:19 -0700 Subject: [PATCH 2/2] Addressing comments --- .../_genai/_openai_agents/_trace_processor.py | 15 +++++-- .../_genai/_openai_agents/_utils.py | 43 +++++++++++++------ 
.../openai_agents/test_trace_instrumentor.py | 11 ----- tests/openai_agents/test_trace_processor.py | 4 +- tests/openai_agents/test_utils.py | 12 +++--- 5 files changed, 49 insertions(+), 36 deletions(-) diff --git a/src/microsoft/opentelemetry/_genai/_openai_agents/_trace_processor.py b/src/microsoft/opentelemetry/_genai/_openai_agents/_trace_processor.py index ab1a91c4..8076eb25 100644 --- a/src/microsoft/opentelemetry/_genai/_openai_agents/_trace_processor.py +++ b/src/microsoft/opentelemetry/_genai/_openai_agents/_trace_processor.py @@ -99,7 +99,7 @@ def __init__(self, tracer: Tracer) -> None: self._agent_span_ids: OrderedDict[str, None] = OrderedDict() # Track parent-child relationships: child_span_id -> parent_span_id self._span_parents: OrderedDict[str, str] = OrderedDict() - # Track tool_call_ids from GenerationSpan: (function_name, trace_id) -> call_id + # Track tool_call_ids from GenerationSpan: "{trace_id}:{function_name}:{args}" -> call_id # Use an OrderedDict and _MAX_PENDING_TOOL_CALLS to cap the size of the dict # in case tool calls are captured but never consumed self._pending_tool_calls: OrderedDict[str, str] = OrderedDict() @@ -125,6 +125,11 @@ def on_trace_start(self, trace: Trace) -> None: Args: trace: The trace that started. """ + root_span = self._tracer.start_span(name="Agent workflow") + self._root_spans[trace.trace_id] = root_span + self._cap_ordered_dict(self._root_spans, self._MAX_TRACKED_SPANS) + self._tokens[trace.trace_id] = attach(set_span_in_context(root_span)) + self._cap_ordered_dict(self._tokens, self._MAX_TRACKED_SPANS) def on_trace_end(self, trace: Trace) -> None: """Called when a trace is finished. @@ -132,6 +137,8 @@ def on_trace_end(self, trace: Trace) -> None: Args: trace: The trace that started. 
""" + if token := self._tokens.pop(trace.trace_id, None): + detach(token) # type: ignore[arg-type] if root_span := self._root_spans.pop(trace.trace_id, None): root_span.set_status(Status(StatusCode.OK)) root_span.end() @@ -215,7 +222,9 @@ def on_span_end(self, span: Span[Any]) -> None: # pylint: disable=too-many-stat self._cap_ordered_dict(self._agent_outputs, self._MAX_TRACKED_SPANS) # Capture tool_call_ids for later use by FunctionSpan if data.output: - capture_tool_call_ids(data.output, self._pending_tool_calls, self._MAX_PENDING_TOOL_CALLS) + capture_tool_call_ids( + data.output, self._pending_tool_calls, self._MAX_PENDING_TOOL_CALLS, span.trace_id + ) attrs = otel_span.attributes or {} # type: ignore[attr-defined] otel_span.update_name(f"{attrs[GEN_AI_OPERATION_NAME_KEY]} {attrs[GEN_AI_REQUEST_MODEL_KEY]}") elif isinstance(data, FunctionSpanData): @@ -225,7 +234,7 @@ def on_span_end(self, span: Span[Any]) -> None: # pylint: disable=too-many-stat otel_span.set_attribute(GEN_AI_TOOL_TYPE_KEY, data.type) # Set tool_call_id if available from preceding GenerationSpan func_args = data.input if data.input else "" - if tool_call_id := get_tool_call_id(data.name, func_args, self._pending_tool_calls): + if tool_call_id := get_tool_call_id(data.name, func_args, self._pending_tool_calls, span.trace_id): otel_span.set_attribute(GEN_AI_TOOL_CALL_ID_KEY, tool_call_id) otel_span.update_name(f"{EXECUTE_TOOL_OPERATION_NAME} {data.name}") elif isinstance(data, MCPListToolsSpanData): diff --git a/src/microsoft/opentelemetry/_genai/_openai_agents/_utils.py b/src/microsoft/opentelemetry/_genai/_openai_agents/_utils.py index c92fc8cb..e2bbeab1 100644 --- a/src/microsoft/opentelemetry/_genai/_openai_agents/_utils.py +++ b/src/microsoft/opentelemetry/_genai/_openai_agents/_utils.py @@ -311,7 +311,7 @@ def _get_attributes_from_chat_completions_message_content_item( prefix: str = "", ) -> Iterator[tuple[str, AttributeValue]]: if obj.get("type") == "text" and (text := obj.get("text")): - 
yield f"{prefix}{GEN_AI_OUTPUT_MESSAGES_KEY}", text + yield f"{prefix}{GEN_AI_MESSAGE_CONTENT}", text def _get_attributes_from_chat_completions_tool_call_dict( @@ -365,15 +365,15 @@ def get_attributes_from_message_content_list( ) -> Iterator[tuple[str, AttributeValue]]: for i, item in enumerate(obj): if item["type"] == "input_text" or item["type"] == "output_text": - yield f"{prefix}{GEN_AI_INPUT_MESSAGES_KEY}.{i}.{GEN_AI_MESSAGE_CONTENT_TYPE}", "text" + yield f"{prefix}{GEN_AI_MESSAGE_CONTENTS}.{i}.{GEN_AI_MESSAGE_CONTENT_TYPE}", "text" yield ( - f"{prefix}{GEN_AI_INPUT_MESSAGES_KEY}.{i}.{GEN_AI_OUTPUT_MESSAGES_KEY}", + f"{prefix}{GEN_AI_MESSAGE_CONTENTS}.{i}.{GEN_AI_MESSAGE_CONTENT}", item["text"], ) elif item["type"] == "refusal": - yield f"{prefix}{GEN_AI_INPUT_MESSAGES_KEY}.{i}.{GEN_AI_MESSAGE_CONTENT_TYPE}", "text" + yield f"{prefix}{GEN_AI_MESSAGE_CONTENTS}.{i}.{GEN_AI_MESSAGE_CONTENT_TYPE}", "text" yield ( - f"{prefix}{GEN_AI_INPUT_MESSAGES_KEY}.{i}.{GEN_AI_OUTPUT_MESSAGES_KEY}", + f"{prefix}{GEN_AI_MESSAGE_CONTENTS}.{i}.{GEN_AI_MESSAGE_CONTENT}", item["refusal"], ) elif TYPE_CHECKING: @@ -432,16 +432,20 @@ def get_attributes_from_response_output( yield from _get_attributes_from_message(item, prefix) msg_idx += 1 elif item.type == "function_call": - yield f"{GEN_AI_OUTPUT_MESSAGES_KEY}.{msg_idx}.{GEN_AI_MESSAGE_ROLE}", "assistant" - prefix = f"{GEN_AI_OUTPUT_MESSAGES_KEY}.{msg_idx}.{GEN_AI_MESSAGE_TOOL_CALLS}.{tool_call_idx}." - yield from _get_attributes_from_function_tool_call(item, prefix) + prefix = f"{GEN_AI_OUTPUT_MESSAGES_KEY}.{msg_idx}." + yield f"{prefix}{GEN_AI_MESSAGE_ROLE}", "assistant" + tc_prefix = f"{prefix}{GEN_AI_MESSAGE_TOOL_CALLS}.{tool_call_idx}." + yield from _get_attributes_from_function_tool_call(item, tc_prefix) tool_call_idx += 1 + msg_idx += 1 elif item.type == "custom_tool_call": + prefix = f"{GEN_AI_OUTPUT_MESSAGES_KEY}.{msg_idx}." 
yield f"{prefix}{GEN_AI_MESSAGE_ROLE}", "assistant" yield from _get_attributes_from_response_custom_tool_call( item, f"{prefix}{GEN_AI_MESSAGE_TOOL_CALLS}.0.", ) + msg_idx += 1 elif TYPE_CHECKING: assert_never(item) # type: ignore[arg-type] @@ -452,7 +456,7 @@ def _get_attributes_from_response_instruction( if not instructions: return yield f"{GEN_AI_INPUT_MESSAGES_KEY}.0.{GEN_AI_MESSAGE_ROLE}", "system" - yield f"{GEN_AI_INPUT_MESSAGES_KEY}.0.{GEN_AI_OUTPUT_MESSAGES_KEY}", instructions + yield f"{GEN_AI_INPUT_MESSAGES_KEY}.0.{GEN_AI_MESSAGE_CONTENT}", instructions def _get_attributes_from_function_tool_call( @@ -523,13 +527,19 @@ def get_span_status(obj: Span[Any]) -> Status: return Status(StatusCode.OK) -def capture_tool_call_ids(output_list: Any, pending_tool_calls: dict[str, str], max_size: int = 1000) -> None: +def capture_tool_call_ids( + output_list: Any, + pending_tool_calls: dict[str, str], + max_size: int = 1000, + trace_id: str = "", +) -> None: """Extract and store tool_call_ids from generation output for later use by FunctionSpan. 
Args: output_list: The generation output containing tool calls pending_tool_calls: OrderedDict to store pending tool calls max_size: Maximum number of pending tool calls to keep in memory + trace_id: Trace ID to scope tool calls and avoid cross-trace collisions """ if not output_list: return @@ -545,8 +555,8 @@ def capture_tool_call_ids(output_list: Any, pending_tool_calls: dict[str, str], func_name = func.get("name") if isinstance(func, dict) else None func_args = func.get("arguments", "") if isinstance(func, dict) else "" if call_id and func_name: - # Key by (function_name, arguments) to uniquely identify each call - key = f"{func_name}:{func_args}" + # Key by (trace_id, function_name, arguments) to avoid cross-trace collisions + key = f"{trace_id}:{func_name}:{func_args}" pending_tool_calls[key] = call_id # Cap the size of the dict to prevent unbounded growth while len(pending_tool_calls) > max_size: @@ -555,9 +565,14 @@ def capture_tool_call_ids(output_list: Any, pending_tool_calls: dict[str, str], pass -def get_tool_call_id(function_name: str, function_args: str, pending_tool_calls: dict[str, str]) -> str | None: +def get_tool_call_id( + function_name: str, + function_args: str, + pending_tool_calls: dict[str, str], + trace_id: str = "", +) -> str | None: """Get and remove the tool_call_id for a function with specific arguments.""" - key = f"{function_name}:{function_args}" + key = f"{trace_id}:{function_name}:{function_args}" return pending_tool_calls.pop(key, None) diff --git a/tests/openai_agents/test_trace_instrumentor.py b/tests/openai_agents/test_trace_instrumentor.py index e22a0526..a304eac1 100644 --- a/tests/openai_agents/test_trace_instrumentor.py +++ b/tests/openai_agents/test_trace_instrumentor.py @@ -41,18 +41,7 @@ def test_instrument_creates_processor(self, mock_trace_api): mock_tracer = MagicMock() mock_trace_api.get_tracer.return_value = mock_tracer - # Mock the agents.tracing module - mock_provider = MagicMock() - mock_multi = MagicMock() - 
mock_multi._processors = [] - mock_provider._multi_processor = mock_multi - instrumentor = A365OpenAIAgentsInstrumentor() - with patch( - "microsoft.opentelemetry._genai._openai_agents._trace_instrumentor.A365OpenAIAgentsInstrumentor._instrument" - ): - # Call directly - pass # Test the actual _instrument logic with patch.dict("sys.modules", {"agents.tracing": MagicMock()}): diff --git a/tests/openai_agents/test_trace_processor.py b/tests/openai_agents/test_trace_processor.py index 8622e749..3b32ebbe 100644 --- a/tests/openai_agents/test_trace_processor.py +++ b/tests/openai_agents/test_trace_processor.py @@ -183,8 +183,8 @@ def test_on_span_end_function_span_sets_tool_call_id(self, mock_detach): otel_span = _make_otel_span() self.processor._otel_spans["func-1"] = otel_span self.processor._tokens["func-1"] = MagicMock() - # Pre-populate pending tool calls - self.processor._pending_tool_calls['add:{"a":1}'] = "call_abc" + # Pre-populate pending tool calls (keyed with trace_id) + self.processor._pending_tool_calls['trace-1:add:{"a":1}'] = "call_abc" self.processor.on_span_end(span) diff --git a/tests/openai_agents/test_utils.py b/tests/openai_agents/test_utils.py index 55d97554..95a523ad 100644 --- a/tests/openai_agents/test_utils.py +++ b/tests/openai_agents/test_utils.py @@ -133,8 +133,8 @@ def test_captures_tool_call_ids(self): } ] pending = OrderedDict() - capture_tool_call_ids(output, pending) - self.assertEqual(pending['add:{"a":1}'], "call_123") + capture_tool_call_ids(output, pending, trace_id="trace-1") + self.assertEqual(pending['trace-1:add:{"a":1}'], "call_123") def test_caps_size(self): pending = OrderedDict() @@ -150,7 +150,7 @@ def test_caps_size(self): ], } ] - capture_tool_call_ids(output, pending, max_size=10) + capture_tool_call_ids(output, pending, max_size=10, trace_id="trace-1") self.assertLessEqual(len(pending), 10) def test_empty_output(self): @@ -163,10 +163,10 @@ def test_empty_output(self): class TestGetToolCallId(TestCase): def 
test_pops_matching_entry(self): pending = OrderedDict() - pending['add:{"a":1}'] = "call_abc" - result = get_tool_call_id("add", '{"a":1}', pending) + pending['trace-1:add:{"a":1}'] = "call_abc" + result = get_tool_call_id("add", '{"a":1}', pending, trace_id="trace-1") self.assertEqual(result, "call_abc") - self.assertNotIn('add:{"a":1}', pending) + self.assertNotIn('trace-1:add:{"a":1}', pending) def test_returns_none_for_missing(self): result = get_tool_call_id("unknown", "", OrderedDict())