-
Notifications
You must be signed in to change notification settings - Fork 3
all agents in orchestral flow publishes to queue #1157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: testing
Are you sure you want to change the base?
Conversation
|
/windsurf-review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 To request another review, post a new comment with "/windsurf-review".
| # Prepare queue data for this intermediate agent | ||
| queue_data = await make_request_data_and_publish_sub_queue(parsed_data, result, params, thread_info) | ||
| queue_data = make_json_serializable(queue_data) | ||
| current_history_data['queue_data'] = queue_data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding error handling around the queue data serialization to prevent potential runtime errors if the data contains non-serializable objects:
| # Prepare queue data for this intermediate agent | |
| queue_data = await make_request_data_and_publish_sub_queue(parsed_data, result, params, thread_info) | |
| queue_data = make_json_serializable(queue_data) | |
| current_history_data['queue_data'] = queue_data | |
| # Prepare queue data for this intermediate agent | |
| try: | |
| queue_data = await make_request_data_and_publish_sub_queue(parsed_data, result, params, thread_info) | |
| queue_data = make_json_serializable(queue_data) | |
| current_history_data['queue_data'] = queue_data | |
| except Exception as e: | |
| logger.error(f"Error serializing queue data: {str(e)}") | |
| # Still continue with the flow even if queue data couldn't be serialized |
| # Publish all agents to queue | ||
| for history_entry in transfer_chain: | ||
| if 'queue_data' in history_entry: | ||
| await sub_queue_obj.publish_message(history_entry['queue_data']) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing error handling for queue publishing operations. If publishing fails for one agent, it could affect the entire chain. Consider adding try/except blocks around the publish operations to ensure the process continues even if one publish fails:
| # Publish all agents to queue | |
| for history_entry in transfer_chain: | |
| if 'queue_data' in history_entry: | |
| await sub_queue_obj.publish_message(history_entry['queue_data']) | |
| # Publish all agents to queue | |
| for history_entry in transfer_chain: | |
| if 'queue_data' in history_entry: | |
| try: | |
| await sub_queue_obj.publish_message(history_entry['queue_data']) | |
| except Exception as e: | |
| logger.error(f"Failed to publish queue message for agent {history_entry['bridge_id']}: {str(e)}") | |
| queue_data = await make_request_data_and_publish_sub_queue(parsed_data, result, params, thread_info) | ||
| queue_data = make_json_serializable(queue_data) | ||
| current_history_data['queue_data'] = queue_data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Storing all queue_data in memory before publishing could lead to memory issues with large payloads or many agents in a complex flow. Consider publishing each agent's data immediately after preparing it rather than storing all data and publishing at the end.
all intermediate agents in flow publishes to queue