Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 83 additions & 33 deletions src/runtime/manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
"""
Mode management system for OM1.

Handles mode transitions, lifecycle hooks, state persistence, and
Zenoh-based inter-process communication.
"""
import asyncio
import json
import logging
Expand Down Expand Up @@ -104,8 +110,12 @@ def __init__(self, config: ModeSystemConfig):
self._zenoh_mode_status_response_pub = self.session.declare_publisher(
self.mode_status_response
)
except Exception:
logging.exception("Error opening Zenoh client")
except Exception as e:
logging.error(
f"Zenoh session initialization failed during ModeManager startup. "
f"Reason: {e}. Recovery: Proceeding without Zenoh messaging support. "
f"Mode transitions and status reporting will not be available over Zenoh."
)
self.session = None
self._zenoh_mode_status_response_pub = None

Expand Down Expand Up @@ -145,14 +155,19 @@ def _create_runtime_config_file(self):
runtime_config = mode_config_to_dict(self.config)

temp_file = runtime_config_path + ".tmp"
with open(temp_file, "w") as f:
with open(temp_file, "w", encoding="utf-8") as f:
json5.dump(runtime_config, f, indent=2)

os.rename(temp_file, runtime_config_path)
logging.debug(f"Runtime config file created/updated: {runtime_config_path}")

except Exception:
logging.exception("Error creating runtime config file")
except Exception as e:
logging.error(
f"Failed to create runtime config file at {runtime_config_path}. "
f"Reason: {e}. Recovery: Mode state persistence and "
f"hot-reload monitoring will not work. Mode transitions will "
f"continue but state changes will not survive restarts."
)

def set_event_loop(self, loop: asyncio.AbstractEventLoop):
"""
Expand Down Expand Up @@ -230,7 +245,17 @@ async def _notify_transition_callbacks(self, from_mode: str, to_mode: str):
else:
callback(from_mode, to_mode)
except Exception as e:
logging.error(f"Error in transition callback: {e}")
callback_name = (
callback.__name__
if hasattr(callback, "__name__")
else "unknown"
)
logging.error(
f"Transition callback failed during {from_mode} -> "
f"{to_mode} transition. Callback: {callback_name}. "
f"Reason: {e}. Recovery: Continuing with remaining "
f"callbacks."
)

async def check_time_based_transitions(self) -> Optional[str]:
"""
Expand Down Expand Up @@ -262,15 +287,20 @@ async def check_time_based_transitions(self) -> Optional[str]:
LifecycleHookType.ON_TIMEOUT, timeout_context
)
except Exception as e:
logging.error(f"Error executing timeout lifecycle hooks: {e}")
logging.error(
f"Timeout lifecycle hooks failed for mode "
f"{self.state.current_mode}. Reason: {e}. Recovery: "
f"Proceeding with timeout-based transition."
)

for rule in self.config.transition_rules:
if (
rule.from_mode == self.state.current_mode or rule.from_mode == "*"
rule.from_mode in (self.state.current_mode, "*")
) and rule.transition_type == TransitionType.TIME_BASED:
if self._can_transition(rule):
logging.info(
f"Time-based transition triggered: {self.state.current_mode} -> {rule.to_mode}"
f"Time-based transition triggered: "
f"{self.state.current_mode} -> {rule.to_mode}"
)
return rule.to_mode

Expand All @@ -289,9 +319,8 @@ async def check_context_aware_transitions(self) -> Optional[str]:
matching_rules = []
for rule in self.config.transition_rules:
if (
rule.from_mode == self.state.current_mode or rule.from_mode == "*"
rule.from_mode in (self.state.current_mode, "*")
) and rule.transition_type == TransitionType.CONTEXT_AWARE:

if self._can_transition(rule) and self._evaluate_context_conditions(
rule
):
Expand All @@ -302,8 +331,10 @@ async def check_context_aware_transitions(self) -> Optional[str]:
matching_rules.sort(key=lambda r: r.priority, reverse=True)
target_rule = matching_rules[0]
logging.info(
f"Context-aware transition triggered: {self.state.current_mode} -> {target_rule.to_mode} "
f"(priority: {target_rule.priority}, conditions: {target_rule.context_conditions})"
f"Context-aware transition triggered: "
f"{self.state.current_mode} -> {target_rule.to_mode} "
f"(priority: {target_rule.priority}, "
f"conditions: {target_rule.context_conditions})"
)
return target_rule.to_mode

Expand Down Expand Up @@ -332,9 +363,8 @@ def check_input_triggered_transitions(self, input_text: str) -> Optional[str]:
matching_rules = []
for rule in self.config.transition_rules:
if (
rule.from_mode == self.state.current_mode or rule.from_mode == "*"
rule.from_mode in (self.state.current_mode, "*")
) and rule.transition_type == TransitionType.INPUT_TRIGGERED:

