diff --git a/runagent/client/client.py b/runagent/client/client.py index ce890a9..f0c1341 100644 --- a/runagent/client/client.py +++ b/runagent/client/client.py @@ -14,6 +14,9 @@ def __init__(self, agent_id: str, entrypoint_tag: str, local: bool = True, host: self.local = local self.agent_id = agent_id self.entrypoint_tag = entrypoint_tag + + # FIXED: Detect if this is a streaming entrypoint + self.is_streaming = entrypoint_tag.endswith("_stream") if local: if host and port: @@ -40,19 +43,16 @@ def __init__(self, agent_id: str, entrypoint_tag: str, local: bool = True, host: self.rest_client = RestClient(is_local=False) self.socket_client = SocketClient(is_local=False) - # self.agent_architecture = self.rest_client.get_agent_architecture(agent_id) - - # selected_entrypoint = next( - # ( - # entrypoint for entrypoint in self.agent_architecture['entrypoints'] - # if entrypoint['tag'] == entrypoint_tag - # ), None) - - # if not selected_entrypoint: - # raise ValueError(f"Entrypoint `{entrypoint_tag}` not found in agent {agent_id}") - def run(self, *input_args, **input_kwargs): - + """ + FIXED: Smart execution - automatically uses streaming or non-streaming based on entrypoint + """ + # FIXED: If this is a streaming entrypoint, automatically use run_stream + if self.is_streaming: + console.print(f"[cyan]Detected streaming entrypoint, using WebSocket streaming[/cyan]") + return self.run_stream(*input_args, **input_kwargs) + + # Non-streaming execution (HTTP POST) response = self.rest_client.run_agent( self.agent_id, self.entrypoint_tag, input_args=input_args, input_kwargs=input_kwargs ) @@ -80,6 +80,7 @@ def run(self, *input_args, **input_kwargs): def run_stream(self, *input_args, **input_kwargs): """Stream agent execution results in real-time via WebSocket""" try: + # FIXED: Return the generator directly, don't try to iterate here return self.socket_client.run_stream( self.agent_id, self.entrypoint_tag, input_args=input_args, input_kwargs=input_kwargs ) @@ -89,4 +90,4 @@ def run_stream(self, *input_args, **input_kwargs): def _run_stream(self, *input_args, **input_kwargs): """Legacy method - use run_stream instead""" - return self.run_stream(*input_args, **input_kwargs) + return self.run_stream(*input_args, **input_kwargs) \ No newline at end of file diff --git a/runagent/constants.py b/runagent/constants.py index 57f07c6..cc3ce1b 100644 --- a/runagent/constants.py +++ b/runagent/constants.py @@ -17,14 +17,14 @@ # Environment Variables ENV_RUNAGENT_API_KEY = "RUNAGENT_API_KEY" # UPDATED: Change default port to match your middleware (8333) -ENV_RUNAGENT_BASE_URL = "http://52.237.88.147:8333/" +ENV_RUNAGENT_BASE_URL = "http://20.84.81.110:8333/" ENV_LOCAL_CACHE_DIRECTORY = "RUNAGENT_CACHE_DIR" ENV_RUNAGENT_LOGGING_LEVEL = "RUNAGENT_LOGGING_LEVEL" # Local Configuration LOCAL_CACHE_DIRECTORY_PATH = "~/.runagent" USER_DATA_FILE_NAME = "user_data.json" -DEFAULT_BASE_URL = "http://52.237.88.147:8333/" +DEFAULT_BASE_URL = "http://20.84.81.110:8333/" AGENT_CONFIG_FILE_NAME = "runagent.config.json" # Rest of the file remains the same... diff --git a/runagent/sdk/deployment/middleware_sync.py b/runagent/sdk/deployment/middleware_sync.py index 8a44fdb..8010d65 100644 --- a/runagent/sdk/deployment/middleware_sync.py +++ b/runagent/sdk/deployment/middleware_sync.py @@ -54,17 +54,16 @@ def is_sync_enabled(self) -> bool: return getattr(self, 'sync_enabled', False) async def sync_agent_startup(self, agent_id: str, agent_data: Dict[str, Any]) -> bool: - """Sync agent data when local server starts - FIXED for simplified ID structure""" + """Sync agent data when local server starts""" if not self.is_sync_enabled(): console.print("[dim]Middleware sync disabled - agent will run in local-only mode[/dim]") return False try: - console.print(f"Syncing agent {agent_id} to middleware...") + console.print(f"[cyan]Syncing agent {agent_id[:8]}... to middleware...[/cyan]") - # FIXED: Use simplified structure - agent_id becomes the main ID sync_data = { - "local_agent_id": agent_id, # This becomes the main agent ID in middleware + "local_agent_id": agent_id, "name": agent_data.get("name", "Local Agent"), "framework": agent_data.get("framework", "unknown"), "version": agent_data.get("version", "1.0.0"), @@ -76,67 +75,110 @@ async def sync_agent_startup(self, agent_id: str, agent_data: Dict[str, Any]) -> "sync_source": "local_server" } - response = await self._make_async_request("POST", "/local-agents", sync_data) + console.print(f"[dim]Sending POST to /local-agents[/dim]") + console.print(f"[dim]Full URL: {self.rest_client.base_url}/local-agents[/dim]") - if response.get("success"): - console.print("Agent synced successfully to middleware") - return True - else: - console.print(f"Agent sync failed: {response.get('error', 'Unknown error')}") + # Make direct HTTP call instead of using _make_async_request + try: + response = self.rest_client.http.post("/local-agents", data=sync_data, timeout=30) + result = response.json() if hasattr(response, 'json') else response + + console.print(f"[cyan]Response status: {response.status_code if hasattr(response, 'status_code') else 'N/A'}[/cyan]") + console.print(f"[cyan]Response: {result}[/cyan]") + + if result.get("success"): + console.print("[green]✅ Agent synced successfully[/green]") + return True + else: + error_msg = result.get("error", "Unknown error") + console.print(f"[yellow]⚠️ Agent sync failed: {error_msg}[/yellow]") + return False + + except Exception as e: + console.print(f"[red]❌ HTTP request failed: {str(e)}[/red]") return False except Exception as e: - console.print(f"Agent sync error: {str(e)}") + console.print(f"[red]❌ Agent sync error: {str(e)}[/red]") + import traceback + console.print(f"[dim]{traceback.format_exc()}[/dim]") return False + async def sync_invocation_start(self, invocation_data: Dict[str, Any]) -> Optional[str]: - """Sync invocation start to middleware - FIXED for simplified ID structure""" + """Sync invocation start to middleware""" if not self.is_sync_enabled(): return None try: - # FIXED: Use simplified structure + console.print(f"[cyan]📡 Syncing invocation start...[/cyan]") + sync_payload = { - "agent_id": invocation_data.get("agent_id"), # Main agent ID (no separate local_agent_id) - "local_execution_id": invocation_data.get("local_execution_id"), # This becomes main execution ID + "agent_id": invocation_data.get("agent_id"), + "local_execution_id": invocation_data.get("local_execution_id"), "input_data": invocation_data.get("input_data", {}), "entrypoint_tag": invocation_data.get("entrypoint_tag", ""), "sdk_type": invocation_data.get("sdk_type", "local_server"), "client_info": invocation_data.get("client_info", {}) } - response = await self._make_async_request( - "POST", - "/local-agents/invocations", - sync_payload - ) + console.print(f"[dim]POST to /local-agents/invocations[/dim]") + + response = self.rest_client.http.post("/local-agents/invocations", data=sync_payload, timeout=30) + result = response.json() if hasattr(response, 'json') else response - if response.get("success"): - # Return the execution ID (which is now the main ID) - return response.get("id") + console.print(f"[cyan]Response: {result}[/cyan]") + + if result.get("success"): + execution_id = result.get("data", {}).get("id") + if execution_id: + console.print(f"[green]✅ Invocation synced: {execution_id[:8]}...[/green]") + return execution_id + + return None except Exception as e: - console.print(f"Invocation start sync error: {str(e)}") - - return None + console.print(f"[red]❌ Invocation sync error: {str(e)}[/red]") + return None + async def sync_invocation_complete(self, execution_id: str, completion_data: Dict[str, Any]) -> bool: - """Sync invocation completion to middleware - FIXED for simplified ID structure""" + """Sync invocation completion to middleware""" if not self.is_sync_enabled() or not execution_id: return False try: - # FIXED: Use main execution ID directly (no separate local_execution_id) - response = await self._make_async_request( - "PUT", - f"/local-agents/invocations/{execution_id}", # execution_id is now the main ID - completion_data - ) + console.print(f"[cyan]📡 Syncing completion: {execution_id[:8]}...[/cyan]") + + update_payload = {} + + if completion_data.get("output_data"): + update_payload["output_data"] = completion_data["output_data"] + + if completion_data.get("error_detail"): + update_payload["error_detail"] = completion_data["error_detail"] + + if completion_data.get("execution_time_ms"): + update_payload["execution_time_ms"] = completion_data["execution_time_ms"] + + if completion_data.get("status"): + update_payload["status"] = completion_data["status"] - return response.get("success", False) + console.print(f"[dim]PUT to /local-agents/invocations/{execution_id[:8]}...[/dim]") + + response = self.rest_client.http.put(f"/local-agents/invocations/{execution_id}", data=update_payload, timeout=30) + result = response.json() if hasattr(response, 'json') else response + + console.print(f"[cyan]Response: {result}[/cyan]") + + if result.get("success"): + console.print(f"[green]✅ Completion synced[/green]") + return True + + return False except Exception as e: - console.print(f"Invocation completion sync error: {str(e)}") + console.print(f"[red]❌ Completion sync error: {str(e)}[/red]") return False def get_sync_status(self) -> Dict[str, Any]: diff --git a/runagent/sdk/server/local_server.py b/runagent/sdk/server/local_server.py index 81e11a7..d6eac83 100644 --- a/runagent/sdk/server/local_server.py +++ b/runagent/sdk/server/local_server.py @@ -785,17 +785,20 @@ async def run_agent(request: AgentRunRequest): self.log_execution_start(invocation_id, request.entrypoint_tag) - # Sync invocation start to middleware - FIXED for simplified structure + # FIXED: Sync invocation start to middleware IMMEDIATELY after local creation middleware_invocation_id = None if (hasattr(self, 'middleware_sync') and self.middleware_sync and - self.middleware_sync.is_sync_enabled()): + self.middleware_sync.is_sync_enabled() and + getattr(self, 'agent_synced_to_middleware', False)): # Only if agent is synced try: - # FIXED: Use simplified invocation structure + console.print(f"📡 [cyan]Syncing invocation start to middleware...[/cyan]") + + # FIXED: Use correct structure matching middleware expectations sync_payload = { "agent_id": self.agent_id, # Main agent ID - "local_execution_id": invocation_id, # This becomes main execution ID in middleware + "local_execution_id": invocation_id, # This becomes main execution ID "input_data": { "input_args": request.input_args, "input_kwargs": request.input_kwargs @@ -811,8 +814,16 @@ async def run_agent(request: AgentRunRequest): } middleware_invocation_id = await self.middleware_sync.sync_invocation_start(sync_payload) + + if middleware_invocation_id: + console.print(f"✅ [green]Middleware invocation created: {middleware_invocation_id}[/green]") + else: + console.print(f"⚠️ [yellow]Middleware invocation sync returned None[/yellow]") + except Exception as e: - console.print(f"Middleware sync start failed: {e}") + console.print(f"❌ [red]Middleware sync start failed: {e}[/red]") + import traceback + traceback.print_exc() start_time = time.time() execution_success = False @@ -840,7 +851,7 @@ async def run_agent(request: AgentRunRequest): output_data=serializable_output, execution_time_ms=execution_time * 1000 ) - console.print("Local invocation tracking completed successfully") + console.print("✅ Local invocation tracking completed successfully") except Exception as e: console.print(f"Failed to complete local invocation tracking: {str(e)}") @@ -859,18 +870,30 @@ async def run_agent(request: AgentRunRequest): except Exception as e2: console.print(f"Critical: Could not complete local invocation tracking: {str(e2)}") - # Sync invocation completion to middleware - FIXED for simplified structure + # FIXED: Sync invocation completion to middleware with proper error handling if middleware_invocation_id: try: - await self.middleware_sync.sync_invocation_complete( - middleware_invocation_id, # This is now the main execution ID in middleware + console.print(f"📡 [cyan]Syncing completion to middleware...[/cyan]") + completion_result = await self.middleware_sync.sync_invocation_complete( + middleware_invocation_id, { "output_data": serializable_output, - "execution_time_ms": execution_time * 1000 + "execution_time_ms": execution_time * 1000, + "status": "completed" } ) + + if completion_result: + console.print(f"✅ [green]Middleware completion synced successfully[/green]") + else: + console.print(f"⚠️ [yellow]Middleware completion sync returned False[/yellow]") + except Exception as e: - console.print(f"Failed to sync completion to middleware: {e}") + console.print(f"❌ [red]Failed to sync completion to middleware: {e}[/red]") + import traceback + traceback.print_exc() + elif self.middleware_sync and self.middleware_sync.is_sync_enabled(): + console.print(f"⚠️ [yellow]No middleware invocation ID to update (sync may have failed at start)[/yellow]") # Record in original agent_runs table for backward compatibility try: @@ -898,8 +921,8 @@ async def run_agent(request: AgentRunRequest): execution_data = ExecutionData( execution_id=invocation_id, agent_id=self.agent_id, - user_id=None, # Not available in local mode - deployment_id=None, # Not available in local mode + user_id=None, + deployment_id=None, entrypoint_id=request.entrypoint_tag, status="completed", started_at=datetime.fromtimestamp(start_time).isoformat(), @@ -912,7 +935,7 @@ async def run_agent(request: AgentRunRequest): result_data={ "message": "", "data": result_str, - "vm_id": str(uuid.uuid4()), # Mock VM ID for local execution + "vm_id": str(uuid.uuid4()), "type": "result", "timestamp": datetime.now().isoformat() }, @@ -922,7 +945,8 @@ async def run_agent(request: AgentRunRequest): "deployment_id": None, "timeout_seconds": 60, "async_execution": False, - "execution_config": {} + "execution_config": {}, + "middleware_synced": middleware_invocation_id is not None }, error_message=None, is_local=True, @@ -963,18 +987,28 @@ async def run_agent(request: AgentRunRequest): execution_time_ms=execution_time * 1000 ) - # Sync invocation error to middleware - FIXED for simplified structure + # FIXED: Sync invocation error to middleware with proper error handling if middleware_invocation_id: try: - await self.middleware_sync.sync_invocation_complete( - middleware_invocation_id, # This is now the main execution ID in middleware + console.print(f"📡 [cyan]Syncing error to middleware...[/cyan]") + error_result = await self.middleware_sync.sync_invocation_complete( + middleware_invocation_id, { "error_detail": error_detail, - "execution_time_ms": execution_time * 1000 + "execution_time_ms": execution_time * 1000, + "status": "failed" } ) + + if error_result: + console.print(f"✅ [green]Middleware error synced[/green]") + else: + console.print(f"⚠️ [yellow]Middleware error sync returned False[/yellow]") + except Exception as sync_error: - console.print(f"Failed to sync error to middleware: {sync_error}") + console.print(f"❌ [red]Failed to sync error to middleware: {sync_error}[/red]") + import traceback + traceback.print_exc() # Record in original agent_runs table for backward compatibility try: @@ -1000,8 +1034,8 @@ async def run_agent(request: AgentRunRequest): execution_data = ExecutionData( execution_id=invocation_id, agent_id=self.agent_id, - user_id=None, # Not available in local mode - deployment_id=None, # Not available in local mode + user_id=None, + deployment_id=None, entrypoint_id=request.entrypoint_tag, status="failed", started_at=datetime.fromtimestamp(start_time).isoformat(), @@ -1018,7 +1052,8 @@ async def run_agent(request: AgentRunRequest): "deployment_id": None, "timeout_seconds": 60, "async_execution": False, - "execution_config": {} + "execution_config": {}, + "middleware_synced": middleware_invocation_id is not None }, error_message=error_detail, is_local=True, diff --git a/runagent/sdk/server/socket_utils.py b/runagent/sdk/server/socket_utils.py index 7cd15c0..f86ff35 100644 --- a/runagent/sdk/server/socket_utils.py +++ b/runagent/sdk/server/socket_utils.py @@ -14,7 +14,7 @@ class AgentWebSocketHandler: - """WebSocket handler for agent streaming - ENHANCED with invocation tracking""" + """WebSocket handler for agent streaming with invocation tracking""" def __init__(self, db_service, middleware_sync=None): self.db_service = db_service @@ -22,42 +22,59 @@ def __init__(self, db_service, middleware_sync=None): self.middleware_sync = middleware_sync or get_middleware_sync() self.active_streams = {} - async def handle_agent_stream(self, websocket: WebSocket, agent_id: str, agent_execution_streamer): - """ORIGINAL METHOD - Handle WebSocket connection for agent streaming (backward compatibility)""" - await websocket.accept() - connection_id = f"{agent_id}_{int(time.time())}" - - try: - console.print(f" WebSocket connected for agent: [cyan]{agent_id}[/cyan]") - - # Wait for client request - raw_message = await websocket.receive_text() - request_msg = self.serializer.deserialize_message(raw_message) - if request_msg.error: - await self._send_error(websocket, f"Invalid request: {request_msg.error}") - return - - # Parse WebSocket request + def _convert_chunk_to_serializable(self, chunk): + """ + Convert chunk to JSON-serializable format + Handles various object types including LlamaIndex objects + """ + # Already serializable types + if isinstance(chunk, (str, int, float, bool, type(None))): + return chunk + + # Lists and tuples + if isinstance(chunk, (list, tuple)): + return [self._convert_chunk_to_serializable(item) for item in chunk] + + # Dictionaries + if isinstance(chunk, dict): + return {k: self._convert_chunk_to_serializable(v) for k, v in chunk.items()} + + # Objects with dict() method (Pydantic v1) + if hasattr(chunk, 'dict') and callable(chunk.dict): try: - ws_request = WebSocketAgentRequest(**request_msg.data) - except Exception as e: - await self._send_error(websocket, f"Invalid request format: {str(e)}") - return - - if ws_request.action == WebSocketActionType.START_STREAM: - await self._handle_stream_start(websocket, ws_request, connection_id, agent_execution_streamer) - elif ws_request.action == WebSocketActionType.PING: - await self._send_pong(websocket) - else: - await self._send_error(websocket, f"Unknown action: {ws_request.action}") - - except WebSocketDisconnect: - console.print(f"WebSocket disconnected for agent: [cyan]{agent_id}[/cyan]") - self._cleanup_stream(connection_id) - except Exception as e: - console.print(f"💥 WebSocket error for agent {agent_id}: [red]{str(e)}[/red]") - await self._send_error(websocket, f"Server error: {str(e)}") - self._cleanup_stream(connection_id) + return chunk.dict() + except: + pass + + # Objects with model_dump() method (Pydantic v2) + if hasattr(chunk, 'model_dump') and callable(chunk.model_dump): + try: + return chunk.model_dump() + except: + pass + + # Objects with __dict__ + if hasattr(chunk, '__dict__'): + try: + return { + k: self._convert_chunk_to_serializable(v) + for k, v in chunk.__dict__.items() + if not k.startswith('_') + } + except: + pass + + # Try str() as fallback + try: + chunk_str = str(chunk) + # If str() produces something useful, return it + if chunk_str and chunk_str != f"<{type(chunk).__name__} object at 0x": + return chunk_str + except: + pass + + # Last resort: return type name + return {"type": type(chunk).__name__, "repr": repr(chunk)[:200]} async def handle_agent_stream_with_tracking( self, @@ -66,17 +83,16 @@ async def handle_agent_stream_with_tracking( entrypoint_runner_dict: dict, db_service ): - """Handle streaming execution - FIXED for middleware sync""" + """Handle streaming execution with proper chunk serialization""" await websocket.accept() invocation_id = None - middleware_invocation_id = None # FIXED: Add middleware invocation ID tracking + middleware_invocation_id = None try: - # Wait for start message - expect direct JSON format + # Wait for start message data = await websocket.receive_text() - # Parse the direct JSON request format try: request_data = json.loads(data) except json.JSONDecodeError as e: @@ -100,16 +116,14 @@ async def handle_agent_stream_with_tracking( entrypoint_tag = request_data["entrypoint_tag"] - # For WebSocket, we want to check if the entrypoint exists and is a streaming entrypoint. if entrypoint_tag not in entrypoint_runner_dict: await websocket.send_json({ "type": "error", "detail": f"Entrypoint {entrypoint_tag} not found" }) - await websocket.close(code=1003) # 1003 = unsupported data + await websocket.close(code=1003) return - # For WebSocket, we REQUIRE the entrypoint to be a streaming entrypoint if not entrypoint_tag.endswith("_stream"): await websocket.send_json({ "type": "error", @@ -120,13 +134,10 @@ async def handle_agent_stream_with_tracking( stream_runner = entrypoint_runner_dict[entrypoint_tag] - # Extract input data from the new format input_args = request_data.get("input_args", []) input_kwargs = request_data.get("input_kwargs", {}) - timeout_seconds = request_data.get("timeout_seconds", 60) - async_execution = request_data.get("async_execution", False) - # Start LOCAL invocation tracking first + # Start LOCAL invocation tracking invocation_id = self.db_service.start_invocation( agent_id=agent_id, input_data={ @@ -141,22 +152,19 @@ async def handle_agent_stream_with_tracking( } ) - console.print(f"🚀 Started invocation: Invocation ID = {invocation_id}") - console.print(f"🔍 Entrypoint: {entrypoint_tag}") - console.print(f"🔍 Input data: *{input_args}, **{input_kwargs}") + console.print(f"Started invocation: Invocation ID = {invocation_id}") - # FIXED: Sync invocation start to middleware - PROPERLY + # Sync invocation start to middleware if (hasattr(self, 'middleware_sync') and self.middleware_sync and self.middleware_sync.is_sync_enabled()): try: - console.print(f"📡 [cyan]Syncing invocation start to middleware...[/cyan]") + console.print(f"Syncing invocation start to middleware...") - # Prepare sync payload with CORRECT structure sync_payload = { - "agent_id": agent_id, # Main agent ID - "local_execution_id": invocation_id, # This becomes main execution ID in middleware + "agent_id": agent_id, + "local_execution_id": invocation_id, "input_data": { "input_args": input_args, "input_kwargs": input_kwargs @@ -166,22 +174,20 @@ async def handle_agent_stream_with_tracking( "client_info": { "connection_type": "websocket", "stream_mode": True, - "server_host": "127.0.0.1", # Add proper server info - "server_port": 8450 # Add proper port info + "server_host": "127.0.0.1", + "server_port": 8450 } } middleware_invocation_id = await self.middleware_sync.sync_invocation_start(sync_payload) if middleware_invocation_id: - console.print(f"✅ [green]Middleware invocation started: {middleware_invocation_id}[/green]") - else: - console.print(f"⚠️ [yellow]Middleware invocation start failed[/yellow]") + console.print(f"Middleware invocation started: {middleware_invocation_id}") except Exception as e: - console.print(f"❌ [red]Middleware sync start failed: {e}[/red]") + console.print(f"Middleware sync start failed: {e}") - # Send stream started status to client (simple JSON format) + # Send stream started status await websocket.send_json({ "type": "status", "status": "stream_started", @@ -198,39 +204,52 @@ async def handle_agent_stream_with_tracking( async for chunk in stream_runner(*input_args, **input_kwargs): chunk_count += 1 + # Convert chunk to serializable format + try: + serializable_chunk = self._convert_chunk_to_serializable(chunk) + except Exception as conv_error: + console.print(f"Warning: Could not convert chunk {chunk_count}: {conv_error}") + serializable_chunk = { + "chunk_number": chunk_count, + "conversion_error": str(conv_error), + "chunk_type": str(type(chunk)), + "chunk_str": str(chunk)[:200] + } + # Store chunk for final output tracking (limit to prevent memory issues) if chunk_count <= 5000: try: - serializable_chunk = self._convert_to_serializable(chunk) stream_output_data.append(serializable_chunk) except Exception as e: - console.print(f"⚠️ [yellow]Warning: Could not serialize chunk {chunk_count}: {e}[/yellow]") - stream_output_data.append({ - "chunk_number": chunk_count, - "serialization_error": str(e), - "chunk_type": str(type(chunk)), - "chunk_preview": str(chunk) - }) + console.print(f"Warning: Could not store chunk {chunk_count}: {e}") - # Send chunk to client (simple JSON format) - await websocket.send_text(json.dumps({ - "type": "data", - "content": chunk - })) + # Send chunk to client + try: + await websocket.send_text(json.dumps({ + "type": "data", + "content": serializable_chunk + })) + except Exception as send_error: + console.print(f"Error sending chunk {chunk_count}: {send_error}") + # Try sending error message + await websocket.send_json({ + "type": "error", + "error": f"Chunk serialization failed: {str(send_error)}" + }) # Streaming completed successfully execution_time = time.time() - start_time - console.print(f"✅ Completed invocation {invocation_id[:8]}... successfully") - console.print(f"📊 Total chunks: {chunk_count}, Execution time: {execution_time:.2f}s") + console.print(f"Completed invocation {invocation_id[:8]}... successfully") + console.print(f"Total chunks: {chunk_count}, Execution time: {execution_time:.2f}s") - # FIXED: Complete LOCAL invocation tracking with success + # Complete LOCAL invocation tracking try: final_output_data = { "stream_completed": True, "total_chunks": chunk_count, "execution_type": "streaming", - "sample_chunks": stream_output_data if stream_output_data else [], + "sample_chunks": stream_output_data[:10] if stream_output_data else [], "execution_time_seconds": execution_time, "chunk_summary": { "total_chunks": chunk_count, @@ -244,15 +263,15 @@ async def handle_agent_stream_with_tracking( output_data=final_output_data, execution_time_ms=execution_time * 1000 ) - console.print(f"✅ [green]Local invocation completed successfully[/green]") + console.print(f"Local invocation completed successfully") except Exception as e: - console.print(f"❌ [red]Failed to complete local invocation: {e}[/red]") + console.print(f"Failed to complete local invocation: {e}") - # FIXED: Sync completion to middleware - PROPERLY + # Sync completion to middleware if middleware_invocation_id and self.middleware_sync: try: - console.print(f"📡 [cyan]Syncing completion to middleware...[/cyan]") + console.print(f"Syncing completion to middleware...") completion_data = { "output_data": final_output_data, @@ -266,14 +285,14 @@ async def handle_agent_stream_with_tracking( ) if sync_success: - console.print(f"✅ [green]Middleware completion synced successfully[/green]") + console.print(f"Middleware completion synced successfully") else: - console.print(f"❌ [red]Middleware completion sync failed[/red]") + console.print(f"Middleware completion sync failed") except Exception as e: - console.print(f"❌ [red]Middleware sync completion error: {e}[/red]") + console.print(f"Middleware sync completion error: {e}") - # Send completion status to client (simple JSON format) + # Send completion status await websocket.send_json({ "type": "status", "status": "stream_completed", @@ -288,7 +307,7 @@ async def handle_agent_stream_with_tracking( execution_time = time.time() - start_time error_detail = f"Streaming error: {str(stream_error)}" - console.print(f"❌ [red]Stream execution failed: {error_detail}[/red]") + console.print(f"Stream execution failed: {error_detail}") # Complete LOCAL invocation with error self.db_service.complete_invocation( @@ -297,10 +316,10 @@ async def handle_agent_stream_with_tracking( execution_time_ms=execution_time * 1000 ) - # FIXED: Sync error to middleware - PROPERLY + # Sync error to middleware if middleware_invocation_id and self.middleware_sync: try: - console.print(f"📡 [cyan]Syncing error to middleware...[/cyan]") + console.print(f"Syncing error to middleware...") error_data = { "error_detail": error_detail, @@ -312,12 +331,12 @@ async def handle_agent_stream_with_tracking( middleware_invocation_id, error_data ) - console.print(f"✅ [green]Middleware error synced[/green]") + console.print(f"Middleware error synced") except Exception as sync_error: - console.print(f"❌ [red]Middleware sync error failed: {sync_error}[/red]") + console.print(f"Middleware sync error failed: {sync_error}") - # Send error to client (simple JSON format) + # Send error to client await websocket.send_text(json.dumps({ "type": "error", "error": error_detail @@ -326,14 +345,12 @@ async def handle_agent_stream_with_tracking( except WebSocketDisconnect: console.print(f"WebSocket disconnected for agent {agent_id}") if invocation_id: - # Complete local invocation as disconnected self.db_service.complete_invocation( invocation_id=invocation_id, error_detail="WebSocket disconnected", execution_time_ms=0 ) - # FIXED: Sync disconnection to middleware if middleware_invocation_id and self.middleware_sync: try: await self.middleware_sync.sync_invocation_complete( @@ -348,16 +365,14 @@ async def handle_agent_stream_with_tracking( console.print(f"Failed to sync disconnection: {e}") except Exception as e: - console.print(f"❌ WebSocket handler error: {e}") + console.print(f"WebSocket handler error: {e}") if invocation_id: - # Complete local invocation with error self.db_service.complete_invocation( invocation_id=invocation_id, error_detail=str(e), execution_time_ms=0 ) - # FIXED: Sync handler error to middleware if middleware_invocation_id and self.middleware_sync: try: await self.middleware_sync.sync_invocation_complete( @@ -371,7 +386,43 @@ async def handle_agent_stream_with_tracking( except Exception as sync_error: console.print(f"Failed to sync handler error: {sync_error}") - + async def handle_agent_stream(self, websocket: WebSocket, agent_id: str, agent_execution_streamer): + """ORIGINAL METHOD - Handle WebSocket connection for agent streaming (backward compatibility)""" + await websocket.accept() + connection_id = f"{agent_id}_{int(time.time())}" + + try: + console.print(f"WebSocket connected for agent: {agent_id}") + + # Wait for client request + raw_message = await websocket.receive_text() + request_msg = self.serializer.deserialize_message(raw_message) + if request_msg.error: + await self._send_error(websocket, f"Invalid request: {request_msg.error}") + return + + # Parse WebSocket request + try: + ws_request = WebSocketAgentRequest(**request_msg.data) + except Exception as e: + await self._send_error(websocket, f"Invalid request format: {str(e)}") + return + + if ws_request.action == WebSocketActionType.START_STREAM: + await self._handle_stream_start(websocket, ws_request, connection_id, agent_execution_streamer) + elif ws_request.action == WebSocketActionType.PING: + await self._send_pong(websocket) + else: + await self._send_error(websocket, f"Unknown action: {ws_request.action}") + + except WebSocketDisconnect: + console.print(f"WebSocket disconnected for agent: {agent_id}") + self._cleanup_stream(connection_id) + except Exception as e: + console.print(f"WebSocket error for agent {agent_id}: {str(e)}") + await self._send_error(websocket, f"Server error: {str(e)}") + self._cleanup_stream(connection_id) + async def _handle_stream_start(self, websocket: WebSocket, request: WebSocketAgentRequest, connection_id: str, agent_execution_streamer: Callable): """ORIGINAL METHOD - Handle stream start request (backward compatibility)""" start_time = time.time() @@ -384,9 +435,9 @@ async def _handle_stream_start(self, websocket: WebSocket, request: WebSocketAge "input_kwargs": list(request.input_data.input_kwargs.keys()) }) - console.print(f"Starting stream for agent: [cyan]{request.agent_id}[/cyan]") - console.print(f"Input args: [cyan]{request.input_data.input_args}[/cyan]") - console.print(f"Input kwargs: [cyan]{request.input_data.input_kwargs}[/cyan]") + console.print(f"Starting stream for agent: {request.agent_id}") + console.print(f"Input args: {request.input_data.input_args}") + console.print(f"Input kwargs: {request.input_data.input_kwargs}") # Track active stream self.active_streams[connection_id] = { @@ -440,7 +491,7 @@ async def _handle_stream_start(self, websocket: WebSocket, request: WebSocketAge ) console.print( - f"✅ Agent [cyan]{request.agent_id}[/cyan] stream completed successfully in " + f"Agent {request.agent_id} stream completed successfully in " f"{execution_time:.2f}s with {chunk_count} chunks" ) @@ -460,271 +511,10 @@ async def _handle_stream_start(self, websocket: WebSocket, request: WebSocketAge execution_time=execution_time, ) - console.print(f"💥 [red]{error_msg}[/red]") + console.print(f"{error_msg}") finally: self._cleanup_stream(connection_id) - - # async def _handle_stream_start_with_tracking( - # self, - # websocket: WebSocket, - # request: WebSocketAgentRequest, - # connection_id: str, - # agent_execution_streamer: Callable, - # invocation_id: str, - # db_service, - # middleware_invocation_id: str = None - # ): - # """Enhanced stream start with invocation tracking and middleware sync""" - # start_time = time.time() - # chunk_count = 0 - # stream_output_data = [] - # error_detail = None - - # try: - # # Send stream started status - # await self._send_status(websocket, "stream_started", { - # "agent_id": request.agent_id, - # "invocation_id": invocation_id, - # "middleware_invocation_id": middleware_invocation_id, # NEW: Include middleware ID - # "input_args": request.input_data.input_args, - # "input_kwargs": list(request.input_data.input_kwargs.keys()) - # }) - - # console.print(f"Starting tracked stream for agent: [cyan]{request.agent_id}[/cyan] (invocation: {invocation_id}...)") - # if middleware_invocation_id: - # console.print(f"[dim]Middleware invocation: {middleware_invocation_id}...[/dim]") - - # console.print(f"Input args: [cyan]{request.input_data.input_args}[/cyan]") - # console.print(f"Input kwargs: [cyan]{request.input_data.input_kwargs}[/cyan]") - - # # Track active stream - # self.active_streams[connection_id] = { - # "agent_id": request.agent_id, - # "invocation_id": invocation_id, - # "middleware_invocation_id": middleware_invocation_id, # NEW: Track middleware ID - # "start_time": start_time, - # "chunk_count": 0 - # } - - # # Start the streaming iteration - # async for chunk in self._safe_agent_stream( - # agent_execution_streamer, - # *request.input_data.input_args, - # **request.input_data.input_kwargs - # ): - # # Check if stream is still active - # if connection_id not in self.active_streams: - # break - - # chunk_count += 1 - # self.active_streams[connection_id]["chunk_count"] = chunk_count - - # # Convert chunk to serializable format before storing - # try: - # serializable_chunk = self._convert_to_serializable(chunk) - - # # Store chunk for final output tracking (limit to prevent memory issues) - # if chunk_count <= 100: - # stream_output_data.append(serializable_chunk) - - # except Exception as e: - # console.print(f"⚠️ [yellow]Warning: Could not serialize chunk {chunk_count}: {e}[/yellow]") - # # Store a safe representation - # stream_output_data.append({ - # "chunk_number": chunk_count, - # "serialization_error": str(e), - # "chunk_type": str(type(chunk)), - # "chunk_preview": str(chunk)[:100] + "..." if len(str(chunk)) > 100 else str(chunk) - # }) - - # raw_data_msg = SafeMessage( - # id="raw_chunk", - # type=MessageType.RAW_DATA, - # timestamp="", - # data=chunk - # ) - # # Send chunk with appropriate message type - # serialized_chunk = self.serializer.serialize_message(raw_data_msg) - # await websocket.send_text(serialized_chunk) - - # # Small delay to prevent overwhelming the client - # await asyncio.sleep(0) - - # # Calculate execution time - # execution_time = time.time() - start_time - - # # Send completion status - # await self._send_status(websocket, "stream_completed", { - # "agent_id": request.agent_id, - # "invocation_id": invocation_id, - # "middleware_invocation_id": middleware_invocation_id, # NEW: Include middleware ID - # "total_chunks": chunk_count, - # "execution_time": execution_time - # }) - - # # Complete local invocation tracking with success - # try: - # db_service.complete_invocation( - # invocation_id=invocation_id, - # output_data={ - # "stream_completed": True, - # "total_chunks": chunk_count, - # "sample_chunks": stream_output_data[:10] if stream_output_data else [], - # "execution_time_seconds": execution_time, - # "chunk_summary": { - # "total_chunks": chunk_count, - # "stored_samples": min(len(stream_output_data), 10), - # "stream_type": "websocket" - # } - # }, - # execution_time_ms=execution_time * 1000 - # ) - # console.print(f"✅ [green]Local invocation tracking completed successfully[/green]") - # except Exception as e: - # console.print(f"❌ [red]Failed to complete local invocation tracking: {str(e)}[/red]") - # # Try to complete with minimal data - # try: - # db_service.complete_invocation( - # invocation_id=invocation_id, - # output_data={ - # "stream_completed": True, - # "total_chunks": chunk_count, - # "execution_time_seconds": execution_time, - # "serialization_note": "Some chunks could not be serialized" - # }, - # execution_time_ms=execution_time * 1000 - # ) - # console.print(f"✅ [green]Local invocation tracking completed with minimal data[/green]") - # except Exception as e2: - # console.print(f"❌ [red]Critical: Could not complete local invocation tracking: {str(e2)}[/red]") - - # # NEW: Sync completion to middleware - # if middleware_invocation_id and self.middleware_sync: - # try: - # await self.middleware_sync.sync_invocation_complete( - # middleware_invocation_id, - # { - # "output_data": { - # "stream_completed": True, - # "total_chunks": chunk_count, - # "sample_chunks": stream_output_data[:10] if stream_output_data else [], - # "execution_time_seconds": execution_time, - # "chunk_summary": { - # "total_chunks": chunk_count, - # "stored_samples": min(len(stream_output_data), 10), - # "stream_type": "websocket" - # } - # }, - # "execution_time_ms": execution_time * 1000 - # } - # ) - # console.print(f"📡 [dim]Stream completion synced to middleware[/dim]") - # except Exception as e: - # console.print(f"⚠️ [yellow]Failed to sync stream completion to middleware: {e}[/yellow]") - - # # Record in original agent_runs table for backward compatibility - # self.db_service.record_agent_run( - # agent_id=request.agent_id, - # input_data=request.input_data.dict(), - # output_data={"stream_completed": True, "chunk_count": chunk_count}, - # success=True, - # execution_time=execution_time, - # ) - - # console.print( - # f"✅ Agent [cyan]{request.agent_id}[/cyan] tracked stream completed successfully in " - # f"{execution_time:.2f}s with {chunk_count} chunks (invocation: {invocation_id[:8]}...)" - # ) - - # except Exception as e: - # execution_time = time.time() - start_time - # error_detail = f"Error streaming agent {request.agent_id}: {str(e)}" - - # await self._send_error(websocket, error_detail) - - # # Complete local invocation tracking with error - # db_service.complete_invocation( - # invocation_id=invocation_id, - # error_detail=error_detail, - # execution_time_ms=execution_time * 1000 - # ) - - # # NEW: Sync error to middleware - # if middleware_invocation_id and self.middleware_sync: - # try: - # await self.middleware_sync.sync_invocation_complete( - # middleware_invocation_id, - # { - # "error_detail": error_detail, - # "execution_time_ms": execution_time * 1000 - # } - # ) - # except Exception as sync_error: - # console.print(f"⚠️ [yellow]Failed to sync stream error to middleware: {sync_error}[/yellow]") - - # # Record failed run in database (backward compatibility) - # self.db_service.record_agent_run( - # agent_id=request.agent_id, - # input_data=request.input_data.dict(), - # output_data=None, - # success=False, - # error_message=error_detail, - # execution_time=execution_time, - # ) - - # console.print(f"💥 [red]{error_detail}[/red] (invocation: {invocation_id[:8]}...)") - - # finally: - # self._cleanup_stream(connection_id) - - def _convert_to_serializable(self, obj): - """Convert objects to JSON-serializable format""" - try: - # Try direct JSON serialization first - import json - json.dumps(obj) - return obj - except (TypeError, ValueError): - # Handle common non-serializable objects - if hasattr(obj, '__dict__'): - # Objects with __dict__ (like TextEvent) - return { - "type": type(obj).__name__, - "data": {k: self._convert_to_serializable(v) for k, v in obj.__dict__.items()} - } - elif hasattr(obj, '_asdict'): - # Named tuples - return { - "type": type(obj).__name__, - "data": obj._asdict() - } - elif hasattr(obj, 'dict'): - # Pydantic models - try: - return obj.dict() - except: - return {"type": type(obj).__name__, "repr": repr(obj)} - elif hasattr(obj, 'model_dump'): - # Pydantic v2 models - try: - return obj.model_dump() - except: - return {"type": type(obj).__name__, "repr": repr(obj)} - elif isinstance(obj, (list, tuple)): - # Handle lists/tuples recursively - return [self._convert_to_serializable(item) for item in obj] - elif isinstance(obj, dict): - # Handle dictionaries recursively - return {k: self._convert_to_serializable(v) for k, v in obj.items()} - else: - # Fallback to string representation - return { - "type": type(obj).__name__, - "repr": repr(obj)[:500], # Limit length - "str": str(obj)[:500] # Limit length - } - async def _safe_agent_stream(self, agent_execution_streamer, *input_args, **input_kwargs): """UNCHANGED - Safely wrap the agent's generic_stream method""" diff --git a/templates/agno/default/requirements.txt b/templates/agno/default/requirements.txt index e86126c..85c40e8 100644 --- a/templates/agno/default/requirements.txt +++ b/templates/agno/default/requirements.txt @@ -1 +1,2 @@ -agno>=1.7.2 \ No newline at end of file +agno>=1.7.2 +openai \ No newline at end of file diff --git a/templates/agno/default/runagent.config.json b/templates/agno/default/runagent.config.json index 1c0c1f6..bfe717e 100644 --- a/templates/agno/default/runagent.config.json +++ b/templates/agno/default/runagent.config.json @@ -12,19 +12,6 @@ }, "agent_architecture": { "entrypoints": [ - { - "file": "simple_assistant.py", - "module": "agent.run", - "tag": "agno_assistant" - }, - { - "file": "simple_assistant.py", - "module": "agent_run_stream", - "tag": "agno_stream", - "extractor": { - "content": "$.content" - } - }, { "file": "simple_assistant.py", "module": "agent_print_response", @@ -37,4 +24,5 @@ } ] } -} \ No newline at end of file +} + diff --git a/templates/llamaindex/default/requirements.txt b/templates/llamaindex/default/requirements.txt index 1604f36..3191088 100644 --- a/templates/llamaindex/default/requirements.txt +++ b/templates/llamaindex/default/requirements.txt @@ -1 +1 @@ -llama-index>=0.12.48 +llama-index diff --git a/templates/llamaindex/default/runagent.config.json b/templates/llamaindex/default/runagent.config.json index 2af2a9f..f77be5b 100644 --- a/templates/llamaindex/default/runagent.config.json +++ b/templates/llamaindex/default/runagent.config.json @@ -23,8 +23,5 @@ "tag": "math_stream" } ] - }, - "env_vars": { - "OPENAI_API_KEY": "sk-proj-1234567890" } } \ No newline at end of file diff --git a/test_scripts/python/client_test_agno.py b/test_scripts/python/client_test_agno.py index 4939a57..3a409af 100644 --- a/test_scripts/python/client_test_agno.py +++ b/test_scripts/python/client_test_agno.py @@ -1,8 +1,8 @@ # from runagent import RunAgentClient # ra = RunAgentClient( -# agent_id="c7a08c39-9086-436b-b64e-399779f5a7e8", -# entrypoint_tag="agno_assistant", +# agent_id="27f68f00-e8cd-4965-9b91-fac501e132e3", +# entrypoint_tag="agno_print_response", # local=True # ) @@ -17,12 +17,12 @@ from runagent import RunAgentClient ra = RunAgentClient( - agent_id="c7a08c39-9086-436b-b64e-399779f5a7e8", - entrypoint_tag="agno_stream", + agent_id="27f68f00-e8cd-4965-9b91-fac501e132e3", + entrypoint_tag="agno_print_response_stream", local=True ) for chunk in ra.run( "Benefits of a long drive" ): - print(chunk['content'], end="") + print(chunk) diff --git a/test_scripts/python/client_test_llamaindex.py b/test_scripts/python/client_test_llamaindex.py index d9d77e1..55393ba 100644 --- a/test_scripts/python/client_test_llamaindex.py +++ b/test_scripts/python/client_test_llamaindex.py @@ -1,7 +1,7 @@ # from runagent import RunAgentClient # ra = RunAgentClient( -# agent_id="408db172-a58b-41f3-b396-0e182784749d", +# agent_id="583d497b-cb6b-4558-b489-cba8f66e57a1", # entrypoint_tag="math_run", # local=True # ) @@ -17,12 +17,13 @@ from runagent import RunAgentClient ra = RunAgentClient( - agent_id="c843839d-4597-405f-9c5a-c532364edce4", - entrypoint_tag="main_stream", + agent_id="583d497b-cb6b-4558-b489-cba8f66e57a1", + entrypoint_tag="math_stream", local=True ) + for chunk in ra.run( "What is 2 * 3?" ): - print(chunk) + print(chunk) \ No newline at end of file