From 800de9e21a9fb6a855b0b0475e0134d801c57ba1 Mon Sep 17 00:00:00 2001 From: RandithaK Date: Fri, 21 Nov 2025 01:45:46 +0530 Subject: [PATCH 1/5] feat: enhance agent capabilities with new tools for appointment, vehicle, and project management --- services/agent_core.py | 20 ++- services/agent_tools.py | 261 ++++++++++++++++++++++++++------ services/microservice_client.py | 109 ++++++++++++- services/token_context.py | 4 + test_new_tools.py | 94 ++++++++++++ 5 files changed, 434 insertions(+), 54 deletions(-) create mode 100644 services/token_context.py create mode 100644 test_new_tools.py diff --git a/services/agent_core.py b/services/agent_core.py index 023077c..390e939 100644 --- a/services/agent_core.py +++ b/services/agent_core.py @@ -7,6 +7,7 @@ from services.agent_tools import all_tools from services.microservice_client import MicroserviceClient from services.rag import get_rag_service +from services.token_context import token_context import logging from typing import List, Dict, Any @@ -31,10 +32,16 @@ def __init__(self): "Your mission is to help customers with their vehicle service needs in a warm, helpful manner.\n" "\n**YOUR CAPABILITIES:**\n" "- Answer questions about vehicle services, repairs, maintenance, and appointments\n" - "- Help schedule and manage service appointments\n" - "- Check service status and work logs for customers' vehicles\n" - "- Provide information about company policies, hours, and pricing\n" + "- Help schedule and manage service appointments (Book, Cancel, Check Slots)\n" + "- Manage user vehicles (List, Register, View Details)\n" + "- Handle custom modification projects (Request, List, View Details)\n" + "- Manage user profile (View, Update)\n" + "- Check service status and work logs\n" "- Give automotive advice and recommendations\n" + "\n**CRITICAL INSTRUCTION: CHAIN OF THOUGHT REASONING**\n" + "Before taking action, you MUST think step-by-step. For example:\n" + "- If user wants to book: 1. Check if they have a vehicle (`get_my_vehicles`). 2. If no vehicle, ask to register (`register_vehicle`). 3. If vehicle exists, check slots (`check_appointment_slots`). 4. Finally, book (`book_appointment`).\n" + "- If user wants to request a project: 1. Check vehicle. 2. Request project (`request_modification_project`).\n" "\n**CONVERSATION STYLE:**\n" "- Be friendly, warm, patient, and professional\n" "- Use emojis to make conversations more engaging and user-friendly (👋 🚗 ✅ 🔧 ⏰ 💰 😊 👍 🎉 etc.)\n" @@ -135,10 +142,9 @@ async def invoke_agent( "tool_executed": None } - # 3. CRITICAL: Inject Runtime Token into Tools Module - # Set the module-level runtime_token variable in agent_tools - import services.agent_tools as agent_tools_module - agent_tools_module.runtime_token = user_token + # 3. CRITICAL: Inject Runtime Token into ContextVar + # This ensures thread-safety for concurrent users + token_context.set(user_token) # 4. Invoke Agent Executor (use ainvoke for async tools) result = await self.agent_executor.ainvoke({ diff --git a/services/agent_tools.py b/services/agent_tools.py index 8093bff..5c9a803 100644 --- a/services/agent_tools.py +++ b/services/agent_tools.py @@ -1,90 +1,259 @@ from langchain.tools import StructuredTool -from typing import Dict, Any, List -from .microservice_client import get_microservice_client # FIX: Imported getter function +from typing import Dict, Any, List, Optional +from .microservice_client import get_microservice_client +from services.token_context import token_context import json -# Global variable to hold the token for the duration of the agent's run -runtime_token = "" - -# FIX: Get the singleton client instance immediately for use in tools +# Get the singleton client instance client = get_microservice_client() +# --- 1. Appointment Tools --- + async def check_appointment_slots_tool(date: str, service_type: str) -> str: """ - Checks the available appointment slots for a given date (YYYY-MM-DD) - and service_type (e.g., 'Oil Change', 'Diagnostics'). - Use this tool ONLY when the user asks for available times or scheduling. + Checks available appointment slots. + Use when user asks for available times. """ - # FIX: Uses the ASYNC method on the client instance with the runtime_token - result = await client.get_appointment_slots(date, service_type, runtime_token) + token = token_context.get() + result = await client.get_appointment_slots(date, service_type, token) if result.get("error"): - return f"Error: Could not check slots due to service error: {result['error']}" + return f"Error checking slots: {result['error']}" slots = result.get("available_slots", []) - if slots and isinstance(slots, list): slot_times = [s['time'] for s in slots if 'time' in s] if slot_times: - return f"Available slots on {date} for {service_type}: {', '.join(slot_times)}. Ask the user to specify a time if they want to book." + return f"Available slots on {date} for {service_type}: {', '.join(slot_times)}." return f"No available slots found on {date} for {service_type}." +async def book_appointment_tool(date: str, time: str, service_type: str, vehicle_id: str, description: str = "") -> str: + """ + Books a new appointment. + REQUIRED: date (YYYY-MM-DD), time (HH:mm), service_type, vehicle_id. + """ + token = token_context.get() + payload = { + "date": date, + "time": time, + "serviceType": service_type, + "vehicleId": vehicle_id, + "description": description + } + result = await client.book_appointment(payload, token) + + if result.get("error"): + return f"Failed to book appointment: {result['error']}" + + return f"Appointment booked successfully! ID: {result.get('id')}. Status: {result.get('status')}." + +async def cancel_appointment_tool(appointment_id: str) -> str: + """ + Cancels an existing appointment. + REQUIRED: appointment_id. + """ + token = token_context.get() + result = await client.cancel_appointment(appointment_id, token) + + if result.get("error"): + return f"Failed to cancel appointment: {result['error']}" + + return "Appointment cancelled successfully." + +# --- 2. Vehicle Tools --- + +async def get_my_vehicles_tool() -> str: + """ + Lists all vehicles belonging to the current user. + Use to find vehicle_id for booking. + """ + token = token_context.get() + vehicles = await client.get_customer_vehicles(token) + + if not vehicles: + return "You have no registered vehicles." + + summary = "Your Vehicles:\n" + for v in vehicles: + summary += f"- {v.get('make')} {v.get('model')} ({v.get('year')}) - Plate: {v.get('licensePlate')} - ID: {v.get('id')}\n" + return summary + +async def get_vehicle_details_tool(vehicle_id: str) -> str: + """ + Get detailed info for a specific vehicle. + """ + token = token_context.get() + result = await client.get_vehicle_details(vehicle_id, token) + + if result.get("error"): + return f"Error fetching vehicle details: {result['error']}" + + return json.dumps(result, indent=2) + +async def register_vehicle_tool(make: str, model: str, year: int, license_plate: str, vin: str) -> str: + """ + Registers a new vehicle. + REQUIRED: make, model, year, license_plate, vin. + """ + token = token_context.get() + payload = { + "make": make, + "model": model, + "year": year, + "licensePlate": license_plate, + "vin": vin + } + result = await client.register_vehicle(payload, token) + + if result.get("error"): + return f"Failed to register vehicle: {result['error']}" + + return f"Vehicle registered successfully! ID: {result.get('vehicleId')}" + +# --- 3. Project Tools --- + +async def request_modification_project_tool(vehicle_id: str, description: str, budget: float, desired_completion_date: str) -> str: + """ + Requests a new custom modification project. + REQUIRED: vehicle_id, description, budget, desired_completion_date (YYYY-MM-DD). + """ + token = token_context.get() + payload = { + "vehicleId": vehicle_id, + "description": description, + "budget": budget, + "desiredCompletionDate": desired_completion_date, + "projectType": "MODIFICATION" + } + result = await client.request_modification_project(payload, token) + + if result.get("error"): + return f"Failed to request project: {result['error']}" + + data = result.get('data', {}) + return f"Project requested successfully! ID: {data.get('id')}. Status: {data.get('status')}." + +async def get_my_projects_tool() -> str: + """ + Lists all custom projects for the user. + """ + token = token_context.get() + projects = await client.get_customer_projects(token) + + if not projects: + return "You have no active projects." + + summary = "Your Projects:\n" + for p in projects: + summary += f"- Project ID: {p.get('id')} - Status: {p.get('status')} - {p.get('description')[:50]}...\n" + return summary + +async def get_project_details_tool(project_id: str) -> str: + """ + Get details for a specific project. + """ + token = token_context.get() + result = await client.get_project_details(project_id, token) + + if result.get("error"): + return f"Error fetching project details: {result['error']}" + + return json.dumps(result, indent=2) + +# --- 4. Profile Tools --- + +async def get_my_profile_tool() -> str: + """ + Get current user profile details. + """ + token = token_context.get() + result = await client.get_my_profile(token) + + if result.get("error"): + return f"Error fetching profile: {result['error']}" + + return json.dumps(result, indent=2) + +async def update_my_profile_tool(full_name: str = None, phone: str = None, address: str = None) -> str: + """ + Update user profile. + Only provide fields that need updating. + """ + token = token_context.get() + payload = {} + if full_name: payload['fullName'] = full_name + if phone: payload['phone'] = phone + if address: payload['address'] = address + + if not payload: + return "No changes provided." + + result = await client.update_my_profile(payload, token) + + if result.get("error"): + return f"Failed to update profile: {result['error']}" + + return "Profile updated successfully." + +# --- 5. Existing Tools (Updated) --- + async def get_user_active_services_tool() -> str: """ - Retrieves a list of all IN_PROGRESS services and projects for the current user. - Use this tool when the user asks for the status of their vehicle or project. + Retrieves IN_PROGRESS services/projects. """ - # FIX: Uses the ASYNC method on the client instance with the runtime_token - active_items = await client.get_active_services(runtime_token) + token = token_context.get() + active_items = await client.get_active_services(token) if not active_items: - return "The user currently has no active services or modification projects." + return "No active services or projects." - summary = "The user has the following items IN_PROGRESS:\n" + summary = "Active Items:\n" for item in active_items: summary += f"- {item['type'].capitalize()} ID: {item['id']} (Status: {item['status']})\n" - - - return summary.strip() + return summary async def get_last_work_log_tool(service_id: str) -> str: """ - Retrieves the most recent time log and technician note for a specific service or project ID. - The service_id must be provided by the user or extracted from the conversation history. + Retrieves recent work logs for a service/project. """ - # FIX: Uses the ASYNC method on the client instance with the runtime_token - logs = await client.get_time_logs_for_service(service_id, runtime_token) + token = token_context.get() + logs = await client.get_time_logs_for_service(service_id, token) if logs and isinstance(logs, list): - # Sort by creation timestamp (assuming 'createdAt' is the key) logs.sort(key=lambda x: x.get('createdAt', ''), reverse=True) most_recent_log = logs[0] - return json.dumps({ "date": most_recent_log.get('date'), "hours": most_recent_log.get('hours'), "description": most_recent_log.get('description', 'No note provided.'), }) + return f"No logs found for ID: {service_id}." - return f"No time logs found for service/project ID: {service_id}." +# --- Tool Definitions --- -# Create StructuredTool instances for async functions all_tools = [ - StructuredTool.from_function( - coroutine=check_appointment_slots_tool, - name="check_appointment_slots_tool", - description="Checks the available appointment slots for a given date (YYYY-MM-DD) and service_type (e.g., 'Oil Change', 'Diagnostics'). Use this tool ONLY when the user asks for available times or scheduling." - ), - StructuredTool.from_function( - coroutine=get_user_active_services_tool, - name="get_user_active_services_tool", - description="Retrieves a list of all IN_PROGRESS services and projects for the current user. Use this tool when the user asks for the status of their vehicle or project." - ), - StructuredTool.from_function( - coroutine=get_last_work_log_tool, - name="get_last_work_log_tool", - description="Retrieves the most recent time log and technician note for a specific service or project ID. The service_id must be provided by the user or extracted from the conversation history." - ) + # Appointments + StructuredTool.from_function(coroutine=check_appointment_slots_tool, name="check_appointment_slots", description="Check available slots. Args: date (YYYY-MM-DD), service_type"), + StructuredTool.from_function(coroutine=book_appointment_tool, name="book_appointment", description="Book appointment. Args: date, time, service_type, vehicle_id, description"), + StructuredTool.from_function(coroutine=cancel_appointment_tool, name="cancel_appointment", description="Cancel appointment. Args: appointment_id"), + + # Vehicles + StructuredTool.from_function(coroutine=get_my_vehicles_tool, name="get_my_vehicles", description="List user vehicles. Returns IDs needed for booking."), + StructuredTool.from_function(coroutine=get_vehicle_details_tool, name="get_vehicle_details", description="Get vehicle details. Args: vehicle_id"), + StructuredTool.from_function(coroutine=register_vehicle_tool, name="register_vehicle", description="Register new vehicle. Args: make, model, year, license_plate, vin"), + + # Projects + StructuredTool.from_function(coroutine=request_modification_project_tool, name="request_modification_project", description="Request custom project. Args: vehicle_id, description, budget, desired_completion_date"), + StructuredTool.from_function(coroutine=get_my_projects_tool, name="get_my_projects", description="List user projects."), + StructuredTool.from_function(coroutine=get_project_details_tool, name="get_project_details", description="Get project details. Args: project_id"), + + # Profile + StructuredTool.from_function(coroutine=get_my_profile_tool, name="get_my_profile", description="Get user profile info."), + StructuredTool.from_function(coroutine=update_my_profile_tool, name="update_my_profile", description="Update profile. Args: full_name, phone, address"), + + # Existing / Status + StructuredTool.from_function(coroutine=get_user_active_services_tool, name="get_active_services", description="Check active services/projects status."), + StructuredTool.from_function(coroutine=get_last_work_log_tool, name="get_work_logs", description="Get work logs for service_id.") ] \ No newline at end of file diff --git a/services/microservice_client.py b/services/microservice_client.py index a330a01..8c13ec0 100644 --- a/services/microservice_client.py +++ b/services/microservice_client.py @@ -45,6 +45,51 @@ async def _make_get_request(self, url: str, token: str, params: Dict[str, Any] = logger.error(f"Unexpected Error from {url}: {e}") return {"error": str(e), "status_code": 500} + async def _make_post_request(self, url: str, token: str, data: Dict[str, Any] = None) -> Dict[str, Any]: + """Internal helper for making async authenticated POST requests.""" + headers = {"Authorization": f"Bearer {token}"} + try: + response = await self._async_client.post(url, json=data, headers=headers) + if response.is_success: + return response.json() + try: + return response.json() + except: + return {"error": f"HTTP Error {response.status_code}", "status_code": response.status_code} + except Exception as e: + logger.error(f"POST Error to {url}: {e}") + return {"error": str(e), "status_code": 500} + + async def _make_put_request(self, url: str, token: str, data: Dict[str, Any] = None) -> Dict[str, Any]: + """Internal helper for making async authenticated PUT requests.""" + headers = {"Authorization": f"Bearer {token}"} + try: + response = await self._async_client.put(url, json=data, headers=headers) + if response.is_success: + return response.json() + try: + return response.json() + except: + return {"error": f"HTTP Error {response.status_code}", "status_code": response.status_code} + except Exception as e: + logger.error(f"PUT Error to {url}: {e}") + return {"error": str(e), "status_code": 500} + + async def _make_delete_request(self, url: str, token: str) -> Dict[str, Any]: + """Internal helper for making async authenticated DELETE requests.""" + headers = {"Authorization": f"Bearer {token}"} + try: + response = await self._async_client.delete(url, headers=headers) + if response.is_success: + return {"success": True, "status_code": response.status_code} + try: + return response.json() + except: + return {"error": f"HTTP Error {response.status_code}", "status_code": response.status_code} + except Exception as e: + logger.error(f"DELETE Error to {url}: {e}") + return {"error": str(e), "status_code": 500} + # --- Methods used by Agent Core (Called from async context) --- async def get_user_context(self, token: str) -> UserContext: @@ -104,7 +149,6 @@ async def get_appointment_slots(self, date: str, service_type: str, token: str) url = f"{self.appointment_url}/availability" params = {"date": date, "serviceType": service_type} data = await self._make_get_request(url, token, params) - # Assuming the service returns the data directly or returns a dict with 'available_slots' key return data @staticmethod @@ -129,6 +173,69 @@ async def get_time_logs_for_service(self, service_id: str, token: str) -> List[D return self._parse_logs_response(data) + # --- New Methods for Enhanced Agent Capabilities --- + + # 1. Appointments + async def book_appointment(self, appointment_data: Dict[str, Any], token: str) -> Dict[str, Any]: + """Books a new appointment.""" + return await self._make_post_request(self.appointment_url, token, appointment_data) + + async def cancel_appointment(self, appointment_id: str, token: str) -> Dict[str, Any]: + """Cancels an appointment.""" + url = f"{self.appointment_url}/{appointment_id}" + return await self._make_delete_request(url, token) + + # 2. Vehicles + async def get_customer_vehicles(self, token: str) -> List[Dict[str, Any]]: + """Get all vehicles for the current user.""" + result = await self._make_get_request(self.vehicle_url, token) + if isinstance(result, list): + return result + return [] + + async def get_vehicle_details(self, vehicle_id: str, token: str) -> Dict[str, Any]: + """Get details for a specific vehicle.""" + url = f"{self.vehicle_url}/{vehicle_id}" + return await self._make_get_request(url, token) + + async def register_vehicle(self, vehicle_data: Dict[str, Any], token: str) -> Dict[str, Any]: + """Register a new vehicle.""" + return await self._make_post_request(self.vehicle_url, token, vehicle_data) + + # 3. Projects + async def request_modification_project(self, project_data: Dict[str, Any], token: str) -> Dict[str, Any]: + """Request a new custom modification project.""" + url = f"{self.project_url}/projects" + return await self._make_post_request(url, token, project_data) + + async def get_customer_projects(self, token: str) -> List[Dict[str, Any]]: + """Get all projects for the current user.""" + url = f"{self.project_url}/projects" + result = await self._make_get_request(url, token) + # API returns ApiResponse with 'data' field + if isinstance(result, dict) and 'data' in result and isinstance(result['data'], list): + return result['data'] + return [] + + async def get_project_details(self, project_id: str, token: str) -> Dict[str, Any]: + """Get details for a specific project.""" + url = f"{self.project_url}/projects/{project_id}" + result = await self._make_get_request(url, token) + if isinstance(result, dict) and 'data' in result: + return result['data'] + return result + + # 4. Profile + async def get_my_profile(self, token: str) -> Dict[str, Any]: + """Get current user profile.""" + url = f"{self.auth_url}/users/me" + return await self._make_get_request(url, token) + + async def update_my_profile(self, profile_data: Dict[str, Any], token: str) -> Dict[str, Any]: + """Update current user profile.""" + url = f"{self.auth_url}/users/profile" + return await self._make_put_request(url, token, profile_data) + # Singleton instance _microservice_client_instance = None diff --git a/services/token_context.py b/services/token_context.py new file mode 100644 index 0000000..237747a --- /dev/null +++ b/services/token_context.py @@ -0,0 +1,4 @@ +from contextvars import ContextVar + +# ContextVar to hold the user token for the current request context +token_context: ContextVar[str] = ContextVar("token_context", default="") diff --git a/test_new_tools.py b/test_new_tools.py new file mode 100644 index 0000000..3ccae21 --- /dev/null +++ b/test_new_tools.py @@ -0,0 +1,94 @@ +import pytest +import asyncio +from unittest.mock import AsyncMock, patch, MagicMock +from services.agent_tools import ( + book_appointment_tool, + cancel_appointment_tool, + get_my_vehicles_tool, + request_modification_project_tool, + check_appointment_slots_tool +) +from services.token_context import token_context + +@pytest.mark.asyncio +async def test_book_appointment_tool(): + # Mock the client + with patch('services.agent_tools.client') as mock_client: + # Set up the mock return value + mock_client.book_appointment = AsyncMock(return_value={"id": "123", "status": "CONFIRMED"}) + + # Set the token context + token = "test_token_123" + token_context.set(token) + + # Call the tool + result = await book_appointment_tool( + date="2023-12-25", + time="10:00", + service_type="Oil Change", + vehicle_id="v1", + description="Test booking" + ) + + # Verify the result + assert "Appointment booked successfully" in result + assert "ID: 123" in result + + # Verify the client was called with the correct token and data + mock_client.book_appointment.assert_called_once() + args, _ = mock_client.book_appointment.call_args + payload = args[0] + passed_token = args[1] + + assert passed_token == token + assert payload["date"] == "2023-12-25" + assert payload["vehicleId"] == "v1" + +@pytest.mark.asyncio +async def test_get_my_vehicles_tool(): + with patch('services.agent_tools.client') as mock_client: + mock_client.get_customer_vehicles = AsyncMock(return_value=[ + {"id": "v1", "make": "Toyota", "model": "Camry", "year": 2020, "licensePlate": "ABC-123"} + ]) + + token_context.set("user_token") + + result = await get_my_vehicles_tool() + + assert "Toyota Camry" in result + assert "ABC-123" in result + mock_client.get_customer_vehicles.assert_called_once_with("user_token") + +@pytest.mark.asyncio +async def test_concurrency_context(): + """Verify that token_context works correctly in simulated concurrent calls.""" + + with patch('services.agent_tools.client') as mock_client: + mock_client.get_appointment_slots = AsyncMock(return_value={"available_slots": [{"time": "10:00"}]}) + + async def user_action(user_token): + token_context.set(user_token) + # Simulate some async work + await asyncio.sleep(0.01) + # Call a tool + await check_appointment_slots_tool("2023-12-25", "Service") + # Verify the mock was called with THIS user's token + # (We can't easily check the mock call args here for concurrency, + # but we can check if the tool ran without error and used the context) + return token_context.get() + + # Run two "users" concurrently + results = await asyncio.gather( + user_action("token_A"), + user_action("token_B") + ) + + assert results[0] == "token_A" + assert results[1] == "token_B" + +if __name__ == "__main__": + # Manual run if pytest not available + asyncio.run(test_book_appointment_tool()) + asyncio.run(test_get_my_vehicles_tool()) + asyncio.run(test_concurrency_context()) + print("All manual tests passed!") From 3f0c6554ae79cdcaee75a63aff57dffc10c09e96 Mon Sep 17 00:00:00 2001 From: RandithaK Date: Fri, 21 Nov 2025 12:07:07 +0530 Subject: [PATCH 2/5] docs: clarify k3s section in QUICK_REFERENCE.md with testing note --- QUICK_REFERENCE.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/QUICK_REFERENCE.md b/QUICK_REFERENCE.md index 7466b61..6406a68 100644 --- a/QUICK_REFERENCE.md +++ b/QUICK_REFERENCE.md @@ -39,9 +39,9 @@ docker run -p 8091:8091 --env-file .env agent-bot:latest docker-compose up agent-bot ``` -### Kubernetes +### k3s (Kubernetes - lightweight) ```bash -# Apply configurations (from k8s-config repo) +# Apply configurations (from k8s-config repo) — tested on k3s kubectl apply -f k8s/configmaps/agent-bot-configmap.yaml kubectl apply -f k8s/services/agent-bot-deployment.yaml From 5750781e25e0e910fcbd7360d78bc4cbb382c923 Mon Sep 17 00:00:00 2001 From: RandithaK Date: Sun, 23 Nov 2025 11:59:58 +0530 Subject: [PATCH 3/5] feat: Add JWT parsing to propagate user context via X-User headers and improve microservice endpoint path resolution. --- requirements.txt | 1 + services/microservice_client.py | 103 ++++++++++++++++++++++++++++---- 2 files changed, 94 insertions(+), 10 deletions(-) diff --git a/requirements.txt b/requirements.txt index 9f4683e..0376bbc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -77,6 +77,7 @@ pinecone-client==3.0.0 propcache==0.4.1 proto-plus==1.26.1 protobuf==4.25.8 +PyJWT==2.8.0 pyasn1==0.6.1 pyasn1-modules==0.4.2 pydantic==2.12.4 diff --git a/services/microservice_client.py b/services/microservice_client.py index 8c13ec0..b389f02 100644 --- a/services/microservice_client.py +++ b/services/microservice_client.py @@ -2,7 +2,8 @@ import os import logging import asyncio -from typing import List, Dict, Any, Optional +import jwt +from typing import List, Dict, Any, Optional, Tuple from config.settings import settings from models.chat import UserContext, VehicleInfo @@ -27,11 +28,44 @@ def __init__(self): self.appointment_url = settings.APPOINTMENT_SERVICE_URL self.time_log_url = settings.TIME_LOGGING_SERVICE_URL + def _extract_user_from_token(self, token: str) -> Tuple[str, str]: + """ + Extract username and roles from JWT token. + Returns (username, roles_csv_string) + """ + try: + # Decode without verification (we trust our own tokens) + payload = jwt.decode(token, options={"verify_signature": False}) + username = payload.get("sub", "") + + # Extract roles - they might be in different formats + roles = payload.get("roles", []) + if isinstance(roles, list): + # Remove ROLE_ prefix if present + cleaned_roles = [r.replace("ROLE_", "") for r in roles] + roles_str = ",".join(cleaned_roles) + elif isinstance(roles, str): + roles_str = roles.replace("ROLE_", "") + else: + roles_str = "" + + logger.debug(f"Extracted from JWT - username: {username}, roles: {roles_str}") + return username, roles_str + except Exception as e: + logger.warning(f"Failed to extract user from token: {e}") + return "", "" + async def _make_get_request(self, url: str, token: str, params: Dict[str, Any] = None) -> Dict[str, Any]: """Internal helper for making async authenticated GET requests.""" headers = {"Authorization": f"Bearer {token}"} + + # Add X-User headers for direct service calls + username, roles = self._extract_user_from_token(token) + if username: + headers["X-User-Subject"] = username + headers["X-User-Roles"] = roles + try: - # FIX: Use async client and await response = await self._async_client.get(url, params=params, headers=headers) response.raise_for_status() return response.json() @@ -48,6 +82,13 @@ async def _make_get_request(self, url: str, token: str, params: Dict[str, Any] = async def _make_post_request(self, url: str, token: str, data: Dict[str, Any] = None) -> Dict[str, Any]: """Internal helper for making async authenticated POST requests.""" headers = {"Authorization": f"Bearer {token}"} + + # Add X-User headers for direct service calls + username, roles = self._extract_user_from_token(token) + if username: + headers["X-User-Subject"] = username + headers["X-User-Roles"] = roles + try: response = await self._async_client.post(url, json=data, headers=headers) if response.is_success: @@ -63,6 +104,13 @@ async def _make_post_request(self, url: str, token: str, data: Dict[str, Any] = async def _make_put_request(self, url: str, token: str, data: Dict[str, Any] = None) -> Dict[str, Any]: """Internal helper for making async authenticated PUT requests.""" headers = {"Authorization": f"Bearer {token}"} + + # Add X-User headers for direct service calls + username, roles = self._extract_user_from_token(token) + if username: + headers["X-User-Subject"] = username + headers["X-User-Roles"] = roles + try: response = await self._async_client.put(url, json=data, headers=headers) if response.is_success: @@ -78,6 +126,13 @@ async def _make_put_request(self, url: str, token: str, data: Dict[str, Any] = N async def _make_delete_request(self, url: str, token: str) -> Dict[str, Any]: """Internal helper for making async authenticated DELETE requests.""" headers = {"Authorization": f"Bearer {token}"} + + # Add X-User headers for direct service calls + username, roles = self._extract_user_from_token(token) + if username: + headers["X-User-Subject"] = username + headers["X-User-Roles"] = roles + try: response = await self._async_client.delete(url, headers=headers) if response.is_success: @@ -99,13 +154,23 @@ async def get_user_context(self, token: str) -> UserContext: async def _async_get_user_context(self, token: str) -> UserContext: """Retrieves user profile and vehicles (ASYNC helper).""" - # 1. Get User Profile (/auth/me endpoint) - user_data = await self._make_get_request(f"{self.auth_url}/me", token) + # 1. Get User Profile (/users/me endpoint) + base_url = self.auth_url.rstrip('/') + if base_url.endswith('/users'): + url = f"{base_url}/me" + else: + url = f"{base_url}/users/me" + + user_data = await self._make_get_request(url, token) if "error" in user_data: return UserContext(user_id="anonymous", full_name="Guest", role="PUBLIC", vehicles=[]) # 2. Get User Vehicles (/vehicles endpoint) - vehicle_data = await self._make_get_request(f"{self.vehicle_url}", token) + url = self.vehicle_url.rstrip('/') + if not url.endswith("/vehicles"): + url = f"{url}/vehicles" + vehicle_data = await self._make_get_request(url, token) + vehicles = [] if isinstance(vehicle_data, list): vehicles = [ @@ -188,19 +253,29 @@ async def cancel_appointment(self, appointment_id: str, token: str) -> Dict[str, # 2. Vehicles async def get_customer_vehicles(self, token: str) -> List[Dict[str, Any]]: """Get all vehicles for the current user.""" - result = await self._make_get_request(self.vehicle_url, token) + url = self.vehicle_url.rstrip('/') + if not url.endswith("/vehicles"): + url = f"{url}/vehicles" + result = await self._make_get_request(url, token) if isinstance(result, list): return result return [] async def get_vehicle_details(self, vehicle_id: str, token: str) -> Dict[str, Any]: """Get details for a specific vehicle.""" - url = f"{self.vehicle_url}/{vehicle_id}" + base_url = self.vehicle_url.rstrip('/') + if base_url.endswith("/vehicles"): + url = f"{base_url}/{vehicle_id}" + else: + url = f"{base_url}/vehicles/{vehicle_id}" return await self._make_get_request(url, token) async def register_vehicle(self, vehicle_data: Dict[str, Any], token: str) -> Dict[str, Any]: """Register a new vehicle.""" - return await self._make_post_request(self.vehicle_url, token, vehicle_data) + url = self.vehicle_url.rstrip('/') + if not url.endswith("/vehicles"): + url = f"{url}/vehicles" + return await self._make_post_request(url, token, vehicle_data) # 3. Projects async def request_modification_project(self, project_data: Dict[str, Any], token: str) -> Dict[str, Any]: @@ -228,12 +303,20 @@ async def get_project_details(self, project_id: str, token: str) -> Dict[str, An # 4. Profile async def get_my_profile(self, token: str) -> Dict[str, Any]: """Get current user profile.""" - url = f"{self.auth_url}/users/me" + base_url = self.auth_url.rstrip('/') + if base_url.endswith('/users'): + url = f"{base_url}/me" + else: + url = f"{base_url}/users/me" return await self._make_get_request(url, token) async def update_my_profile(self, profile_data: Dict[str, Any], token: str) -> Dict[str, Any]: """Update current user profile.""" - url = f"{self.auth_url}/users/profile" + base_url = self.auth_url.rstrip('/') + if base_url.endswith('/users'): + url = f"{base_url}/profile" + else: + url = f"{base_url}/users/profile" return await self._make_put_request(url, token, profile_data) From 7683a0d8e621171bb6529ddaa2f1b8e71acd5066 Mon Sep 17 00:00:00 2001 From: RandithaK Date: Sun, 23 Nov 2025 12:43:37 +0530 Subject: [PATCH 4/5] fix: Implement defensive URL trimming and enhance HTTP error response parsing in the microservice client. --- services/microservice_client.py | 45 ++++++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/services/microservice_client.py b/services/microservice_client.py index b389f02..1e06c87 100644 --- a/services/microservice_client.py +++ b/services/microservice_client.py @@ -20,9 +20,10 @@ class MicroserviceClient: def __init__(self): # Initialize an AsyncClient once per instance self._async_client = httpx.AsyncClient(timeout=5.0) - self.auth_url = settings.AUTHENTICATION_SERVICE_URL - self.vehicle_url = settings.VEHICLE_SERVICE_URL - self.project_url = settings.PROJECT_SERVICE_URL + # Normalize and sanitize URLs (strip whitespace and trailing slashes as needed) + self.auth_url = (settings.AUTHENTICATION_SERVICE_URL or "").strip() + self.vehicle_url = (settings.VEHICLE_SERVICE_URL or "").strip() + self.project_url = (settings.PROJECT_SERVICE_URL or "").strip() # FIX: Added required microservice URLs self.appointment_url = settings.APPOINTMENT_SERVICE_URL @@ -65,13 +66,33 @@ async def _make_get_request(self, url: str, token: str, params: Dict[str, Any] = headers["X-User-Subject"] = username headers["X-User-Roles"] = roles + # defensive trimming - remove accidental spaces + url = (url or "").strip() + logger.debug(f"Making GET request to: {url} params={params}") try: response = await self._async_client.get(url, params=params, headers=headers) response.raise_for_status() return response.json() except httpx.HTTPStatusError as errh: - logger.error(f"HTTP Error {errh.response.status_code} from {url}: {errh.response.text}") - return {"error": f"HTTP Error {errh.response.status_code}", "status_code": errh.response.status_code} + # Detailed error body may be helpful for callers - attempt to parse JSON + status = errh.response.status_code + body = None + try: + body = errh.response.json() + except Exception: + body = errh.response.text or None + + logger.error(f"HTTP Error {status} from {url}: {body}") + + # Return underlying error body if available, but keep a consistent shape + result = {"status_code": status} + if isinstance(body, dict): + # merge error body and preserve status_code + result.update(body) + else: + result["error"] = body or f"HTTP Error {status}" + + return result except httpx.RequestError as errc: logger.error(f"Request Error to {url}: {errc}") return {"error": "Microservice Unreachable", "status_code": 503} @@ -155,7 +176,7 @@ async def _async_get_user_context(self, token: str) -> UserContext: """Retrieves user profile and vehicles (ASYNC helper).""" # 1. Get User Profile (/users/me endpoint) - base_url = self.auth_url.rstrip('/') + base_url = self.auth_url.strip().rstrip('/') if base_url.endswith('/users'): url = f"{base_url}/me" else: @@ -166,7 +187,7 @@ async def _async_get_user_context(self, token: str) -> UserContext: return UserContext(user_id="anonymous", full_name="Guest", role="PUBLIC", vehicles=[]) # 2. Get User Vehicles (/vehicles endpoint) - url = self.vehicle_url.rstrip('/') + url = self.vehicle_url.strip().rstrip('/') if not url.endswith("/vehicles"): url = f"{url}/vehicles" vehicle_data = await self._make_get_request(url, token) @@ -253,7 +274,7 @@ async def cancel_appointment(self, appointment_id: str, token: str) -> Dict[str, # 2. Vehicles async def get_customer_vehicles(self, token: str) -> List[Dict[str, Any]]: """Get all vehicles for the current user.""" - url = self.vehicle_url.rstrip('/') + url = self.vehicle_url.strip().rstrip('/') if not url.endswith("/vehicles"): url = f"{url}/vehicles" result = await self._make_get_request(url, token) @@ -263,7 +284,7 @@ async def get_customer_vehicles(self, token: str) -> List[Dict[str, Any]]: async def get_vehicle_details(self, vehicle_id: str, token: str) -> Dict[str, Any]: """Get details for a specific vehicle.""" - base_url = self.vehicle_url.rstrip('/') + base_url = self.vehicle_url.strip().rstrip('/') if base_url.endswith("/vehicles"): url = f"{base_url}/{vehicle_id}" else: @@ -272,7 +293,7 @@ async def get_vehicle_details(self, vehicle_id: str, token: str) -> Dict[str, An async def register_vehicle(self, vehicle_data: Dict[str, Any], token: str) -> Dict[str, Any]: """Register a new vehicle.""" - url = self.vehicle_url.rstrip('/') + url = self.vehicle_url.strip().rstrip('/') if not url.endswith("/vehicles"): url = f"{url}/vehicles" return await self._make_post_request(url, token, vehicle_data) @@ -303,7 +324,7 @@ async def get_project_details(self, project_id: str, token: str) -> Dict[str, An # 4. Profile async def get_my_profile(self, token: str) -> Dict[str, Any]: """Get current user profile.""" - base_url = self.auth_url.rstrip('/') + base_url = self.auth_url.strip().rstrip('/') if base_url.endswith('/users'): url = f"{base_url}/me" else: @@ -312,7 +333,7 @@ async def get_my_profile(self, token: str) -> Dict[str, Any]: async def update_my_profile(self, profile_data: Dict[str, Any], token: str) -> Dict[str, Any]: """Update current user profile.""" - base_url = self.auth_url.rstrip('/') + base_url = self.auth_url.strip().rstrip('/') if base_url.endswith('/users'): url = f"{base_url}/profile" else: From 10149519eb224ec625641532f7606881ef924afc Mon Sep 17 00:00:00 2001 From: RandithaK Date: Sun, 23 Nov 2025 13:25:33 +0530 Subject: [PATCH 5/5] feat: Enhance agent invocation and vehicle retrieval with improved error handling and support for varying JSON structures --- services/agent_core.py | 74 +++++++++++++++++++++---- services/agent_tools.py | 19 +++++-- services/microservice_client.py | 2 +- test_agent_core_invocation.py | 96 +++++++++++++++++++++++++++++++++ test_new_tools.py | 20 +++++-- 5 files changed, 195 insertions(+), 16 deletions(-) create mode 100644 test_agent_core_invocation.py diff --git a/services/agent_core.py b/services/agent_core.py index 390e939..cd7abcd 100644 --- a/services/agent_core.py +++ b/services/agent_core.py @@ -2,6 +2,7 @@ from langchain.agents import AgentExecutor, initialize_agent, AgentType from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder +from langchain_core.messages import HumanMessage, AIMessage from langchain_google_genai import ChatGoogleGenerativeAI from config.settings import settings from services.agent_tools import all_tools @@ -146,17 +147,72 @@ async def invoke_agent( # This ensures thread-safety for concurrent users token_context.set(user_token) - # 4. Invoke Agent Executor (use ainvoke for async tools) - result = await self.agent_executor.ainvoke({ - "input": user_query, - "chat_history": chat_history, - "user_context": user_context_str, # Injected into System Prompt - "rag_context": rag_context_str # Injected into System Prompt - }) + # 4. Convert chat history to LangChain message objects + # The MessagesPlaceholder expects HumanMessage and AIMessage objects, not plain dicts + langchain_history = [] + for msg in chat_history: + role = msg.get("role", "").lower() + content = msg.get("content", "") + if role == "user" or role == "human": + langchain_history.append(HumanMessage(content=content)) + elif role == "assistant" or role == "ai": + langchain_history.append(AIMessage(content=content)) - # 5. Determine Tool Execution Status + # 5. Invoke Agent Executor (try async, fallback to sync if async is not available) + raw_result = None + try: + # some AgentExecutor versions expose an async method named `ainvoke` + if hasattr(self.agent_executor, "ainvoke"): + raw_result = await self.agent_executor.ainvoke({ + "input": user_query, + "chat_history": langchain_history, + "user_context": user_context_str, # Injected into System Prompt + "rag_context": rag_context_str # Injected into System Prompt + }) + else: + # Fallback: call the synchronous `run` in a thread if async method missing + logger.info("AgentExecutor does not expose `ainvoke`, using sync `run` in an executor as fallback") + import asyncio as _asyncio + raw_result = await _asyncio.to_thread( + self.agent_executor.run, + { + "input": user_query, + "chat_history": langchain_history, + "user_context": user_context_str, + "rag_context": rag_context_str + } + ) + + except Exception as ex: + # Log exception with stack trace for easier debugging and re-raise + logger.exception("AgentExecutor invocation failed") + raise + + # 6. Determine Tool Execution Status tool_executed = None - intermediate_steps = result.get('intermediate_steps', []) + + # Normalize raw_result into the expected structure + intermediate_steps = [] + result = {} + + try: + if isinstance(raw_result, dict): + # When agent returns a dict-like response + result = raw_result + intermediate_steps = result.get('intermediate_steps', []) or [] + elif isinstance(raw_result, tuple) and len(raw_result) >= 2: + # Common return shape when return_intermediate_steps=True -> (output, intermediate_steps) + result = {"output": raw_result[0], "intermediate_steps": raw_result[1]} + intermediate_steps = raw_result[1] or [] + elif isinstance(raw_result, str): + # Simple string output + result = {"output": raw_result} + else: + # Any other shape - convert to string for output + result = {"output": str(raw_result)} + except Exception: + logger.exception("Failed to normalize agent executor output; converting to string") + result = {"output": str(raw_result)} if intermediate_steps: # intermediate_steps is a list of tuples: (AgentAction, tool_output) diff --git a/services/agent_tools.py b/services/agent_tools.py index 5c9a803..07eeebc 100644 --- a/services/agent_tools.py +++ b/services/agent_tools.py @@ -4,8 +4,14 @@ from services.token_context import token_context import json -# Get the singleton client instance -client = get_microservice_client() +# Attempt to get the singleton client instance - make import resilient (tests may not have httpx installed) +try: + client = get_microservice_client() +except Exception as e: + # Tests will patch `services.agent_tools.client` as needed; avoid hard failures during import + import logging + logging.getLogger(__name__).warning("Microservice client not available at import time: %s", e) + client = None # --- 1. Appointment Tools --- @@ -76,7 +82,14 @@ async def get_my_vehicles_tool() -> str: summary = "Your Vehicles:\n" for v in vehicles: - summary += f"- {v.get('make')} {v.get('model')} ({v.get('year')}) - Plate: {v.get('licensePlate')} - ID: {v.get('id')}\n" + # tolerate different JSON shapes from services (camelCase vs snake_case) + make = v.get('make') or v.get('Make') or '' + model = v.get('model') or v.get('Model') or '' + year = v.get('year') or v.get('Year') or '' + plate = v.get('licensePlate') or v.get('license_plate') or v.get('plate') or '' + vid = v.get('vehicleId') or v.get('id') or v.get('vehicle_id') or '' + + summary += f"- {make} {model} ({year}) - Plate: {plate} - ID: {vid}\n" return summary async def get_vehicle_details_tool(vehicle_id: str) -> str: diff --git a/services/microservice_client.py b/services/microservice_client.py index 1e06c87..d18704c 100644 --- a/services/microservice_client.py +++ b/services/microservice_client.py @@ -204,7 +204,7 @@ async def _async_get_user_context(self, token: str) -> UserContext: ] return UserContext( - user_id=user_data.get("id") or user_data.get("userId", "unknown"), + user_id=str(user_data.get("id") or user_data.get("userId") or "unknown"), full_name=user_data.get("fullName") or user_data.get("username", "unknown"), role=user_data.get("role", "CUSTOMER"), vehicles=vehicles diff --git a/test_agent_core_invocation.py b/test_agent_core_invocation.py new file mode 100644 index 0000000..ace0a57 --- /dev/null +++ b/test_agent_core_invocation.py @@ -0,0 +1,96 @@ +import sys +import pytest +import asyncio +from unittest.mock import AsyncMock, MagicMock + +# Some CI/test environments do not have langchain and google packages installed. +# Patch sys.modules with lightweight mocks so the module imports successfully and +# tests can instantiate AIAgentService via object.__new__ (skipping __init__). +import types + +# Add module stubs so the service module imports succeed in test environments +if 'langchain' not in sys.modules: + langchain_mod = types.ModuleType('langchain') + sys.modules['langchain'] = langchain_mod + +if 'langchain.tools' not in sys.modules: + sys.modules['langchain.tools'] = types.ModuleType('langchain.tools') + +if 'langchain.agents' not in sys.modules: + sys.modules['langchain.agents'] = types.ModuleType('langchain.agents') + +if 'langchain_core.prompts' not in sys.modules: + sys.modules['langchain_core.prompts'] = types.ModuleType('langchain_core.prompts') + +if 'langchain_google_genai' not in sys.modules: + sys.modules['langchain_google_genai'] = types.ModuleType('langchain_google_genai') + +# Populate required names so imports succeed +langchain_agents = sys.modules.get('langchain.agents') +setattr(langchain_agents, 'AgentExecutor', type('AgentExecutor', (), {})) +setattr(langchain_agents, 'initialize_agent', lambda *a, **k: MagicMock()) +setattr(langchain_agents, 'AgentType', type('AgentType', (), {})) + +langchain_tools = sys.modules.get('langchain.tools') +class _StructuredTool: + @classmethod + def from_function(cls, *args, **kwargs): + return None +setattr(langchain_tools, 'StructuredTool', _StructuredTool) + +langchain_prompts = sys.modules.get('langchain_core.prompts') +setattr(langchain_prompts, 'ChatPromptTemplate', type('ChatPromptTemplate', (), {'from_messages': classmethod(lambda cls, x: None)})) +setattr(langchain_prompts, 'MessagesPlaceholder', type('MessagesPlaceholder', (), {})) + +setattr(sys.modules.get('langchain_google_genai'), 'ChatGoogleGenerativeAI', type('ChatGoogleGenerativeAI', (), {})) + +# Import the class directly +from services.agent_core import AIAgentService + + +@pytest.mark.asyncio +async def test_invoke_agent_falls_back_to_sync_run(): + # Create an instance without running __init__ to avoid creating LLMs + agent = object.__new__(AIAgentService) + + # Prepare a synchronous agent_executor (no `ainvoke`) whose run returns (output, intermediate_steps) + class SyncExecutor: + def run(self, payload): + return ("sync output", [("action1", "tool-output")]) + + agent.agent_executor = SyncExecutor() + + # Mock microservice client and rag service + agent.ms_client = MagicMock() + agent.ms_client.get_user_context = AsyncMock(return_value={"id": "user-x"}) + + rag = MagicMock() + rag.retrieve_and_format = MagicMock(return_value={"context": "kb", "num_sources": 0}) + agent.rag_service = rag + + # Call invoke_agent + result = await agent.invoke_agent("hello", "s1", "tok", []) + + assert isinstance(result, dict) + assert result.get("output") == "sync output" + + +@pytest.mark.asyncio +async def test_invoke_agent_uses_ainvoke_when_present(): + agent = object.__new__(AIAgentService) + + class AsyncExecutor: + async def ainvoke(self, payload): + return {"output": "async output", "intermediate_steps": []} + + agent.agent_executor = AsyncExecutor() + agent.ms_client = MagicMock() + agent.ms_client.get_user_context = AsyncMock(return_value={}) + + rag = MagicMock() + rag.retrieve_and_format = MagicMock(return_value={"context": "kb", "num_sources": 1}) + agent.rag_service = rag + + result = await agent.invoke_agent("ask me", "s2", "tok2", []) + + assert result.get("output") == "async output" diff --git a/test_new_tools.py b/test_new_tools.py index 3ccae21..79aa277 100644 --- a/test_new_tools.py +++ b/test_new_tools.py @@ -46,19 +46,33 @@ async def test_book_appointment_tool(): @pytest.mark.asyncio async def test_get_my_vehicles_tool(): + # Case A: service returns camelCase fields with patch('services.agent_tools.client') as mock_client: mock_client.get_customer_vehicles = AsyncMock(return_value=[ {"id": "v1", "make": "Toyota", "model": "Camry", "year": 2020, "licensePlate": "ABC-123"} ]) - + token_context.set("user_token") - + result = await get_my_vehicles_tool() - + assert "Toyota Camry" in result assert "ABC-123" in result mock_client.get_customer_vehicles.assert_called_once_with("user_token") + # Case B: service returns snake_case fields (some services may use snake_case) + with patch('services.agent_tools.client') as mock_client2: + mock_client2.get_customer_vehicles = AsyncMock(return_value=[ + {"vehicle_id": "v2", "make": "Honda", "model": "Civic", "year": 2018, "license_plate": "XYZ-789"} + ]) + + token_context.set("user_token") + result2 = await get_my_vehicles_tool() + + assert "Honda Civic" in result2 + assert "XYZ-789" in result2 + mock_client2.get_customer_vehicles.assert_called_once_with("user_token") + @pytest.mark.asyncio async def test_concurrency_context(): """Verify that token_context works correctly in simulated concurrent calls."""