diff --git a/examples/interfaces/voltage_regulator.yaml b/examples/interfaces/voltage_regulator.yaml index fe7c364..ad7d6da 100644 --- a/examples/interfaces/voltage_regulator.yaml +++ b/examples/interfaces/voltage_regulator.yaml @@ -16,4 +16,4 @@ messages: port_type: nova_dsdl.sensors.msg.Voltage.1.0 - name: max_voltage port_type: nova_dsdl.sensors.msg.Voltage.1.0 - # port_id ommited; inferred during compilation + # port_id ommited; inferred during compilation \ No newline at end of file diff --git a/examples/arm.yaml b/examples/systems/arm.yaml similarity index 79% rename from examples/arm.yaml rename to examples/systems/arm.yaml index 835b01e..873e8ff 100644 --- a/examples/arm.yaml +++ b/examples/systems/arm.yaml @@ -3,24 +3,24 @@ can_buses: - name: can0 rate: 250000 devices: - - name: J1 + - name: j1 node_id: 1 device_type: interfaces/motor_driver - - name: J2 + - name: j2 node_id: 2 device_type: interfaces/motor_driver - - name: J3 + - name: j3 node_id: 3 device_type: interfaces/motor_driver - - name: J4 + - name: j4 node_id: 4 device_type: interfaces/motor_driver - - name: J5 + - name: j5 node_id: 5 device_type: interfaces/motor_driver - - name: J6 + - name: j6 node_id: 6 device_type: interfaces/motor_driver - - name: EndEffector + - name: end_effector node_id: 7 device_type: interfaces/motor_driver diff --git a/examples/systems/chassis.yaml b/examples/systems/chassis.yaml index b0fc810..d0fd7cb 100644 --- a/examples/systems/chassis.yaml +++ b/examples/systems/chassis.yaml @@ -6,30 +6,30 @@ can_buses: - name: front_left_wheel node_id: 1 device_type: motor_driver - - name: Back_Left_Wheel + - name: back_left_wheel node_id: 2 device_type: motor_driver - - name: Back_Right_Wheel + - name: back_right_wheel node_id: 3 device_type: motor_driver - - name: Front_Right_Wheel + - name: front_right_wheel node_id: 4 device_type: motor_driver - - name: Front_Left_Pivot + - name: front_left_pivot node_id: 5 device_type: motor_driver - - name: Back_Left_Pivot + - name: back_left_pivot node_id: 6 device_type: motor_driver - - name: Back_Right_Pivot + - name: back_right_pivot node_id: 7 device_type: motor_driver - - name: Front_Right_Pivot + - name: front_right_pivot node_id: 8 device_type: motor_driver - - name: 5v_Hot_Reg + - name: 5v_hot_reg node_id: 9 device_type: voltage_regulator - - name: 12v_Hot_Reg + - name: 12v_hot_reg node_id: 10 device_type: voltage_regulator diff --git a/src/python/nova_can/communication.py b/src/python/nova_can/communication.py index 3a95d84..eead202 100644 --- a/src/python/nova_can/communication.py +++ b/src/python/nova_can/communication.py @@ -1,11 +1,15 @@ from dataclasses import dataclass import importlib -from typing import Callable, Dict, Optional, Tuple, Self, Protocol +from typing import Optional, Tuple, Self, Protocol, List, Callable, Any from enum import Enum import time +import threading +from queue import SimpleQueue, Empty +import logging import can +from typer.models import NoneType from nunavut_support import serialize, deserialize, update_from_builtin, to_builtin from .utils import SystemInfo, import_dsdl_modules, dsdl_module_to_import_path @@ -72,7 +76,7 @@ class SendResult: success: bool message: str -def create_system_buses(system_info: SystemInfo) -> Dict[str, can.Bus]: +def create_system_buses(system_info: SystemInfo) -> dict[str, can.BusABC]: return { bus.name: can.Bus( channel=bus.name, @@ -84,26 +88,30 @@ def create_system_buses(system_info: SystemInfo) -> Dict[str, can.Bus]: class CanTransmitter: - def __init__(self, system_info: SystemInfo, sender_id: int = 0): + def __init__(self, system_info: SystemInfo, transmitter_id: int = 0): self.system_info = system_info - self.sender_id = sender_id + self.transmitter_id = transmitter_id self.modules = import_dsdl_modules(system_info) self.can_buses = create_system_buses(system_info) - def send_message(self, device_name: str, port_name: str, dsdl_data_dict: Dict, priority: Priority = Priority.Nominal) -> SendResult: + def send_message(self, device_name: str, port_name: str, dsdl_data_dict: dict, priority: Priority = Priority.Nominal, from_device: bool = False) -> SendResult: """ Send a message to a device on a port. """ device = self.system_info.devices[device_name] - port = device.interface.messages.receive[port_name] + # Flip which message set we pull the port from based on perspective + if from_device: + port = device.interface.messages.transmit[port_name] + else: + port = device.interface.messages.receive[port_name] can_id = CanID( priority=priority.value, service=False, service_request=False, port_id=port.port_id, - destination_id=device.node_id, - source_id=self.sender_id) + destination_id=(0 if from_device else device.node_id), + source_id=self.transmitter_id) ## TODO: Add support for multi-frame transfers @@ -130,38 +138,92 @@ def send_message(self, device_name: str, port_name: str, dsdl_data_dict: Dict, p self.can_buses[device.can_bus].send(message) return SendResult(success=True, message=f"Message sent to {device_name} on {port_name}") - + + def stop(self): + for _, can_bus in self.can_buses.items(): + can_bus.shutdown() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.stop() + class CanCallback(Protocol): - def __call__(self, system_name: str, - device_name: str, - port_name: Port, - data: Dict) -> None: + def __call__(self, + system_name: str, + device_name: str, + port_name: Port, + data: dict, + *local_vars: Any) -> None: ... class CanReceiver: """ Receives messages from the CAN bus and calls the callback with the parsed message. TODO: Add support for multi-frame transfers - TODO: Need to handle the case where we want to receive a message that we sent (eg, a receive message from a device) + TODO: Add filters """ - def __init__(self, system_info: SystemInfo, callback: CanCallback, receiver_id: int = 0): + def __init__(self, system_info: SystemInfo, + callback: CanCallback, + consumer_init: Optional[Callable[..., dict]] = None, + consumer_init_args: Optional[Tuple[Any, ...]] = (), + receiver_id: int = 0, recv_timeout: float = 0.1, + queue_timeout: float = 0.5): self.system_info = system_info self.receiver_id = receiver_id - self.modules = import_dsdl_modules(system_info) - self.can_buses = create_system_buses(system_info) - self.callback = callback - - - def parse_message(self, msg: can.Message, bus_name: str) -> Optional[Tuple[str, str, str, Port, Dict]]: - if not msg.is_extended_id: #ignore sid frames (unsupported) - return None + self._modules = import_dsdl_modules(system_info) + self._callback = callback + self._consumer_init = consumer_init + self._consumer_init_args = consumer_init_args + self._recv_timeout = recv_timeout + self._queue_timeout = queue_timeout + + self._can_buses: dict[str, can.BusABC] = None + self._msg_queue: SimpleQueue = None + self._stop_event = threading.Event() + self._bus_workers: List[threading.Thread] = [] + self._consumer_thread = None + + + def _consumer_loop(self): + if self._consumer_init is not None: + local_vars = self._consumer_init(*self._consumer_init_args) + else: + local_vars = () + while not self._stop_event.is_set(): + logging.debug(f"Queue Size: {self._msg_queue.qsize()}") + try: + parsed_msg = self._msg_queue.get(timeout=self._queue_timeout) + self._callback(*parsed_msg, *local_vars) + except Empty: + continue + + def _worker_loop(self, bus_name: str, can_bus: can.BusABC): + while not self._stop_event.is_set(): + msg = can_bus.recv(timeout=self._recv_timeout) + if msg is not None: + try: + parsed_msg = self.parse_message(msg, bus_name) + except ValueError as e: + logging.warning(f"Dropping invalid message received on {bus_name}: {e}") + continue + if parsed_msg[3] is None: + logging.warning(f"Dropping invalid payload received on {parsed_msg[0]}.{parsed_msg[1]}.{parsed_msg[2].name}") + continue + self._msg_queue.put(parsed_msg) + + def parse_message(self, msg: can.Message, bus_name: str) -> Optional[Tuple[str, str, str, Port, dict]]: + if not msg.is_extended_id: #ignore sid frames (unsupported)self.parse_messag + raise ValueError("SID frame") + can_id = CanID.from_serialized(msg.arbitration_id) if can_id.destination_id != self.receiver_id and can_id.destination_id != 0: #TODO: Filter by destination id - return None + raise ValueError("Destination ID not matching receiver ID") if can_id.service: #TODO: Handle service messages - return None + raise ValueError("Service message") rx_device = None @@ -170,24 +232,25 @@ def parse_message(self, msg: can.Message, bus_name: str) -> Optional[Tuple[str, rx_device = device if rx_device is None: - return None + raise ValueError("Source ID does not match any device") + + if rx_device.interface is None: + raise ValueError("Device has no interface") - if device.interface is None: - return None - port = device.interface.get_port_by_id(can_id.port_id).get('transmit') + port = rx_device.interface.get_port_by_id(can_id.port_id).get('transmit') if port is None: - return None + raise ValueError("Port not found") header = FrameHeader.from_serialized(msg.data[0]) if header.start_of_transfer and not header.end_of_transfer: #TODO: Handle multi-frame transfers - return None + raise ValueError("Start of transfer but not end of transfer. Multi-frame transfer not supported") if not header.start_of_transfer and header.end_of_transfer: #TODO: Handle multi-frame transfers - return None + raise ValueError("End of transfer but not start of transfer. Multi-frame transfer not supported") - - serialized_fragment_view = memoryview(msg.data[1:]) + payload_buff = bytearray(msg.data[1:]) + serialized_fragment_view = memoryview(payload_buff) - dsdl_class = getattr(self.modules[port.port_type], + dsdl_class = getattr(self._modules[port.port_type], dsdl_module_to_import_path(port.port_type).split('.')[-1]) deserialized_dsdl = deserialize(dsdl_class, [serialized_fragment_view]) @@ -195,21 +258,34 @@ def parse_message(self, msg: can.Message, bus_name: str) -> Optional[Tuple[str, return rx_device.source_system, rx_device.name, port, dsdl_data_dict - def run(self): - while True: - for bus_name, bus in self.can_buses.items(): - msg = bus.recv() - if msg is not None: - result = self.parse_message(msg, bus_name) - if result is not None: - self.callback(*result) - #time.sleep(0.001) ##TODO: switch to selectors for better perfomance/latency - - - - - - - - + def start(self): + self._msg_queue = SimpleQueue() + self._can_buses = create_system_buses(self.system_info) + self._consumer_thread = threading.Thread(target=self._consumer_loop, name="consumer-thread") + for bus_name, bus in self._can_buses.items(): + self._bus_workers.append(threading.Thread(target=self._worker_loop, + name=f"{bus_name}-worker", + args=(bus_name, bus))) + self._stop_event.clear() + self._consumer_thread.start() + for worker in self._bus_workers: + worker.start() + + def stop(self): + self._stop_event.set() + for worker in self._bus_workers: + worker.join() + self._consumer_thread.join() + self._consumer_thread = None + self._bus_workers = [] + self._msg_queue = None + for _, can_bus in self._can_buses.items(): + can_bus.shutdown() + + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.stop() \ No newline at end of file diff --git a/src/python/tooling/db_handler/can_db_handler.py b/src/python/tooling/db_handler/can_db_handler.py index 93d5511..1051bc6 100644 --- a/src/python/tooling/db_handler/can_db_handler.py +++ b/src/python/tooling/db_handler/can_db_handler.py @@ -1,9 +1,13 @@ import os import time import argparse +import threading +import signal import sqlite3 -from typing import List, Dict, Any +from typing import List, Dict, Any, Tuple +from pprint import pprint + from nova_can.utils.compose_system import get_compose_result_from_env from tooling.openMCT_system_compiler.compile_system import ( load_composed_system_dict, @@ -125,8 +129,7 @@ def insert_data(cursor, conn, topic, data_dict): conn.commit() insert_counter = 0 - -def setup_database(clear=False): +def setup_database(clear=False) -> Tuple[sqlite3.Connection, sqlite3.Cursor]: conn = sqlite3.connect(DB_FILE) cursor = conn.cursor() if clear: @@ -186,14 +189,13 @@ def update_max_rows_per_table(cursor, conn, max_rows): ) conn.commit() - # ---------- Helper Functions ---------- def can_to_db_callback( - system_info, cursor, conn, max_rows, topic_prefix: str, verbose: bool = True + system_info, topic_prefix: str, verbose: bool = True ): """Create a callback that bridges CAN messages to SQLite.""" - def callback(system_name: str, device_name: str, port: object, data: dict): + def callback(system_name: str, device_name: str, port: object, data: dict, conn: sqlite3.Connection, cursor: sqlite3.Cursor): dtype = get_device_type(system_info, device_name) topic_base = f"{topic_prefix}.{system_name}.{dtype}.{device_name}.transmit.{port.name}".lower() flt_dct = flatten_dict(data) @@ -214,64 +216,22 @@ def callback(system_name: str, device_name: str, port: object, data: dict): insert_data(cursor, conn, topic, payload) if verbose: - print(f"[CAN→DB] Published: {topic} -> {payload}") + print(f"[CAN→DB] Received and written: {topic} -> {payload}") return callback - -def start_can_receiver( - system_info, - cursor, - conn, - max_rows, - topic_prefix: str = DEFAULT_MQTT_TOPIC_PREFIX, - verbose: bool = True, -): - """Start listening to CAN messages and forwarding them to DB.""" - receiver = CanReceiver( - system_info, - can_to_db_callback(system_info, cursor, conn, max_rows, topic_prefix, verbose), - ) - receiver.run() - - -# ---------- Start CAN Receiver & Database ---------- -def start_gateway( - max_rows=MAX_ROWS_PER_TABLE, - clear_db=True, - topic_prefix=DEFAULT_MQTT_TOPIC_PREFIX, - verbose=False, -): - """ - Start the CAN→DB gateway. - :param max_rows: max no. data entries per table - :param clear_db: clear db before starting can receiver (default = true) - :param verbose: Print DB inserts to console - """ - +def can_to_db_consumer_init(openmct_dict, max_rows, clear_db:bool): conn, cursor = setup_database(clear=clear_db) - # generate tables in SQLite db if they don't already exist - compose_dict = load_composed_system_dict() - openmct_dict = build_openmct_dict(compose_dict) create_all_tables(cursor, conn, openmct_dict, max_rows) # ensure triggers are updated in the event max_rows is changed via cli - if not clear_db: - update_max_rows_per_table(cursor, conn, max_rows) - - compose_result = get_compose_result_from_env() - if not compose_result or not compose_result.success: - raise RuntimeError(f"Failed to compose system: {compose_result.errors}") - system_info = compose_result.system - - """Start listening to CAN messages and forwarding them to DB.""" - start_can_receiver(system_info, cursor, conn, max_rows, topic_prefix, verbose) - + if not clear_db: + update_max_rows_per_table(cursor, conn, max_rows) + + return conn, cursor # ---------- Command-Line Interface ---------- - - def start_gateway_cli(): parser = argparse.ArgumentParser( description="Starts a CAN to DB gateway that listens for CAN messages and inserts them into SQLite.\n " @@ -304,13 +264,28 @@ def start_gateway_cli(): ) args = parser.parse_args() - start_gateway( - max_rows=args.max_rows, - clear_db=args.clear_db, - topic_prefix=args.topic_prefix, - verbose=args.verbose, - ) + # generate tables in SQLite db if they don't already exist + compose_dict = load_composed_system_dict() + openmct_dict = build_openmct_dict(compose_dict) + + compose_result = get_compose_result_from_env() + if not compose_result or not compose_result.success: + raise RuntimeError(f"Failed to compose system: {compose_result.errors}") + system_info = compose_result.system + + exit_event = threading.Event() + def handle_exit(signum, frame): + exit_event.set() + + signal.signal(signal.SIGINT, handle_exit) + signal.signal(signal.SIGTERM, handle_exit) + + with CanReceiver(system_info, + callback=can_to_db_callback(system_info, args.topic_prefix, args.verbose), + consumer_init=can_to_db_consumer_init, + consumer_init_args=(openmct_dict, args.max_rows, args.clear_db)) as can_rx: + exit_event.wait() # ---------- Default Usage ---------- if __name__ == "__main__": diff --git a/src/python/tooling/mqtt_handler/can_mqtt_handler.py b/src/python/tooling/mqtt_handler/can_mqtt_handler.py index 35c1b74..68e93b2 100644 --- a/src/python/tooling/mqtt_handler/can_mqtt_handler.py +++ b/src/python/tooling/mqtt_handler/can_mqtt_handler.py @@ -3,6 +3,8 @@ import random import argparse import json +import threading +import signal # Flatten nested dictionaries (only dicts are recursively flattened; lists/tuples left as values) from typing import Any, Dict @@ -135,24 +137,8 @@ def on_connect(client, userdata, flags, rc, properties=None): client.username_pw_set(username, password) client.on_connect = on_connect client.connect(broker, port) - client.loop_start() return client - -def start_can_receiver( - system_info, - mqtt_client, - topic_prefix: str = DEFAULT_MQTT_TOPIC_PREFIX, - verbose: bool = True, -): - """Start listening to CAN messages and forwarding them to MQTT.""" - receiver = CanReceiver( - system_info, - can_to_mqtt_callback(system_info, mqtt_client, topic_prefix, verbose), - ) - receiver.run() - - # ---------- MQTT-to-CAN Bridge ---------- def mqtt_to_can_callback(can_transmitter, verbose: bool = True): """ @@ -237,43 +223,6 @@ def on_message(client, userdata, msg): return on_message - -# ---------- Public API ---------- -def start_gateway( - broker: str = DEFAULT_MQTT_BROKER, - port: int = DEFAULT_MQTT_PORT, - username: str = DEFAULT_MQTT_USERNAME, - password: str = DEFAULT_MQTT_PASSWORD, - topic_prefix: str = DEFAULT_MQTT_TOPIC_PREFIX, - verbose: bool = True, -): - """ - Start the CAN to MQTT gateway. - :param broker: MQTT broker hostname - :param port: MQTT broker port - :param username: MQTT username - :param password: MQTT password - :param topic_prefix: Prefix for MQTT topics - :param system_info: Optional pre-composed system info, otherwise composed from env - """ - - compose_result = get_compose_result_from_env() - if not compose_result or not compose_result.success: - raise RuntimeError(f"Failed to compose system: {compose_result.errors}") - system_info = compose_result.system - - mqtt_client_instance = setup_mqtt_client(broker, port, username, password) - - # Create CAN transmitter (shared system_info) - can_transmitter = CanTransmitter(system_info) - - # MQTT to CAN callback setup - mqtt_client_instance.on_message = mqtt_to_can_callback(can_transmitter, verbose) - - # Start CAN receiver as before - start_can_receiver(system_info, mqtt_client_instance, topic_prefix, verbose) - - # ---------- Command-Line Interface ---------- def start_gateway_cli(): parser = argparse.ArgumentParser( @@ -320,15 +269,33 @@ def start_gateway_cli(): args = parser.parse_args() print("Verbosity:", args.verbose) - # Compose system_info from env if needed - start_gateway( - broker=args.broker, - port=args.port, - username=args.username, - password=args.password, - topic_prefix=args.topic_prefix, - verbose=args.verbose, - ) + + compose_result = get_compose_result_from_env() + if not compose_result or not compose_result.success: + raise RuntimeError(f"Failed to compose system: {compose_result.errors}") + system_info = compose_result.system + + + + exit_event = threading.Event() + def handle_exit(signum, frame): + exit_event.set() + + signal.signal(signal.SIGINT, handle_exit) + signal.signal(signal.SIGTERM, handle_exit) + + # MQTT to CAN callback setup + with CanTransmitter(system_info) as can_transmitter: + mqtt_client_instance = setup_mqtt_client(args.broker, args.port, args.username, args.password) + mqtt_client_instance.on_message = mqtt_to_can_callback(can_transmitter, args.verbose) + mqtt_client_instance.loop_start() + try: + receiver_callback = can_to_mqtt_callback(system_info, mqtt_client_instance, args.topic_prefix, args.verbose) + with CanReceiver(system_info, callback=receiver_callback) as can_rx: + exit_event.wait() + finally: + mqtt_client_instance.disconnect() + mqtt_client_instance.loop_stop() # ---------- Default Usage ---------- diff --git a/src/python/tooling/nova_can_cli/nova_can_cli.py b/src/python/tooling/nova_can_cli/nova_can_cli.py index b8cc465..e41304c 100644 --- a/src/python/tooling/nova_can_cli/nova_can_cli.py +++ b/src/python/tooling/nova_can_cli/nova_can_cli.py @@ -1,10 +1,17 @@ -from nova_can.communication import CanTransmitter, Priority -from typing import Dict -from nova_can.utils.compose_system import get_compose_result_from_env +from typing import Callable, Dict, Optional +import signal +import threading + import typer import time import json from typing_extensions import Annotated +from rich import print +from rich.pretty import Pretty + +from nova_can.communication import CanReceiver, CanTransmitter, Priority +from nova_can.models.device_models import Port +from nova_can.utils.compose_system import get_compose_result_from_env app = typer.Typer() compose_result = get_compose_result_from_env() @@ -27,24 +34,51 @@ def complete_device_names(incomplete: str) -> list[str]: inc = incomplete.lower() return [name for name in device_names if name.lower().startswith(inc)] +def complete_tx_port_names(ctx: typer.Context, incomplete: str): + """ + Autocompletion function for port names based on the selected device. -def complete_port_names(ctx: typer.Context, incomplete: str) -> list[str]: - """ - Autocompletion function for port names based on the selected device. + Args: + ctx (typer.Context): The Typer context to access other parameters. + incomplete (str): The current incomplete input from the user. + """ + dev_name = ctx.params.get("device_name") or None + from_device = ctx.params.get("from_device") or False + if dev_name is None: + return [] + else: + if from_device: + ports = list(system_info.devices[dev_name].interface.messages.transmit.keys()) + else: + ports = list(system_info.devices[dev_name].interface.messages.receive.keys()) + + if not incomplete: + return ports + inc = incomplete.lower() + return [p for p in ports if p.lower().startswith(inc)] - Args: - ctx (typer.Context): The Typer context to access other parameters. - incomplete (str): The current incomplete input from the user. - """ - dev_name = ctx.params.get("device_name") or None - if dev_name is None: - return [] - else: - ports = list(system_info.devices[dev_name].interface.messages.receive.keys()) - if not incomplete: - return ports - inc = incomplete.lower() - return [p for p in ports if p.lower().startswith(inc)] +def complete_rx_port_names(ctx: typer.Context, incomplete: str): + """ + Autocompletion function for port names based on the selected device. + + Args: + ctx (typer.Context): The Typer context to access other parameters. + incomplete (str): The current incomplete input from the user. + """ + dev_name = ctx.params.get("device_name") or None + from_device = ctx.params.get("from_device") or False + if dev_name is None: + return [] + else: + if from_device: + ports = list(system_info.devices[dev_name].interface.messages.receive.keys()) + else: + ports = list(system_info.devices[dev_name].interface.messages.transmit.keys()) + + if not incomplete: + return ports + inc = incomplete.lower() + return [p for p in ports if p.lower().startswith(inc)] def dsdl_example(dsdl_type: str) -> Dict: @@ -67,10 +101,14 @@ def complete_dsdl_data_json(ctx: typer.Context, incomplete: str) -> list[str]: """ dev_name = ctx.params.get("device_name") or None port_name = ctx.params.get("port_name") or None + from_device = ctx.params.get("from_device") or False if dev_name is None or port_name is None: return [] else: - port = system_info.devices[dev_name].interface.messages.receive[port_name] + if from_device: + port = system_info.devices[dev_name].interface.messages.transmit[port_name] + else: + port = system_info.devices[dev_name].interface.messages.receive[port_name] dsdl_type = port.port_type example_data = dsdl_example(dsdl_type) return [json.dumps(example_data, indent=2)] @@ -89,7 +127,7 @@ def tx( str, typer.Argument( help="The name of the port to send the message to as specified in interface.yaml", - autocompletion=complete_port_names, + autocompletion=complete_tx_port_names, ), ], dsdl_data_json: Annotated[ @@ -105,12 +143,22 @@ def tx( max_attempts: Annotated[ int, typer.Option(help="Retry attempts for a failed CAN bus transmission") ] = 1, - interval: Annotated[ + retry_interval: Annotated[ float, - typer.Option( - help="Time to wait between CAN bus retransmissions attempts (seconds)" - ), + typer.Option("--retry-interval", help="Time to wait between retry attempts (seconds)") ] = 0.5, + repeat: Annotated[ + Optional[int], + typer.Option("--repeat", help="Number of times to send; omit to send once") + ] = None, + interval: Annotated[ + Optional[float], + typer.Option("--interval", help="Period between sends (s); with no --repeat, send perpetually") + ] = None, + from_device: Annotated[ + bool, + typer.Option("--from", help="Interpret device as sender (device perspective)") + ] = False, ): # All three arguments are required positionally @@ -118,33 +166,109 @@ def tx( dsdl_data_dict = json.loads(dsdl_data_json) except json.JSONDecodeError as e: raise typer.BadParameter(f"Invalid JSON data: {e}") + + if device_name not in system_info.devices: + raise typer.BadParameter(f"Invalid device name {device_name}") + + if from_device: + ports = system_info.devices[device_name].interface.messages.transmit + if port_name not in ports: + raise typer.BadParameter(f"Invalid port name {port_name} for device {device_name} (expected a transmit port)") + else: + ports = system_info.devices[device_name].interface.messages.receive + if port_name not in ports: + raise typer.BadParameter(f"Invalid port name {port_name} for device {device_name} (expected a receive port)") + + if from_device: + transmitter_id = system_info.devices[device_name].node_id + else: + transmitter_id = 0 + + with CanTransmitter(system_info, transmitter_id=transmitter_id) as transmitter: + def attempt_send() -> bool: + for _ in range(max_attempts): + result = transmitter.send_message(device_name, port_name, dsdl_data_dict, priority, from_device=from_device) + if result.success: + direction = "from" if from_device else "to" + print(f"Successfully transmitted {port_name} {direction} {device_name}") + return True + else: + print(f"Failed to transmit: {result.message}. Retrying...") + time.sleep(retry_interval) + return False - transmitter = CanTransmitter(system_info) - for _ in range(max_attempts): - result = transmitter.send_message( - device_name, port_name, dsdl_data_dict, priority - ) - if result.success: - print(f"Successfully transmitted {port_name} to {device_name}") - break + if interval is not None and repeat is None: + while True: + attempt_send() + time.sleep(interval) else: - print(f"Failed to transmit: {result.message}. Retrying...") - time.sleep(interval) + send_count = repeat if repeat is not None else 1 + gap = interval if interval is not None else (1.0 if repeat is not None else None) + for i in range(send_count): + attempt_send() + if gap is not None and i < send_count - 1: + time.sleep(gap) + + - # TODO: Close the CAN bus @app.command(help="Receive CAN messages from a device") def rx( device_name: Annotated[ str, - typer.Option( + typer.Argument( help="The name of the device to send the message to as specified in system.yaml", autocompletion=complete_device_names, ), - ], + ] = None, + port_name: Annotated[ + str, + typer.Argument( + help="The name of the port to send the message to as specified in interface.yaml", + autocompletion=complete_rx_port_names, + ), + ] = None, + from_device: Annotated[ + bool, + typer.Option("--from", help="Interpret device as receiver (device perspective)") + ] = False, ): - pass # TODO: Placeholder for future implementation + if device_name is not None: + device_names = list(system_info.devices.keys()) + if device_name not in device_names: + raise typer.BadParameter(f"Invalid device name {device_name}") + + if port_name is not None: + if from_device: + ports = list(system_info.devices[device_name].interface.messages.receive.keys()) + else: + ports = list(system_info.devices[device_name].interface.messages.transmit.keys()) + if port_name not in ports: + raise typer.BadParameter(f"Invalid port name {port_name}") + + + def rx_callback(system_name: str, device: str, port: Port, data: dict): + if device_name is not None and device != device_name: + return + if port_name is not None and port_name != port.name: + return + print(f'{system_name}.{device}.{port.name}: ', Pretty(data), sep='') + + exit_event = threading.Event() + def handle_exit(signum, frame): + exit_event.set() + + signal.signal(signal.SIGINT, handle_exit) + signal.signal(signal.SIGTERM, handle_exit) + + receiver_id = 0 + if from_device: + if device_name not in system_info.devices: + raise typer.BadParameter(f"Invalid device name {device_name}") + receiver_id = system_info.devices[device_name].node_id + with CanReceiver(system_info, callback=rx_callback, receiver_id=receiver_id) as can_rx: + exit_event.wait() if __name__ == "__main__":