Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 36 additions & 36 deletions bumble/hci.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from dataclasses import field
from typing import (
Any,
ClassVar,
Literal,
TypeVar,
cast,
Expand Down Expand Up @@ -2300,10 +2299,10 @@ class HCI_Packet:
Abstract Base class for HCI packets
'''

hci_packet_type: ClassVar[int]
hci_packet_type: int

@staticmethod
def from_bytes(packet: bytes) -> HCI_Packet:
@classmethod
def from_bytes(cls, packet: bytes) -> HCI_Packet:
packet_type = packet[0]

if packet_type == HCI_COMMAND_PACKET:
Expand All @@ -2323,7 +2322,7 @@ def from_bytes(packet: bytes) -> HCI_Packet:

return HCI_CustomPacket(packet)

def __init__(self, name):
def __init__(self, name: str) -> None:
self.name = name

def __bytes__(self) -> bytes:
Expand All @@ -2335,7 +2334,7 @@ def __repr__(self) -> str:

# -----------------------------------------------------------------------------
class HCI_CustomPacket(HCI_Packet):
def __init__(self, payload):
def __init__(self, payload: bytes) -> None:
super().__init__('HCI_CUSTOM_PACKET')
self.hci_packet_type = payload[0]
self.payload = payload
Expand Down Expand Up @@ -7452,15 +7451,22 @@ class HCI_Vendor_Event(HCI_Event):


# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_AclDataPacket(HCI_Packet):
'''
See Bluetooth spec @ 5.4.2 HCI ACL Data Packets
'''

hci_packet_type = HCI_ACL_DATA_PACKET

@staticmethod
def from_bytes(packet: bytes) -> HCI_AclDataPacket:
connection_handle: int
pb_flag: int
bc_flag: int
data_total_length: int
data: bytes

@classmethod
def from_bytes(cls, packet: bytes) -> HCI_AclDataPacket:
# Read the header
h, data_total_length = struct.unpack_from('<HH', packet, 1)
connection_handle = h & 0xFFF
Expand All @@ -7469,25 +7475,22 @@ def from_bytes(packet: bytes) -> HCI_AclDataPacket:
data = packet[5:]
if len(data) != data_total_length:
raise InvalidPacketError('invalid packet length')
return HCI_AclDataPacket(
connection_handle, pb_flag, bc_flag, data_total_length, data
return cls(
connection_handle=connection_handle,
pb_flag=pb_flag,
bc_flag=bc_flag,
data_total_length=data_total_length,
data=data,
)

def __bytes__(self):
def __bytes__(self) -> bytes:
h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle
return (
struct.pack('<BHH', HCI_ACL_DATA_PACKET, h, self.data_total_length)
+ self.data
)

def __init__(self, connection_handle, pb_flag, bc_flag, data_total_length, data):
self.connection_handle = connection_handle
self.pb_flag = pb_flag
self.bc_flag = bc_flag
self.data_total_length = data_total_length
self.data = data

def __str__(self):
def __str__(self) -> str:
return (
f'{color("ACL", "blue")}: '
f'handle=0x{self.connection_handle:04x}, '
Expand All @@ -7498,15 +7501,21 @@ def __str__(self):


# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_SynchronousDataPacket(HCI_Packet):
'''
See Bluetooth spec @ 5.4.3 HCI SCO Data Packets
'''

hci_packet_type = HCI_SYNCHRONOUS_DATA_PACKET

@staticmethod
def from_bytes(packet: bytes) -> HCI_SynchronousDataPacket:
connection_handle: int
packet_status: int
data_total_length: int
data: bytes

@classmethod
def from_bytes(cls, packet: bytes) -> HCI_SynchronousDataPacket:
# Read the header
h, data_total_length = struct.unpack_from('<HB', packet, 1)
connection_handle = h & 0xFFF
Expand All @@ -7516,8 +7525,11 @@ def from_bytes(packet: bytes) -> HCI_SynchronousDataPacket:
raise InvalidPacketError(
f'invalid packet length {len(data)} != {data_total_length}'
)
return HCI_SynchronousDataPacket(
connection_handle, packet_status, data_total_length, data
return cls(
connection_handle=connection_handle,
packet_status=packet_status,
data_total_length=data_total_length,
data=data,
)

def __bytes__(self) -> bytes:
Expand All @@ -7527,18 +7539,6 @@ def __bytes__(self) -> bytes:
+ self.data
)

def __init__(
self,
connection_handle: int,
packet_status: int,
data_total_length: int,
data: bytes,
) -> None:
self.connection_handle = connection_handle
self.packet_status = packet_status
self.data_total_length = data_total_length
self.data = data

def __str__(self) -> str:
return (
f'{color("SCO", "blue")}: '
Expand All @@ -7556,7 +7556,7 @@ class HCI_IsoDataPacket(HCI_Packet):
See Bluetooth spec @ 5.4.5 HCI ISO Data Packets
'''

hci_packet_type: ClassVar[int] = HCI_ISO_DATA_PACKET
hci_packet_type = HCI_ISO_DATA_PACKET

connection_handle: int
data_total_length: int
Expand Down
10 changes: 10 additions & 0 deletions bumble/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,16 @@ def send_acl_sdu(self, connection_handle: int, sdu: bytes) -> None:
)
packet_queue.enqueue(acl_packet, connection_handle)

def send_sco_sdu(self, connection_handle: int, sdu: bytes) -> None:
self.send_hci_packet(
hci.HCI_SynchronousDataPacket(
connection_handle=connection_handle,
packet_status=0,
data_total_length=len(sdu),
data=sdu,
)
)

def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None:
self.send_acl_sdu(connection_handle, bytes(L2CAP_PDU(cid, pdu)))

Expand Down
10 changes: 3 additions & 7 deletions examples/run_hfp_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,9 @@ def on_sco_packet(packet: hci.HCI_SynchronousDataPacket):
if source_file and (pcm_data := source_file.read(packet.data_total_length)):
assert ag_protocol
host = ag_protocol.dlc.multiplexer.l2cap_channel.connection.device.host
host.send_hci_packet(
hci.HCI_SynchronousDataPacket(
connection_handle=packet.connection_handle,
packet_status=0,
data_total_length=len(pcm_data),
data=pcm_data,
)
host.send_sco_sdu(
connection_handle=packet.connection_handle,
sdu=pcm_data,
)


Expand Down
Loading