Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions config/imu.json5
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
{
version: "v1.0.0",
hertz: 10,
name: "imu_robot",
api_key: "${OM_API_KEY:-openmind_free}",
system_prompt_base: "You are a robot with an IMU sensor. Monitor your orientation and movement. If you detect a fall or impact, immediately execute the appropriate recovery action. Stay safe and protect yourself and those around you.",
system_prompt_examples: "Here are some examples of interactions you might encounter:\n\n1. If the robot falls, you might:\n fall_recovery: {{'action': 'stand_up'}}\n Speak: {{'sentence': 'I fell down, trying to stand up!'}}\n\n2. If an impact is detected, you might:\n fall_recovery: {{'action': 'emergency_stop'}}\n Speak: {{'sentence': 'Impact detected, stopping for safety.'}}\n\n3. If the robot is operating normally, you might:\n Speak: {{'sentence': 'All systems normal, monitoring orientation.'}}",
system_governance: "Here are the laws that govern your actions. Do not violate these laws.\nFirst Law: A robot cannot harm a human or allow a human to come to harm.\nSecond Law: A robot must obey orders from humans, unless those orders conflict with the First Law.\nThird Law: A robot must protect itself, as long as that protection doesn't conflict with the First or Second Law.",
agent_inputs: [
{
type: "IMUInput",
config: {
port: "/dev/ttyUSB0",
baudrate: 115200,
timeout: 1.0,
fall_threshold: 45.0,
impact_threshold: 20.0,
poll_interval: 0.1,
},
},
],
agent_backgrounds: [
{
type: "IMUFallDetector",
},
],
cortex_llm: {
type: "OpenAILLM",
config: {
agent_name: "IMURobot",
history_length: 5,
},
},
agent_actions: [
{
name: "fall_recovery",
llm_label: "fall_recovery",
connector: "serial",
config: {
port: "/dev/ttyUSB0",
baudrate: 115200,
timeout: 2.0,
},
},
{
name: "speak",
llm_label: "speak",
connector: "elevenlabs_tts",
config: {},
},
],
}
Empty file.
Empty file.
134 changes: 134 additions & 0 deletions src/actions/fall_recovery/connector/serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import json
import logging
from typing import Optional

import serial as _pyserial
from pydantic import Field

from actions.base import ActionConfig, ActionConnector
from actions.fall_recovery.interface import FallRecoveryAction, FallRecoveryInput
from providers.imu_provider import IMUProvider


class FallRecoverySerialConfig(ActionConfig):
"""
Configuration for Fall Recovery Serial connector.

Parameters
----------
port : str
Serial port for the robot controller.
baudrate : int
Serial communication baudrate.
timeout : float
Serial write timeout in seconds.
"""

port: str = Field(
default="/dev/ttyUSB0",
description="Serial port for robot controller",
)
baudrate: int = Field(
default=115200,
description="Serial communication baudrate",
)
timeout: float = Field(
default=2.0,
description="Serial write timeout in seconds",
)


class FallRecoverySerialConnector(
ActionConnector[FallRecoverySerialConfig, FallRecoveryInput]
):
"""
Serial connector for fall recovery actions.

Sends recovery commands to robot controller via serial port.
Compatible with Arduino-based controllers or any serial-capable
robot platform.
"""

def __init__(self, config: FallRecoverySerialConfig):
"""
Initialize the FallRecoverySerialConnector.

Parameters
----------
config : FallRecoverySerialConfig
Configuration for the connector.
"""
super().__init__(config)

self.ser: Optional[_pyserial.Serial] = None
self.imu_provider = IMUProvider()

try:
self.ser = _pyserial.Serial(
config.port, config.baudrate, timeout=config.timeout
)
logging.info(f"FallRecoverySerialConnector: connected to {config.port}")
except Exception as e:
logging.error(
f"FallRecoverySerialConnector: failed to open serial port - {e}"
)

def _send_command(self, command: dict) -> bool:
"""
Send a JSON command via serial port.

Parameters
----------
command : dict
Command dictionary to serialize and send.

Returns
-------
bool
True if sent successfully, False otherwise.
"""
if self.ser is None:
logging.error("FallRecoverySerialConnector: serial port not available")
return False

