Skip to content
2 changes: 1 addition & 1 deletion examples/interfaces/voltage_regulator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ messages:
port_type: nova_dsdl.sensors.msg.Voltage.1.0
- name: max_voltage
port_type: nova_dsdl.sensors.msg.Voltage.1.0
# port_id ommited; inferred during compilation
# port_id ommited; inferred during compilation
14 changes: 7 additions & 7 deletions examples/arm.yaml → examples/systems/arm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,24 @@ can_buses:
- name: can0
rate: 250000
devices:
- name: J1
- name: j1
node_id: 1
device_type: interfaces/motor_driver
- name: J2
- name: j2
node_id: 2
device_type: interfaces/motor_driver
- name: J3
- name: j3
node_id: 3
device_type: interfaces/motor_driver
- name: J4
- name: j4
node_id: 4
device_type: interfaces/motor_driver
- name: J5
- name: j5
node_id: 5
device_type: interfaces/motor_driver
- name: J6
- name: j6
node_id: 6
device_type: interfaces/motor_driver
- name: EndEffector
- name: end_effector
node_id: 7
device_type: interfaces/motor_driver
18 changes: 9 additions & 9 deletions examples/systems/chassis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,30 @@ can_buses:
- name: front_left_wheel
node_id: 1
device_type: motor_driver
- name: Back_Left_Wheel
- name: back_left_wheel
node_id: 2
device_type: motor_driver
- name: Back_Right_Wheel
- name: back_right_wheel
node_id: 3
device_type: motor_driver
- name: Front_Right_Wheel
- name: front_right_wheel
node_id: 4
device_type: motor_driver
- name: Front_Left_Pivot
- name: front_left_pivot
node_id: 5
device_type: motor_driver
- name: Back_Left_Pivot
- name: back_left_pivot
node_id: 6
device_type: motor_driver
- name: Back_Right_Pivot
- name: back_right_pivot
node_id: 7
device_type: motor_driver
- name: Front_Right_Pivot
- name: front_right_pivot
node_id: 8
device_type: motor_driver
- name: 5v_Hot_Reg
- name: 5v_hot_reg
node_id: 9
device_type: voltage_regulator
- name: 12v_Hot_Reg
- name: 12v_hot_reg
node_id: 10
device_type: voltage_regulator
180 changes: 128 additions & 52 deletions src/python/nova_can/communication.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from dataclasses import dataclass
import importlib
from typing import Callable, Dict, Optional, Tuple, Self, Protocol
from typing import Optional, Tuple, Self, Protocol, List, Callable, Any
from enum import Enum
import time
import threading
from queue import SimpleQueue, Empty
import logging


import can
from typer.models import NoneType
from nunavut_support import serialize, deserialize, update_from_builtin, to_builtin

from .utils import SystemInfo, import_dsdl_modules, dsdl_module_to_import_path
Expand Down Expand Up @@ -72,7 +76,7 @@ class SendResult:
success: bool
message: str

