Rad/updated local sync#75
Conversation
WalkthroughIntroduces client-side routing between REST and WebSocket based on entrypoint tag, overhauls WebSocket streaming to serialize chunks safely and integrate middleware synchronization, expands server-side middleware sync for start/complete/error paths, updates constants to a new base URL, adjusts templates (dependencies/config), and refreshes test scripts to new agent IDs/tags. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor User
participant Client as RunAgentClient
participant REST as REST API
participant WS as WebSocket Server
User->>Client: run(payload, entrypoint_tag)
alt entrypoint_tag endsWith "_stream"
Note over Client: is_streaming = true
Client->>WS: run_stream(payload) [open WS, send]
WS-->>Client: stream chunks (serialized)
Client-->>User: yields chunks
else
Note over Client: is_streaming = false
Client->>REST: POST /run
REST-->>Client: JSON response
Client-->>User: returns result
end
sequenceDiagram
autonumber
participant Client as WS Client
participant Server as socket_utils handler
participant MW as Middleware Sync
Client->>Server: connect + start stream
rect rgba(230,245,255,0.6)
Server->>MW: invocation_start(payload)
MW-->>Server: {middleware_invocation_id}
Note right of Server: Track IDs, emit stream_started status
end
loop For each chunk
Server->>Server: _convert_chunk_to_serializable(chunk)
Server-->>Client: send serialized chunk
Server->>Server: store (bounded list)
end
alt success
Server->>MW: invocation_complete(execution_data)
MW-->>Server: ack
Server-->>Client: stream_completed status
else error
Server->>MW: invocation_error(error_data)
MW-->>Server: ack/fail
Server-->>Client: error payload
end
Client-->>Server: close
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
runagent/client/client.py(4 hunks)runagent/constants.py(1 hunks)runagent/sdk/deployment/middleware_sync.py(2 hunks)runagent/sdk/server/local_server.py(10 hunks)runagent/sdk/server/socket_utils.py(17 hunks)templates/agno/default/requirements.txt(1 hunks)templates/agno/default/runagent.config.json(1 hunks)templates/llamaindex/default/requirements.txt(1 hunks)templates/llamaindex/default/runagent.config.json(0 hunks)test_scripts/python/client_test_agno.py(2 hunks)test_scripts/python/client_test_llamaindex.py(2 hunks)
💤 Files with no reviewable changes (1)
- templates/llamaindex/default/runagent.config.json
🧰 Additional context used
🧬 Code graph analysis (6)
runagent/sdk/deployment/middleware_sync.py (2)
runagent/sdk/config.py (1)
base_url(295-297)runagent/sdk/rest_client.py (3)
post(209-211)get(205-207)put(213-215)
test_scripts/python/client_test_llamaindex.py (2)
runagent-rust/runagent/src/client/runagent_client.rs (1)
entrypoint_tag(279-281)runagent/client/client.py (1)
run(46-78)
runagent/client/client.py (2)
runagent/sdk/socket_client.py (1)
run_stream(72-111)runagent/cli/commands.py (1)
run_stream(1084-1219)
test_scripts/python/client_test_agno.py (2)
runagent-rust/runagent/src/client/runagent_client.rs (1)
entrypoint_tag(279-281)runagent/client/client.py (1)
run(46-78)
runagent/sdk/server/local_server.py (1)
runagent/sdk/deployment/middleware_sync.py (2)
is_sync_enabled(52-54)sync_invocation_complete(145-182)
runagent/sdk/server/socket_utils.py (3)
runagent/utils/serializer.py (2)
CoreSerializer(8-218)deserialize_message(118-154)runagent/sdk/deployment/middleware_sync.py (3)
get_middleware_sync(269-281)is_sync_enabled(52-54)sync_invocation_start(108-142)runagent/utils/schema.py (2)
WebSocketAgentRequest(109-119)WebSocketActionType(103-106)
🪛 Ruff (0.13.3)
runagent/sdk/deployment/middleware_sync.py
78-78: f-string without any placeholders
Remove extraneous f prefix
(F541)
97-97: Do not catch blind exception: Exception
(BLE001)
98-98: Use explicit conversion flag
Replace with conversion flag
(RUF010)
101-101: Do not catch blind exception: Exception
(BLE001)
102-102: Use explicit conversion flag
Replace with conversion flag
(RUF010)
114-114: f-string without any placeholders
Remove extraneous f prefix
(F541)
125-125: f-string without any placeholders
Remove extraneous f prefix
(F541)
138-138: Consider moving this statement to an else block
(TRY300)
140-140: Do not catch blind exception: Exception
(BLE001)
141-141: Use explicit conversion flag
Replace with conversion flag
(RUF010)
175-175: f-string without any placeholders
Remove extraneous f prefix
(F541)
178-178: Consider moving this statement to an else block
(TRY300)
180-180: Do not catch blind exception: Exception
(BLE001)
181-181: Use explicit conversion flag
Replace with conversion flag
(RUF010)
runagent/client/client.py
52-52: f-string without any placeholders
Remove extraneous f prefix
(F541)
runagent/sdk/server/local_server.py
796-796: f-string without any placeholders
Remove extraneous f prefix
(F541)
821-821: f-string without any placeholders
Remove extraneous f prefix
(F541)
823-823: Do not catch blind exception: Exception
(BLE001)
876-876: f-string without any placeholders
Remove extraneous f prefix
(F541)
887-887: f-string without any placeholders
Remove extraneous f prefix
(F541)
889-889: f-string without any placeholders
Remove extraneous f prefix
(F541)
891-891: Do not catch blind exception: Exception
(BLE001)
896-896: f-string without any placeholders
Remove extraneous f prefix
(F541)
993-993: f-string without any placeholders
Remove extraneous f prefix
(F541)
1004-1004: f-string without any placeholders
Remove extraneous f prefix
(F541)
1006-1006: f-string without any placeholders
Remove extraneous f prefix
(F541)
1008-1008: Do not catch blind exception: Exception
(BLE001)
runagent/sdk/server/socket_utils.py
46-46: Do not use bare except
(E722)
46-47: try-except-pass detected, consider logging the exception
(S110)
53-53: Do not use bare except
(E722)
53-54: try-except-pass detected, consider logging the exception
(S110)
64-64: Do not use bare except
(E722)
64-65: try-except-pass detected, consider logging the exception
(S110)
73-73: Do not use bare except
(E722)
73-74: try-except-pass detected, consider logging the exception
(S110)
163-163: f-string without any placeholders
Remove extraneous f prefix
(F541)
187-187: Do not catch blind exception: Exception
(BLE001)
210-210: Do not catch blind exception: Exception
(BLE001)
223-223: Do not catch blind exception: Exception
(BLE001)
232-232: Do not catch blind exception: Exception
(BLE001)
237-237: Use explicit conversion flag
Replace with conversion flag
(RUF010)
266-266: f-string without any placeholders
Remove extraneous f prefix
(F541)
268-268: Do not catch blind exception: Exception
(BLE001)
274-274: f-string without any placeholders
Remove extraneous f prefix
(F541)
288-288: f-string without any placeholders
Remove extraneous f prefix
(F541)
290-290: f-string without any placeholders
Remove extraneous f prefix
(F541)
292-292: Do not catch blind exception: Exception
(BLE001)
322-322: f-string without any placeholders
Remove extraneous f prefix
(F541)
334-334: f-string without any placeholders
Remove extraneous f prefix
(F541)
336-336: Do not catch blind exception: Exception
(BLE001)
407-407: Do not catch blind exception: Exception
(BLE001)
408-408: Use explicit conversion flag
Replace with conversion flag
(RUF010)
421-421: Do not catch blind exception: Exception
(BLE001)
422-422: Use explicit conversion flag
Replace with conversion flag
(RUF010)
423-423: Use explicit conversion flag
Replace with conversion flag
(RUF010)
| try: | ||
| response = self.rest_client.http.post("/local-agents", data=sync_data, timeout=30) | ||
| result = response.json() if hasattr(response, 'json') else response | ||
|
|
||
| console.print(f"[cyan]Response status: {response.status_code if hasattr(response, 'status_code') else 'N/A'}[/cyan]") | ||
| console.print(f"[cyan]Response: {result}[/cyan]") | ||
|
|
||
| if result.get("success"): | ||
| console.print("[green]✅ Agent synced successfully[/green]") | ||
| return True | ||
| else: | ||
| error_msg = result.get("error", "Unknown error") | ||
| console.print(f"[yellow]⚠️ Agent sync failed: {error_msg}[/yellow]") | ||
| return False | ||
|
|
||
| except Exception as e: | ||
| console.print(f"[red]❌ HTTP request failed: {str(e)}[/red]") | ||
| return False |
There was a problem hiding this comment.
Prevent blocking the event loop in async middleware sync
sync_agent_startup, sync_invocation_start, and sync_invocation_complete now call self.rest_client.http.post/put directly inside async def methods. Those helpers are synchronous (requests-based), so this change runs blocking I/O on the event loop thread. Under load the local FastAPI server will stall while waiting for each middleware call, undoing the previous asyncio.to_thread safeguard. Please offload these calls to a thread (or reuse _make_async_request) so the event loop stays responsive.
- try:
- response = self.rest_client.http.post("/local-agents", data=sync_data, timeout=30)
+ try:
+ response = await asyncio.to_thread(
+ self.rest_client.http.post,
+ "/local-agents",
+ data=sync_data,
+ timeout=30,
+ )
@@
- response = self.rest_client.http.post("/local-agents/invocations", data=sync_payload, timeout=30)
+ response = await asyncio.to_thread(
+ self.rest_client.http.post,
+ "/local-agents/invocations",
+ data=sync_payload,
+ timeout=30,
+ )
@@
- response = self.rest_client.http.put(f"/local-agents/invocations/{execution_id}", data=update_payload, timeout=30)
+ response = await asyncio.to_thread(
+ self.rest_client.http.put,
+ f"/local-agents/invocations/{execution_id}",
+ data=update_payload,
+ timeout=30,
+ )Also applies to: 125-138, 169-177
🧰 Tools
🪛 Ruff (0.13.3)
97-97: Do not catch blind exception: Exception
(BLE001)
98-98: Use explicit conversion flag
Replace with conversion flag
(RUF010)
🤖 Prompt for AI Agents
In runagent/sdk/deployment/middleware_sync.py around lines 82-99 (and similarly
for 125-138 and 169-177), the code calls synchronous HTTP helpers
(self.rest_client.http.post/put) directly inside async methods which blocks the
event loop; change those direct calls to run in a thread by either awaiting the
existing helper that wraps sync calls (e.g. reuse self._make_async_request) or
by doing: response = await asyncio.to_thread(self.rest_client.http.post,
"/local-agents", sync_data, timeout=30) (and likewise for put), then process
response.json() and status_code as before; add "import asyncio" at top if using
asyncio.to_thread and ensure exception handling and return booleans remain
identical.
| if hasattr(chunk, 'dict') and callable(chunk.dict): | ||
| try: | ||
| ws_request = WebSocketAgentRequest(**request_msg.data) | ||
| except Exception as e: | ||
| await self._send_error(websocket, f"Invalid request format: {str(e)}") | ||
| return | ||
|
|
||
| if ws_request.action == WebSocketActionType.START_STREAM: | ||
| await self._handle_stream_start(websocket, ws_request, connection_id, agent_execution_streamer) | ||
| elif ws_request.action == WebSocketActionType.PING: | ||
| await self._send_pong(websocket) | ||
| else: | ||
| await self._send_error(websocket, f"Unknown action: {ws_request.action}") | ||
|
|
||
| except WebSocketDisconnect: | ||
| console.print(f"WebSocket disconnected for agent: [cyan]{agent_id}[/cyan]") | ||
| self._cleanup_stream(connection_id) | ||
| except Exception as e: | ||
| console.print(f"💥 WebSocket error for agent {agent_id}: [red]{str(e)}[/red]") | ||
| await self._send_error(websocket, f"Server error: {str(e)}") | ||
| self._cleanup_stream(connection_id) | ||
| return chunk.dict() | ||
| except: | ||
| pass | ||
|
|
||
| # Objects with model_dump() method (Pydantic v2) | ||
| if hasattr(chunk, 'model_dump') and callable(chunk.model_dump): | ||
| try: | ||
| return chunk.model_dump() | ||
| except: | ||
| pass |
There was a problem hiding this comment.
Fix Pydantic chunk conversion to stay JSON-serializable
chunk.dict() / chunk.model_dump() frequently leave datetime and other Python objects inside the payload, so the later json.dumps(...) call throws TypeError and streaming falls into the error branch. Re-run the conversion through _convert_chunk_to_serializable (or use the JSON mode) before returning so every chunk is safely serializable.
Apply this diff:
- return chunk.dict()
+ return self._convert_chunk_to_serializable(chunk.dict())
@@
- return chunk.model_dump()
+ return self._convert_chunk_to_serializable(chunk.model_dump())📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if hasattr(chunk, 'dict') and callable(chunk.dict): | |
| try: | |
| ws_request = WebSocketAgentRequest(**request_msg.data) | |
| except Exception as e: | |
| await self._send_error(websocket, f"Invalid request format: {str(e)}") | |
| return | |
| if ws_request.action == WebSocketActionType.START_STREAM: | |
| await self._handle_stream_start(websocket, ws_request, connection_id, agent_execution_streamer) | |
| elif ws_request.action == WebSocketActionType.PING: | |
| await self._send_pong(websocket) | |
| else: | |
| await self._send_error(websocket, f"Unknown action: {ws_request.action}") | |
| except WebSocketDisconnect: | |
| console.print(f"WebSocket disconnected for agent: [cyan]{agent_id}[/cyan]") | |
| self._cleanup_stream(connection_id) | |
| except Exception as e: | |
| console.print(f"💥 WebSocket error for agent {agent_id}: [red]{str(e)}[/red]") | |
| await self._send_error(websocket, f"Server error: {str(e)}") | |
| self._cleanup_stream(connection_id) | |
| return chunk.dict() | |
| except: | |
| pass | |
| # Objects with model_dump() method (Pydantic v2) | |
| if hasattr(chunk, 'model_dump') and callable(chunk.model_dump): | |
| try: | |
| return chunk.model_dump() | |
| except: | |
| pass | |
| if hasattr(chunk, 'dict') and callable(chunk.dict): | |
| try: | |
| return self._convert_chunk_to_serializable(chunk.dict()) | |
| except: | |
| pass | |
| # Objects with model_dump() method (Pydantic v2) | |
| if hasattr(chunk, 'model_dump') and callable(chunk.model_dump): | |
| try: | |
| return self._convert_chunk_to_serializable(chunk.model_dump()) | |
| except: | |
| pass |
🧰 Tools
🪛 Ruff (0.13.3)
46-46: Do not use bare except
(E722)
46-47: try-except-pass detected, consider logging the exception
(S110)
53-53: Do not use bare except
(E722)
53-54: try-except-pass detected, consider logging the exception
(S110)
🤖 Prompt for AI Agents
In runagent/sdk/server/socket_utils.py around lines 43 to 54, the code returns
raw results of chunk.dict() or chunk.model_dump() which can still contain
non-JSON-serializable objects (e.g., datetimes); after successfully calling
chunk.dict() or chunk.model_dump(), pass the resulting dict through the existing
_convert_chunk_to_serializable(...) helper (or call model_dump(mode="json") if
preferred) and return that converted value so all chunks are JSON-serializable;
keep the try/except structure and only return the converted payload on success.
Fixed sync with middleware as well as cloud deployment
Summary by CodeRabbit