From 5f519b72ba085ce3994c81fcbc35057a9fe29b25 Mon Sep 17 00:00:00 2001 From: Luca Perrozzi Date: Wed, 10 Sep 2025 07:50:56 +0000 Subject: [PATCH] feat: Add x-utcp-auth OpenAPI extension with auth inheritance Add support for the x-utcp-auth extension in OpenAPI specifications with seamless authentication inheritance from manual call templates. ## Features - Support x-utcp-auth extension that takes precedence over standard security schemes - Inherit authentication configuration from manual call templates - Case-insensitive header name comparison for auth inheritance compatibility - Safe handling of non-dict x-utcp-auth values - Fall back to placeholder generation when auth is incompatible - Maintain backward compatibility with existing OpenAPI security ## Usage Example Manual call template with auth: ```json { "manual_call_templates": [{ "name": "aws_api", "call_template_type": "http", "url": "https://api.example.com/openapi.json", "auth": { "auth_type": "api_key", "api_key": "Bearer token-123", "var_name": "Authorization", "location": "header" } }] } ``` OpenAPI operations with x-utcp-auth extension: ```json { "paths": { "/protected": { "get": { "operationId": "get_protected_data", "x-utcp-auth": { "auth_type": "api_key", "var_name": "Authorization", "location": "header" } } } } } ``` Generated tools automatically inherit the auth from manual call templates. --- .../utcp_http/http_communication_protocol.py | 2 +- .../http/src/utcp_http/openapi_converter.py | 34 +- .../http/tests/test_x_utcp_auth_extension.py | 361 ++++++++++++++++++ 3 files changed, 393 insertions(+), 4 deletions(-) create mode 100644 plugins/communication_protocols/http/tests/test_x_utcp_auth_extension.py diff --git a/plugins/communication_protocols/http/src/utcp_http/http_communication_protocol.py b/plugins/communication_protocols/http/src/utcp_http/http_communication_protocol.py index a62e4d3..b96d960 100644 --- a/plugins/communication_protocols/http/src/utcp_http/http_communication_protocol.py +++ b/plugins/communication_protocols/http/src/utcp_http/http_communication_protocol.py @@ -197,7 +197,7 @@ async def register_manual(self, caller, manual_call_template: CallTemplate) -> R utcp_manual = UtcpManualSerializer().validate_dict(response_data) else: logger.info(f"Assuming OpenAPI spec from '{manual_call_template.name}'. Converting to UTCP manual.") - converter = OpenApiConverter(response_data, spec_url=manual_call_template.url, call_template_name=manual_call_template.name) + converter = OpenApiConverter(response_data, spec_url=manual_call_template.url, call_template_name=manual_call_template.name, inherited_auth=manual_call_template.auth) utcp_manual = converter.convert() return RegisterManualResult( diff --git a/plugins/communication_protocols/http/src/utcp_http/openapi_converter.py b/plugins/communication_protocols/http/src/utcp_http/openapi_converter.py index be20fe6..cdbea30 100644 --- a/plugins/communication_protocols/http/src/utcp_http/openapi_converter.py +++ b/plugins/communication_protocols/http/src/utcp_http/openapi_converter.py @@ -87,7 +87,7 @@ class OpenApiConverter: call_template_name: Normalized name for the call_template derived from the spec. """ - def __init__(self, openapi_spec: Dict[str, Any], spec_url: Optional[str] = None, call_template_name: Optional[str] = None): + def __init__(self, openapi_spec: Dict[str, Any], spec_url: Optional[str] = None, call_template_name: Optional[str] = None, inherited_auth: Optional[Auth] = None): """Initializes the OpenAPI converter. Args: @@ -96,9 +96,12 @@ def __init__(self, openapi_spec: Dict[str, Any], spec_url: Optional[str] = None, Used for base URL determination if servers are not specified. call_template_name: Optional custom name for the call_template if the specification title is not provided. + inherited_auth: Optional auth configuration inherited from the manual call template. + Used instead of generating placeholders when x-utcp-auth is present. """ self.spec = openapi_spec self.spec_url = spec_url + self.inherited_auth = inherited_auth # Single counter for all placeholder variables self.placeholder_counter = 0 if call_template_name is None: @@ -161,6 +164,31 @@ def convert(self) -> UtcpManual: def _extract_auth(self, operation: Dict[str, Any]) -> Optional[Auth]: """ Extracts authentication information from OpenAPI operation and global security schemes.""" + + # First check for x-utcp-auth extension + if "x-utcp-auth" in operation: + utcp_auth = operation.get("x-utcp-auth") if isinstance(operation.get("x-utcp-auth"), dict) else {} + auth_type = utcp_auth.get("auth_type") + + if auth_type == "api_key": + # Use inherited auth if available and compatible, otherwise create placeholder + inherited_var_name = self.inherited_auth.var_name.lower() if self.inherited_auth and self.inherited_auth.var_name else "" + utcp_var_name = utcp_auth.get("var_name", "Authorization").lower() + if (self.inherited_auth and + isinstance(self.inherited_auth, ApiKeyAuth) and + inherited_var_name == utcp_var_name and + self.inherited_auth.location == utcp_auth.get("location", "header")): + return self.inherited_auth + else: + api_key_placeholder = self._get_placeholder("API_KEY") + self._increment_placeholder_counter() + return ApiKeyAuth( + api_key=api_key_placeholder, + var_name=utcp_auth.get("var_name", "Authorization"), + location=utcp_auth.get("location", "header") + ) + + # Then fall back to standard OpenAPI security schemes # First check for operation-level security requirements security_requirements = operation.get("security", []) @@ -168,9 +196,9 @@ def _extract_auth(self, operation: Dict[str, Any]) -> Optional[Auth]: if not security_requirements: security_requirements = self.spec.get("security", []) - # If no security requirements, return None + # If no security requirements, return inherited auth if available if not security_requirements: - return None + return self.inherited_auth # Get security schemes - support both OpenAPI 2.0 and 3.0 security_schemes = self._get_security_schemes() diff --git a/plugins/communication_protocols/http/tests/test_x_utcp_auth_extension.py b/plugins/communication_protocols/http/tests/test_x_utcp_auth_extension.py new file mode 100644 index 0000000..1a01421 --- /dev/null +++ b/plugins/communication_protocols/http/tests/test_x_utcp_auth_extension.py @@ -0,0 +1,361 @@ +"""Tests for x-utcp-auth OpenAPI extension support. + +This module tests the custom x-utcp-auth extension that allows OpenAPI specifications +to include UTCP-specific authentication configuration directly in operations. +""" + +import pytest +from utcp_http.openapi_converter import OpenApiConverter +from utcp_http.http_call_template import HttpCallTemplate +from utcp.data.auth_implementations.api_key_auth import ApiKeyAuth +from utcp.data.auth_implementations.basic_auth import BasicAuth +from utcp.data.auth_implementations.oauth2_auth import OAuth2Auth + + +def test_x_utcp_auth_api_key_extension(): + """Test that x-utcp-auth extension with API key auth is processed correctly.""" + openapi_spec = { + "openapi": "3.0.1", + "info": {"title": "Test API", "version": "1.0.0"}, + "servers": [{"url": "https://api.example.com"}], + "paths": { + "/protected": { + "get": { + "operationId": "get_protected_data", + "summary": "Get Protected Data", + "x-utcp-auth": { + "auth_type": "api_key", + "var_name": "Authorization", + "location": "header" + }, + "responses": { + "200": { + "description": "Success", + "content": { + "application/json": { + "schema": {"type": "string"} + } + } + } + } + } + } + } + } + + converter = OpenApiConverter(openapi_spec) + manual = converter.convert() + + assert len(manual.tools) == 1 + tool = manual.tools[0] + + # Check that auth was extracted from x-utcp-auth extension + assert tool.tool_call_template.auth is not None + assert isinstance(tool.tool_call_template.auth, ApiKeyAuth) + assert tool.tool_call_template.auth.var_name == "Authorization" + assert tool.tool_call_template.auth.location == "header" + assert tool.tool_call_template.auth.api_key.startswith("${API_KEY_") + + +def test_x_utcp_auth_takes_precedence_over_standard_security(): + """Test that x-utcp-auth extension takes precedence over standard OpenAPI security.""" + openapi_spec = { + "openapi": "3.0.1", + "info": {"title": "Test API", "version": "1.0.0"}, + "servers": [{"url": "https://api.example.com"}], + "components": { + "securitySchemes": { + "bearerAuth": { + "type": "http", + "scheme": "bearer" + } + } + }, + "paths": { + "/protected": { + "get": { + "operationId": "get_protected_data", + "summary": "Get Protected Data", + "security": [{"bearerAuth": []}], + "x-utcp-auth": { + "auth_type": "api_key", + "var_name": "X-API-Key", + "location": "header" + }, + "responses": { + "200": { + "description": "Success", + "content": { + "application/json": { + "schema": {"type": "string"} + } + } + } + } + } + } + } + } + + converter = OpenApiConverter(openapi_spec) + manual = converter.convert() + + assert len(manual.tools) == 1 + tool = manual.tools[0] + + # Should use x-utcp-auth, not the standard security scheme + assert tool.tool_call_template.auth is not None + assert isinstance(tool.tool_call_template.auth, ApiKeyAuth) + assert tool.tool_call_template.auth.var_name == "X-API-Key" + assert tool.tool_call_template.auth.location == "header" + + +def test_fallback_to_standard_security_when_no_x_utcp_auth(): + """Test that standard OpenAPI security is used when x-utcp-auth is not present.""" + openapi_spec = { + "openapi": "3.0.1", + "info": {"title": "Test API", "version": "1.0.0"}, + "servers": [{"url": "https://api.example.com"}], + "components": { + "securitySchemes": { + "bearerAuth": { + "type": "http", + "scheme": "bearer" + } + } + }, + "paths": { + "/protected": { + "get": { + "operationId": "get_protected_data", + "summary": "Get Protected Data", + "security": [{"bearerAuth": []}], + "responses": { + "200": { + "description": "Success", + "content": { + "application/json": { + "schema": {"type": "string"} + } + } + } + } + } + } + } + } + + converter = OpenApiConverter(openapi_spec) + manual = converter.convert() + + assert len(manual.tools) == 1 + tool = manual.tools[0] + + # Should use standard security scheme + assert tool.tool_call_template.auth is not None + assert isinstance(tool.tool_call_template.auth, ApiKeyAuth) + assert tool.tool_call_template.auth.var_name == "Authorization" + assert tool.tool_call_template.auth.location == "header" + assert tool.tool_call_template.auth.api_key.startswith("Bearer ${API_KEY_") + + +def test_mixed_operations_with_and_without_x_utcp_auth(): + """Test OpenAPI spec with mixed operations - some with x-utcp-auth, some without.""" + openapi_spec = { + "openapi": "3.0.1", + "info": {"title": "Test API", "version": "1.0.0"}, + "servers": [{"url": "https://api.example.com"}], + "paths": { + "/public": { + "get": { + "operationId": "get_public_data", + "summary": "Get Public Data", + "responses": { + "200": { + "description": "Success", + "content": { + "application/json": { + "schema": {"type": "string"} + } + } + } + } + } + }, + "/protected": { + "get": { + "operationId": "get_protected_data", + "summary": "Get Protected Data", + "x-utcp-auth": { + "auth_type": "api_key", + "var_name": "Authorization", + "location": "header" + }, + "responses": { + "200": { + "description": "Success", + "content": { + "application/json": { + "schema": {"type": "string"} + } + } + } + } + } + } + } + } + + converter = OpenApiConverter(openapi_spec) + manual = converter.convert() + + assert len(manual.tools) == 2 + + # Find tools by name + public_tool = next(t for t in manual.tools if t.name == "get_public_data") + protected_tool = next(t for t in manual.tools if t.name == "get_protected_data") + + # Public tool should have no auth + assert public_tool.tool_call_template.auth is None + + # Protected tool should have auth from x-utcp-auth + assert protected_tool.tool_call_template.auth is not None + assert isinstance(protected_tool.tool_call_template.auth, ApiKeyAuth) + assert protected_tool.tool_call_template.auth.var_name == "Authorization" + assert protected_tool.tool_call_template.auth.location == "header" + + +def test_auth_inheritance_from_manual_call_template(): + """Test that tools inherit authentication from manual call template.""" + openapi_spec = { + "openapi": "3.0.1", + "info": {"title": "Test API", "version": "1.0.0"}, + "servers": [{"url": "https://api.example.com"}], + "paths": { + "/data": { + "get": { + "operationId": "get_data", + "summary": "Get Data", + "responses": { + "200": { + "description": "Success", + "content": { + "application/json": { + "schema": {"type": "string"} + } + } + } + } + } + } + } + } + + # Manual call template with auth + manual_auth = ApiKeyAuth( + api_key="Bearer token-123", + var_name="Authorization", + location="header" + ) + + converter = OpenApiConverter(openapi_spec, inherited_auth=manual_auth) + manual = converter.convert() + + assert len(manual.tools) == 1 + tool = manual.tools[0] + + # Tool should inherit auth from manual call template + assert tool.tool_call_template.auth is not None + assert isinstance(tool.tool_call_template.auth, ApiKeyAuth) + assert tool.tool_call_template.auth.api_key == "Bearer token-123" + assert tool.tool_call_template.auth.var_name == "Authorization" + assert tool.tool_call_template.auth.location == "header" + + +def test_x_utcp_auth_overrides_inherited_auth(): + """Test that x-utcp-auth extension overrides inherited auth from manual call template.""" + openapi_spec = { + "openapi": "3.0.1", + "info": {"title": "Test API", "version": "1.0.0"}, + "servers": [{"url": "https://api.example.com"}], + "paths": { + "/data": { + "get": { + "operationId": "get_data", + "summary": "Get Data", + "x-utcp-auth": { + "auth_type": "api_key", + "var_name": "X-Custom-Key", + "location": "header" + }, + "responses": { + "200": { + "description": "Success", + "content": { + "application/json": { + "schema": {"type": "string"} + } + } + } + } + } + } + } + } + + # Manual call template with different auth + manual_auth = ApiKeyAuth( + api_key="Bearer token-123", + var_name="Authorization", + location="header" + ) + + converter = OpenApiConverter(openapi_spec, inherited_auth=manual_auth) + manual = converter.convert() + + assert len(manual.tools) == 1 + tool = manual.tools[0] + + # Tool should use x-utcp-auth, not inherited auth + assert tool.tool_call_template.auth is not None + assert isinstance(tool.tool_call_template.auth, ApiKeyAuth) + assert tool.tool_call_template.auth.var_name == "X-Custom-Key" + assert tool.tool_call_template.auth.location == "header" + # Should generate placeholder, not use inherited value + assert tool.tool_call_template.auth.api_key.startswith("${API_KEY_") + + +def test_no_auth_inheritance_when_manual_has_no_auth(): + """Test that no auth is applied when manual call template has no auth.""" + openapi_spec = { + "openapi": "3.0.1", + "info": {"title": "Test API", "version": "1.0.0"}, + "servers": [{"url": "https://api.example.com"}], + "paths": { + "/data": { + "get": { + "operationId": "get_data", + "summary": "Get Data", + "responses": { + "200": { + "description": "Success", + "content": { + "application/json": { + "schema": {"type": "string"} + } + } + } + } + } + } + } + } + + converter = OpenApiConverter(openapi_spec, inherited_auth=None) + manual = converter.convert() + + assert len(manual.tools) == 1 + tool = manual.tools[0] + + # Tool should have no auth + assert tool.tool_call_template.auth is None