def create_system_buses(system_info: SystemInfo) -> Dict[str, can.Bus]:
def create_system_buses(system_info: SystemInfo) -> dict[str, can.BusABC]:
return {
bus.name: can.Bus(
channel=bus.name,
Expand All @@ -84,26 +88,30 @@ def create_system_buses(system_info: SystemInfo) -> Dict[str, can.Bus]:


class CanTransmitter:
def __init__(self, system_info: SystemInfo, sender_id: int = 0):
def __init__(self, system_info: SystemInfo, transmitter_id: int = 0):
self.system_info = system_info
self.sender_id = sender_id
self.transmitter_id = transmitter_id
self.modules = import_dsdl_modules(system_info)
self.can_buses = create_system_buses(system_info)

def send_message(self, device_name: str, port_name: str, dsdl_data_dict: Dict, priority: Priority = Priority.Nominal) -> SendResult:
def send_message(self, device_name: str, port_name: str, dsdl_data_dict: dict, priority: Priority = Priority.Nominal, from_device: bool = False) -> SendResult:
"""
Send a message to a device on a port.
"""
device = self.system_info.devices[device_name]
port = device.interface.messages.receive[port_name]
# Flip which message set we pull the port from based on perspective
if from_device:
port = device.interface.messages.transmit[port_name]
else:
port = device.interface.messages.receive[port_name]

can_id = CanID(
priority=priority.value,
service=False,
service_request=False,
port_id=port.port_id,
destination_id=device.node_id,
source_id=self.sender_id)
destination_id=(0 if from_device else device.node_id),
source_id=self.transmitter_id)


## TODO: Add support for multi-frame transfers
Expand All @@ -130,38 +138,92 @@ def send_message(self, device_name: str, port_name: str, dsdl_data_dict: Dict, p
self.can_buses[device.can_bus].send(message)

return SendResult(success=True, message=f"Message sent to {device_name} on {port_name}")


def stop(self):
for _, can_bus in self.can_buses.items():
can_bus.shutdown()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.stop()


class CanCallback(Protocol):
def __call__(self, system_name: str,
device_name: str,
port_name: Port,
data: Dict) -> None:
def __call__(self,
system_name: str,
device_name: str,
port_name: Port,
data: dict,
*local_vars: Any) -> None:
...

class CanReceiver:
"""
Receives messages from the CAN bus and calls the callback with the parsed message.
TODO: Add support for multi-frame transfers
TODO: Need to handle the case where we want to receive a message that we sent (eg, a receive message from a device)
TODO: Add filters
"""
def __init__(self, system_info: SystemInfo, callback: CanCallback, receiver_id: int = 0):
def __init__(self, system_info: SystemInfo,
callback: CanCallback,
consumer_init: Optional[Callable[..., dict]] = None,
consumer_init_args: Optional[Tuple[Any, ...]] = (),
receiver_id: int = 0, recv_timeout: float = 0.1,
queue_timeout: float = 0.5):
self.system_info = system_info
self.receiver_id = receiver_id
self.modules = import_dsdl_modules(system_info)
self.can_buses = create_system_buses(system_info)
self.callback = callback


def parse_message(self, msg: can.Message, bus_name: str) -> Optional[Tuple[str, str, str, Port, Dict]]:
if not msg.is_extended_id: #ignore sid frames (unsupported)
return None
self._modules = import_dsdl_modules(system_info)
self._callback = callback
self._consumer_init = consumer_init
self._consumer_init_args = consumer_init_args
self._recv_timeout = recv_timeout
self._queue_timeout = queue_timeout

self._can_buses: dict[str, can.BusABC] = None
self._msg_queue: SimpleQueue = None
self._stop_event = threading.Event()
self._bus_workers: List[threading.Thread] = []
self._consumer_thread = None


def _consumer_loop(self):
if self._consumer_init is not None:
local_vars = self._consumer_init(*self._consumer_init_args)
else:
local_vars = ()
while not self._stop_event.is_set():
logging.debug(f"Queue Size: {self._msg_queue.qsize()}")
try:
parsed_msg = self._msg_queue.get(timeout=self._queue_timeout)
self._callback(*parsed_msg, *local_vars)
except Empty:
continue

def _worker_loop(self, bus_name: str, can_bus: can.BusABC):
while not self._stop_event.is_set():
msg = can_bus.recv(timeout=self._recv_timeout)
if msg is not None:
try:
parsed_msg = self.parse_message(msg, bus_name)
except ValueError as e:
logging.warning(f"Dropping invalid message received on {bus_name}: {e}")
continue
if parsed_msg[3] is None:
logging.warning(f"Dropping invalid payload received on {parsed_msg[0]}.{parsed_msg[1]}.{parsed_msg[2].name}")
continue
self._msg_queue.put(parsed_msg)

def parse_message(self, msg: can.Message, bus_name: str) -> Optional[Tuple[str, str, str, Port, dict]]:
if not msg.is_extended_id: #ignore sid frames (unsupported)self.parse_messag
raise ValueError("SID frame")

can_id = CanID.from_serialized(msg.arbitration_id)
if can_id.destination_id != self.receiver_id and can_id.destination_id != 0: #TODO: Filter by destination id
return None
raise ValueError("Destination ID not matching receiver ID")

if can_id.service: #TODO: Handle service messages
return None
raise ValueError("Service message")

rx_device = None

Expand All @@ -170,46 +232,60 @@ def parse_message(self, msg: can.Message, bus_name: str) -> Optional[Tuple[str,
rx_device = device

if rx_device is None:
return None
raise ValueError("Source ID does not match any device")

if rx_device.interface is None:
raise ValueError("Device has no interface")

if device.interface is None:
return None
port = device.interface.get_port_by_id(can_id.port_id).get('transmit')
port = rx_device.interface.get_port_by_id(can_id.port_id).get('transmit')
if port is None:
return None
raise ValueError("Port not found")

header = FrameHeader.from_serialized(msg.data[0])
if header.start_of_transfer and not header.end_of_transfer: #TODO: Handle multi-frame transfers
return None
raise ValueError("Start of transfer but not end of transfer. Multi-frame transfer not supported")
if not header.start_of_transfer and header.end_of_transfer: #TODO: Handle multi-frame transfers
return None
raise ValueError("End of transfer but not start of transfer. Multi-frame transfer not supported")


serialized_fragment_view = memoryview(msg.data[1:])
payload_buff = bytearray(msg.data[1:])
serialized_fragment_view = memoryview(payload_buff)

dsdl_class = getattr(self.modules[port.port_type],
dsdl_class = getattr(self._modules[port.port_type],
dsdl_module_to_import_path(port.port_type).split('.')[-1])

deserialized_dsdl = deserialize(dsdl_class, [serialized_fragment_view])
dsdl_data_dict = to_builtin(deserialized_dsdl)

return rx_device.source_system, rx_device.name, port, dsdl_data_dict

def run(self):
while True:
for bus_name, bus in self.can_buses.items():
msg = bus.recv()
if msg is not None:
result = self.parse_message(msg, bus_name)
if result is not None:
self.callback(*result)
#time.sleep(0.001) ##TODO: switch to selectors for better perfomance/latency








def start(self):
self._msg_queue = SimpleQueue()
self._can_buses = create_system_buses(self.system_info)
self._consumer_thread = threading.Thread(target=self._consumer_loop, name="consumer-thread")
for bus_name, bus in self._can_buses.items():
self._bus_workers.append(threading.Thread(target=self._worker_loop,
name=f"{bus_name}-worker",
args=(bus_name, bus)))
self._stop_event.clear()
self._consumer_thread.start()
for worker in self._bus_workers:
worker.start()

def stop(self):
self._stop_event.set()
for worker in self._bus_workers:
worker.join()
self._consumer_thread.join()
self._consumer_thread = None
self._bus_workers = []
self._msg_queue = None
for _, can_bus in self._can_buses.items():
can_bus.shutdown()


def __enter__(self):
self.start()
return self

def __exit__(self, exc_type, exc_value, traceback):
self.stop()
Loading