Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions runagent/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ def __init__(self, agent_id: str, entrypoint_tag: str, local: bool = True, host:
self.local = local
self.agent_id = agent_id
self.entrypoint_tag = entrypoint_tag

# FIXED: Detect if this is a streaming entrypoint
self.is_streaming = entrypoint_tag.endswith("_stream")

if local:
if host and port:
Expand All @@ -40,19 +43,16 @@ def __init__(self, agent_id: str, entrypoint_tag: str, local: bool = True, host:
self.rest_client = RestClient(is_local=False)
self.socket_client = SocketClient(is_local=False)

# self.agent_architecture = self.rest_client.get_agent_architecture(agent_id)

# selected_entrypoint = next(
# (
# entrypoint for entrypoint in self.agent_architecture['entrypoints']
# if entrypoint['tag'] == entrypoint_tag
# ), None)

# if not selected_entrypoint:
# raise ValueError(f"Entrypoint `{entrypoint_tag}` not found in agent {agent_id}")

def run(self, *input_args, **input_kwargs):

"""
FIXED: Smart execution - automatically uses streaming or non-streaming based on entrypoint
"""
# FIXED: If this is a streaming entrypoint, automatically use run_stream
if self.is_streaming:
console.print(f"[cyan]Detected streaming entrypoint, using WebSocket streaming[/cyan]")
return self.run_stream(*input_args, **input_kwargs)

# Non-streaming execution (HTTP POST)
response = self.rest_client.run_agent(
self.agent_id, self.entrypoint_tag, input_args=input_args, input_kwargs=input_kwargs
)
Expand Down Expand Up @@ -80,6 +80,7 @@ def run(self, *input_args, **input_kwargs):
def run_stream(self, *input_args, **input_kwargs):
"""Stream agent execution results in real-time via WebSocket"""
try:
# FIXED: Return the generator directly, don't try to iterate here
return self.socket_client.run_stream(
self.agent_id, self.entrypoint_tag, input_args=input_args, input_kwargs=input_kwargs
)
Expand All @@ -89,4 +90,4 @@ def run_stream(self, *input_args, **input_kwargs):

def _run_stream(self, *input_args, **input_kwargs):
"""Legacy method - use run_stream instead"""
return self.run_stream(*input_args, **input_kwargs)
return self.run_stream(*input_args, **input_kwargs)
4 changes: 2 additions & 2 deletions runagent/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
# Environment Variables
ENV_RUNAGENT_API_KEY = "RUNAGENT_API_KEY"
# UPDATED: Change default port to match your middleware (8333)
ENV_RUNAGENT_BASE_URL = "http://52.237.88.147:8333/"
ENV_RUNAGENT_BASE_URL = "http://20.84.81.110:8333/"
ENV_LOCAL_CACHE_DIRECTORY = "RUNAGENT_CACHE_DIR"
ENV_RUNAGENT_LOGGING_LEVEL = "RUNAGENT_LOGGING_LEVEL"

# Local Configuration
LOCAL_CACHE_DIRECTORY_PATH = "~/.runagent"
USER_DATA_FILE_NAME = "user_data.json"
DEFAULT_BASE_URL = "http://52.237.88.147:8333/"
DEFAULT_BASE_URL = "http://20.84.81.110:8333/"
AGENT_CONFIG_FILE_NAME = "runagent.config.json"

# Rest of the file remains the same...
Expand Down
112 changes: 77 additions & 35 deletions runagent/sdk/deployment/middleware_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,16 @@ def is_sync_enabled(self) -> bool:
return getattr(self, 'sync_enabled', False)

async def sync_agent_startup(self, agent_id: str, agent_data: Dict[str, Any]) -> bool:
"""Sync agent data when local server starts - FIXED for simplified ID structure"""
"""Sync agent data when local server starts"""
if not self.is_sync_enabled():
console.print("[dim]Middleware sync disabled - agent will run in local-only mode[/dim]")
return False

try:
console.print(f"Syncing agent {agent_id} to middleware...")
console.print(f"[cyan]Syncing agent {agent_id[:8]}... to middleware...[/cyan]")

# FIXED: Use simplified structure - agent_id becomes the main ID
sync_data = {
"local_agent_id": agent_id, # This becomes the main agent ID in middleware
"local_agent_id": agent_id,
"name": agent_data.get("name", "Local Agent"),
"framework": agent_data.get("framework", "unknown"),
"version": agent_data.get("version", "1.0.0"),
Expand All @@ -76,67 +75,110 @@ async def sync_agent_startup(self, agent_id: str, agent_data: Dict[str, Any]) ->
"sync_source": "local_server"
}

response = await self._make_async_request("POST", "/local-agents", sync_data)
console.print(f"[dim]Sending POST to /local-agents[/dim]")
console.print(f"[dim]Full URL: {self.rest_client.base_url}/local-agents[/dim]")

if response.get("success"):
console.print("Agent synced successfully to middleware")
return True
else:
console.print(f"Agent sync failed: {response.get('error', 'Unknown error')}")
# Make direct HTTP call instead of using _make_async_request
try:
response = self.rest_client.http.post("/local-agents", data=sync_data, timeout=30)
result = response.json() if hasattr(response, 'json') else response

console.print(f"[cyan]Response status: {response.status_code if hasattr(response, 'status_code') else 'N/A'}[/cyan]")
console.print(f"[cyan]Response: {result}[/cyan]")

if result.get("success"):
console.print("[green]✅ Agent synced successfully[/green]")
return True
else:
error_msg = result.get("error", "Unknown error")
console.print(f"[yellow]⚠️ Agent sync failed: {error_msg}[/yellow]")
return False

except Exception as e:
console.print(f"[red]❌ HTTP request failed: {str(e)}[/red]")
return False
Comment on lines +82 to 99
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Prevent blocking the event loop in async middleware sync
sync_agent_startup, sync_invocation_start, and sync_invocation_complete now call self.rest_client.http.post/put directly inside async def methods. Those helpers are synchronous (requests-based), so this change runs blocking I/O on the event loop thread. Under load the local FastAPI server will stall while waiting for each middleware call, undoing the previous asyncio.to_thread safeguard. Please offload these calls to a thread (or reuse _make_async_request) so the event loop stays responsive.

-            try:
-                response = self.rest_client.http.post("/local-agents", data=sync_data, timeout=30)
+            try:
+                response = await asyncio.to_thread(
+                    self.rest_client.http.post,
+                    "/local-agents",
+                    data=sync_data,
+                    timeout=30,
+                )
@@
-            response = self.rest_client.http.post("/local-agents/invocations", data=sync_payload, timeout=30)
+            response = await asyncio.to_thread(
+                self.rest_client.http.post,
+                "/local-agents/invocations",
+                data=sync_payload,
+                timeout=30,
+            )
@@
-            response = self.rest_client.http.put(f"/local-agents/invocations/{execution_id}", data=update_payload, timeout=30)
+            response = await asyncio.to_thread(
+                self.rest_client.http.put,
+                f"/local-agents/invocations/{execution_id}",
+                data=update_payload,
+                timeout=30,
+            )

Also applies to: 125-138, 169-177

🧰 Tools
🪛 Ruff (0.13.3)

97-97: Do not catch blind exception: Exception

(BLE001)


98-98: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🤖 Prompt for AI Agents
In runagent/sdk/deployment/middleware_sync.py around lines 82-99 (and similarly
for 125-138 and 169-177), the code calls synchronous HTTP helpers
(self.rest_client.http.post/put) directly inside async methods which blocks the
event loop; change those direct calls to run in a thread by either awaiting the
existing helper that wraps sync calls (e.g. reuse self._make_async_request) or
by doing: response = await asyncio.to_thread(self.rest_client.http.post,
"/local-agents", sync_data, timeout=30) (and likewise for put), then process
response.json() and status_code as before; add "import asyncio" at top if using
asyncio.to_thread and ensure exception handling and return booleans remain
identical.


except Exception as e:
console.print(f"Agent sync error: {str(e)}")
console.print(f"[red]❌ Agent sync error: {str(e)}[/red]")
import traceback
console.print(f"[dim]{traceback.format_exc()}[/dim]")
return False


async def sync_invocation_start(self, invocation_data: Dict[str, Any]) -> Optional[str]:
"""Sync invocation start to middleware - FIXED for simplified ID structure"""
"""Sync invocation start to middleware"""
if not self.is_sync_enabled():
return None

try:
# FIXED: Use simplified structure
console.print(f"[cyan]📡 Syncing invocation start...[/cyan]")

sync_payload = {
"agent_id": invocation_data.get("agent_id"), # Main agent ID (no separate local_agent_id)
"local_execution_id": invocation_data.get("local_execution_id"), # This becomes main execution ID
"agent_id": invocation_data.get("agent_id"),
"local_execution_id": invocation_data.get("local_execution_id"),
"input_data": invocation_data.get("input_data", {}),
"entrypoint_tag": invocation_data.get("entrypoint_tag", ""),
"sdk_type": invocation_data.get("sdk_type", "local_server"),
"client_info": invocation_data.get("client_info", {})
}

response = await self._make_async_request(
"POST",
"/local-agents/invocations",
sync_payload
)
console.print(f"[dim]POST to /local-agents/invocations[/dim]")

response = self.rest_client.http.post("/local-agents/invocations", data=sync_payload, timeout=30)
result = response.json() if hasattr(response, 'json') else response

if response.get("success"):
# Return the execution ID (which is now the main ID)
return response.get("id")
console.print(f"[cyan]Response: {result}[/cyan]")

if result.get("success"):
execution_id = result.get("data", {}).get("id")
if execution_id:
console.print(f"[green]✅ Invocation synced: {execution_id[:8]}...[/green]")
return execution_id

return None

except Exception as e:
console.print(f"Invocation start sync error: {str(e)}")

return None
console.print(f"[red]❌ Invocation sync error: {str(e)}[/red]")
return None


async def sync_invocation_complete(self, execution_id: str, completion_data: Dict[str, Any]) -> bool:
"""Sync invocation completion to middleware - FIXED for simplified ID structure"""
"""Sync invocation completion to middleware"""
if not self.is_sync_enabled() or not execution_id:
return False

try:
# FIXED: Use main execution ID directly (no separate local_execution_id)
response = await self._make_async_request(
"PUT",
f"/local-agents/invocations/{execution_id}", # execution_id is now the main ID
completion_data
)
console.print(f"[cyan]📡 Syncing completion: {execution_id[:8]}...[/cyan]")

update_payload = {}

if completion_data.get("output_data"):
update_payload["output_data"] = completion_data["output_data"]

if completion_data.get("error_detail"):
update_payload["error_detail"] = completion_data["error_detail"]

if completion_data.get("execution_time_ms"):
update_payload["execution_time_ms"] = completion_data["execution_time_ms"]

if completion_data.get("status"):
update_payload["status"] = completion_data["status"]

return response.get("success", False)
console.print(f"[dim]PUT to /local-agents/invocations/{execution_id[:8]}...[/dim]")

response = self.rest_client.http.put(f"/local-agents/invocations/{execution_id}", data=update_payload, timeout=30)
result = response.json() if hasattr(response, 'json') else response

console.print(f"[cyan]Response: {result}[/cyan]")

if result.get("success"):
console.print(f"[green]✅ Completion synced[/green]")
return True

return False

except Exception as e:
console.print(f"Invocation completion sync error: {str(e)}")
console.print(f"[red]❌ Completion sync error: {str(e)}[/red]")
return False

def get_sync_status(self) -> Dict[str, Any]:
Expand Down
Loading