# Check if any trigger keywords are present
for keyword in rule.trigger_keywords:
if keyword.lower() in input_lower:
Expand Down Expand Up @@ -454,29 +484,26 @@ def _evaluate_single_condition(
return False
return True

elif "contains" in expected_value:
if "contains" in expected_value:
# String contains condition
if not isinstance(actual_value, str):
return False
return expected_value["contains"].lower() in actual_value.lower()

elif "one_of" in expected_value:
if "one_of" in expected_value:
# Value must be one of the specified options
return actual_value in expected_value["one_of"]

elif "not" in expected_value:
if "not" in expected_value:
# Negation condition
return actual_value != expected_value["not"]

elif isinstance(expected_value, list):
# List membership condition
return actual_value in expected_value

else:
# Simple equality condition
return actual_value == expected_value

return False
# Simple equality condition
return actual_value == expected_value

async def request_transition(
self, target_mode: str, reason: str = "manual"
Expand Down Expand Up @@ -612,7 +639,9 @@ async def _execute_transition(self, target_mode: str, reason: str) -> bool:

except Exception as e:
logging.error(
f"Failed to execute transition {from_mode} -> {target_mode}: {e}"
f"Mode transition {from_mode} -> {target_mode} failed with error: {e}. "
f"Recovery: Transition rolled back. System remains in {from_mode} mode. "
f"Check lifecycle hooks and transition conditions."
)
return False
finally:
Expand All @@ -630,7 +659,7 @@ def get_available_transitions(self) -> List[str]:
available = set()

for rule in self.config.transition_rules:
if rule.from_mode == self.state.current_mode or rule.from_mode == "*":
if rule.from_mode in (self.state.current_mode, "*"):
if self._can_transition(rule):
available.add(rule.to_mode)

Expand Down Expand Up @@ -747,7 +776,12 @@ def _zenoh_mode_status_request(self, data: zenoh.Sample):
else:
logging.error("Main event loop is not set or not running")
except Exception as e:
logging.error(f"Error scheduling mode switch request: {e}")
logging.error(
f"Error scheduling mode switch request to {target_mode}. "
f"Reason: {e}. Recovery: Request dropped, mode remains "
f"{self.state.current_mode}. Check event loop status "
f"and retry the request."
)
return

# Request current mode info
Expand Down Expand Up @@ -790,8 +824,16 @@ def _zenoh_context_update(self, data: zenoh.Sample):
else:
logging.warning(f"Invalid context data format: {context_data}")

except (json.JSONDecodeError, Exception) as e:
logging.error(f"Error processing context update: {e}")
except json.JSONDecodeError as e:
logging.error(
f"Failed to parse context update JSON from Zenoh. "
f"Reason: {e}. Recovery: Ignoring malformed context update."
)
except Exception as e:
logging.error(
f"Error processing context update from Zenoh: {e}. "
f"Recovery: Context update ignored, system continues with existing context."
)

async def _check_and_apply_context_transition(self):
"""
Expand All @@ -809,7 +851,11 @@ async def _check_and_apply_context_transition(self):
)
await self._execute_transition(context_target, "context_aware")
except Exception as e:
logging.error(f"Error checking context-aware transitions: {e}")
logging.error(
f"Error checking context-aware transitions after context update. "
f"Reason: {e}. Recovery: Skipping context-aware transition check, "
f"normal mode operations continue."
)

async def _handle_mode_switch_request(
self, frame_id: str, request_id: str, target_mode: str
Expand Down Expand Up @@ -882,7 +928,7 @@ def _load_mode_state(self):
state_file = self._get_state_file_path()

try:
with open(state_file, "r") as f:
with open(state_file, "r", encoding="utf-8") as f:
state_data = json.load(f)

last_active_mode = state_data.get("last_active_mode")
Expand All @@ -892,7 +938,6 @@ def _load_mode_state(self):
and last_active_mode in self.config.modes
and last_active_mode != self.config.default_mode
):

logging.info(f"Restoring last active mode: {last_active_mode}")
self.state.current_mode = last_active_mode
self.state.previous_mode = state_data.get("previous_mode")
Expand Down Expand Up @@ -939,11 +984,16 @@ def _save_mode_state(self):
}

temp_file = state_file + ".tmp"
with open(temp_file, "w") as f:
with open(temp_file, "w", encoding="utf-8") as f:
json.dump(state_data, f, indent=2)

os.rename(temp_file, state_file)
logging.debug(f"Mode state saved to {state_file}")

except Exception as e:
logging.error(f"Error saving mode state: {e}")
logging.error(
f"Error saving mode state to {state_file}. "
f"Reason: {e}. Recovery: Mode transition completed but "
f"state will not persist across restarts. Check filesystem "
f"permissions and disk space."
)
31 changes: 22 additions & 9 deletions src/zenoh_msgs/session.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
"""
Zenoh session management utilities.

Provides configuration and session initialization with automatic fallback
from local connection to network discovery.
"""
import logging

import zenoh
Expand Down Expand Up @@ -43,20 +49,27 @@ def open_zenoh_session() -> zenoh.Session:
"""
local_config = create_zenoh_config(network_discovery=False)
try:
session = zenoh.open(local_config)
local_session = zenoh.open(local_config)
logging.info("Zenoh client opened without network discovery")
return session
except Exception:
logging.info("Falling back to network discovery...")
return local_session
except Exception as local_err:
logging.warning(
f"Local Zenoh connection failed (endpoint: tcp/127.0.0.1:7447): {local_err}. "
"Attempting network discovery fallback..."
)

config = create_zenoh_config()
try:
session = zenoh.open(config)
discovery_session = zenoh.open(config)
logging.info("Zenoh client opened with network discovery")
return session
except Exception as e:
logging.error(f"Error opening Zenoh client: {e}")
raise Exception("Failed to open Zenoh session") from e
return discovery_session
except Exception as discovery_err:
logging.error(
f"Zenoh session initialization failed. "
f"Local connection failed, and network discovery also failed: {discovery_err}. "
f"Check Zenoh router status and network connectivity."
)
raise Exception("Failed to open Zenoh session") from discovery_err


if __name__ == "__main__":
Expand Down