diff --git a/bumble/hci.py b/bumble/hci.py index 9ae60a34..840b4986 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -28,7 +28,6 @@ from dataclasses import field from typing import ( Any, - ClassVar, Literal, TypeVar, cast, @@ -2300,10 +2299,10 @@ class HCI_Packet: Abstract Base class for HCI packets ''' - hci_packet_type: ClassVar[int] + hci_packet_type: int - @staticmethod - def from_bytes(packet: bytes) -> HCI_Packet: + @classmethod + def from_bytes(cls, packet: bytes) -> HCI_Packet: packet_type = packet[0] if packet_type == HCI_COMMAND_PACKET: @@ -2323,7 +2322,7 @@ def from_bytes(packet: bytes) -> HCI_Packet: return HCI_CustomPacket(packet) - def __init__(self, name): + def __init__(self, name: str) -> None: self.name = name def __bytes__(self) -> bytes: @@ -2335,7 +2334,7 @@ def __repr__(self) -> str: # ----------------------------------------------------------------------------- class HCI_CustomPacket(HCI_Packet): - def __init__(self, payload): + def __init__(self, payload: bytes) -> None: super().__init__('HCI_CUSTOM_PACKET') self.hci_packet_type = payload[0] self.payload = payload @@ -7452,6 +7451,7 @@ class HCI_Vendor_Event(HCI_Event): # ----------------------------------------------------------------------------- +@dataclasses.dataclass class HCI_AclDataPacket(HCI_Packet): ''' See Bluetooth spec @ 5.4.2 HCI ACL Data Packets @@ -7459,8 +7459,14 @@ class HCI_AclDataPacket(HCI_Packet): hci_packet_type = HCI_ACL_DATA_PACKET - @staticmethod - def from_bytes(packet: bytes) -> HCI_AclDataPacket: + connection_handle: int + pb_flag: int + bc_flag: int + data_total_length: int + data: bytes + + @classmethod + def from_bytes(cls, packet: bytes) -> HCI_AclDataPacket: # Read the header h, data_total_length = struct.unpack_from(' HCI_AclDataPacket: data = packet[5:] if len(data) != data_total_length: raise InvalidPacketError('invalid packet length') - return HCI_AclDataPacket( - connection_handle, pb_flag, bc_flag, data_total_length, data + return cls( + connection_handle=connection_handle, + pb_flag=pb_flag, + bc_flag=bc_flag, + data_total_length=data_total_length, + data=data, ) - def __bytes__(self): + def __bytes__(self) -> bytes: h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle return ( struct.pack(' str: return ( f'{color("ACL", "blue")}: ' f'handle=0x{self.connection_handle:04x}, ' @@ -7498,6 +7501,7 @@ def __str__(self): # ----------------------------------------------------------------------------- +@dataclasses.dataclass class HCI_SynchronousDataPacket(HCI_Packet): ''' See Bluetooth spec @ 5.4.3 HCI SCO Data Packets @@ -7505,8 +7509,13 @@ class HCI_SynchronousDataPacket(HCI_Packet): hci_packet_type = HCI_SYNCHRONOUS_DATA_PACKET - @staticmethod - def from_bytes(packet: bytes) -> HCI_SynchronousDataPacket: + connection_handle: int + packet_status: int + data_total_length: int + data: bytes + + @classmethod + def from_bytes(cls, packet: bytes) -> HCI_SynchronousDataPacket: # Read the header h, data_total_length = struct.unpack_from(' HCI_SynchronousDataPacket: raise InvalidPacketError( f'invalid packet length {len(data)} != {data_total_length}' ) - return HCI_SynchronousDataPacket( - connection_handle, packet_status, data_total_length, data + return cls( + connection_handle=connection_handle, + packet_status=packet_status, + data_total_length=data_total_length, + data=data, ) def __bytes__(self) -> bytes: @@ -7527,18 +7539,6 @@ def __bytes__(self) -> bytes: + self.data ) - def __init__( - self, - connection_handle: int, - packet_status: int, - data_total_length: int, - data: bytes, - ) -> None: - self.connection_handle = connection_handle - self.packet_status = packet_status - self.data_total_length = data_total_length - self.data = data - def __str__(self) -> str: return ( f'{color("SCO", "blue")}: ' @@ -7556,7 +7556,7 @@ class HCI_IsoDataPacket(HCI_Packet): See Bluetooth spec @ 5.4.5 HCI ISO Data Packets ''' - hci_packet_type: ClassVar[int] = HCI_ISO_DATA_PACKET + hci_packet_type = HCI_ISO_DATA_PACKET connection_handle: int data_total_length: int diff --git a/bumble/host.py b/bumble/host.py index 929480b1..bb1b2f9c 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -732,6 +732,16 @@ def send_acl_sdu(self, connection_handle: int, sdu: bytes) -> None: ) packet_queue.enqueue(acl_packet, connection_handle) + def send_sco_sdu(self, connection_handle: int, sdu: bytes) -> None: + self.send_hci_packet( + hci.HCI_SynchronousDataPacket( + connection_handle=connection_handle, + packet_status=0, + data_total_length=len(sdu), + data=sdu, + ) + ) + def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None: self.send_acl_sdu(connection_handle, bytes(L2CAP_PDU(cid, pdu))) diff --git a/examples/run_hfp_gateway.py b/examples/run_hfp_gateway.py index b5253dfa..726752d1 100644 --- a/examples/run_hfp_gateway.py +++ b/examples/run_hfp_gateway.py @@ -100,13 +100,9 @@ def on_sco_packet(packet: hci.HCI_SynchronousDataPacket): if source_file and (pcm_data := source_file.read(packet.data_total_length)): assert ag_protocol host = ag_protocol.dlc.multiplexer.l2cap_channel.connection.device.host - host.send_hci_packet( - hci.HCI_SynchronousDataPacket( - connection_handle=packet.connection_handle, - packet_status=0, - data_total_length=len(pcm_data), - data=pcm_data, - ) + host.send_sco_sdu( + connection_handle=packet.connection_handle, + sdu=pcm_data, )