try:
payload = json.dumps(command) + "\n"
self.ser.write(payload.encode("utf-8"))
logging.info(f"FallRecoverySerialConnector: sent command={command}")
return True
except Exception as e:
logging.error(f"FallRecoverySerialConnector: error sending command - {e}")
return False

async def connect(self, output_interface: FallRecoveryInput) -> None:
"""
Execute a fall recovery action.

Parameters
----------
output_interface : FallRecoveryInput
Input containing the recovery action to perform.
"""
action = output_interface.action
message = output_interface.message

logging.info(
f"FallRecoverySerialConnector: executing action={action.value} "
f"message='{message}'"
)

if action == FallRecoveryAction.STAND_UP:
self._send_command({"action": "stand_up", "message": message})
self.imu_provider.reset_alerts()

elif action == FallRecoveryAction.EMERGENCY_STOP:
self._send_command({"action": "emergency_stop", "message": message})

elif action == FallRecoveryAction.ALERT_OPERATOR:
logging.warning(f"FallRecoverySerialConnector: operator alert - {message}")
self._send_command({"action": "alert_operator", "message": message})

else:
logging.warning(
f"FallRecoverySerialConnector: unknown action '{action.value}'"
)
43 changes: 43 additions & 0 deletions src/actions/fall_recovery/interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from dataclasses import dataclass
from enum import Enum

from actions.base import Interface


class FallRecoveryAction(str, Enum):
"""Supported fall recovery actions."""

STAND_UP = "stand_up"
EMERGENCY_STOP = "emergency_stop"
ALERT_OPERATOR = "alert_operator"


@dataclass
class FallRecoveryInput:
"""
Input interface for the FallRecovery action.

Parameters
----------
action : FallRecoveryAction
The recovery action to perform.
message : str
Optional message describing the situation.
"""

action: FallRecoveryAction = FallRecoveryAction.STAND_UP
message: str = ""


@dataclass
class FallRecovery(Interface[FallRecoveryInput, FallRecoveryInput]):
"""
This action allows the robot to recover from a fall or impact event.

Effect: Executes fall recovery procedures including standing up,
emergency stop, or alerting the operator. Triggered automatically
by IMU fall detection or manually via LLM decision.
"""

input: FallRecoveryInput
output: FallRecoveryInput
73 changes: 73 additions & 0 deletions src/backgrounds/plugins/imu_fall_detector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import logging
import threading

from backgrounds.base import Background, BackgroundConfig
from providers.context_provider import ContextProvider
from providers.imu_provider import IMUProvider


class IMUFallDetector(Background[BackgroundConfig]):
"""
Background task that continuously monitors IMU data for fall
and impact events, updating the context provider when detected.
"""

def __init__(self, config: BackgroundConfig):
"""
Initialize the IMUFallDetector background task.

Parameters
----------
config : BackgroundConfig
Configuration for the background task.
"""
super().__init__(config)

self._lock = threading.Lock()
self.imu_provider = IMUProvider()
self.context_provider = ContextProvider()

self._fall_reported: bool = False
self._impact_reported: bool = False

logging.info("IMUFallDetector background task initialized.")

def run(self) -> None:
"""
Monitor IMU state and update context on fall or impact detection.
"""
state = self.imu_provider.state

with self._lock:
if state["is_fallen"] and not self._fall_reported:
logging.warning("IMUFallDetector: fall detected, updating context.")
self.context_provider.update_context(
{
"imu_fall_detected": True,
"imu_roll": state["roll"],
"imu_pitch": state["pitch"],
}
)
self._fall_reported = True

elif not state["is_fallen"] and self._fall_reported:
logging.info("IMUFallDetector: fall resolved, resetting context.")
self.context_provider.update_context({"imu_fall_detected": False})
self._fall_reported = False

if state["impact_detected"] and not self._impact_reported:
logging.warning("IMUFallDetector: impact detected, updating context.")
self.context_provider.update_context({"imu_impact_detected": True})
self._impact_reported = True

elif not state["impact_detected"] and self._impact_reported:
self.context_provider.update_context({"imu_impact_detected": False})
self._impact_reported = False

self.sleep(0.1)

def stop(self) -> None:
"""
Stop the IMUFallDetector background task.
"""
logging.info("Stopping IMUFallDetector background task.")
Loading
Loading