-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathperipheral_handler.py
More file actions
125 lines (97 loc) · 3.57 KB
/
peripheral_handler.py
File metadata and controls
125 lines (97 loc) · 3.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from log_manager import LogManager
import logging
from bluezero import peripheral
class BleService:
    """Identify a BLE service by its UUID and a human-readable name."""

    def __init__(self, uuid: str, name: str):
        """Store the service UUID and display name on the instance."""
        self.uuid, self.name = uuid, name

    def __repr__(self):
        """Return ``<ClassName> (uuid = ..., name = ...)`` for log output."""
        # The runtime class name is used so subclasses report their own name.
        return "{} (uuid = {}, name = {})".format(
            self.__class__.__name__, self.uuid, self.name
        )
class BleChar(BleService):
    """A BLE characteristic: UUID and name plus optional read/write/notify callbacks.

    When ``read_data`` is supplied, the characteristic serves that fixed
    payload through ``read_cb``. When it is ``None``, ``read_cb`` is ``None``
    and ``data`` is never assigned on the instance.

    ``__repr__`` is inherited from ``BleService`` and reports this class's
    own name.
    """

    # Payload returned by the read callback; only set when read_data is given.
    data: bytes

    def __read_cb_common(self) -> list[int]:
        """Return the stored payload as a list of byte values.

        Iterating a ``bytes`` object yields ``int`` values in ``range(256)``,
        so the result is ``list[int]`` (not ``list[bytes]``).
        """
        logging.debug("read callback for %s!", self.name)
        return list(self.data)

    def __init__(
        self,
        uuid: str,
        name: str,
        read_data: bytes | str | None,
        notify_cb=None,
        write_cb=None,
    ):
        """Create a characteristic.

        Args:
            uuid: UUID of the characteristic.
            name: Human-readable name, used in log messages.
            read_data: Fixed payload served on read. ``str`` values are
                encoded as UTF-8; ``None`` means the characteristic has no
                read callback.
            notify_cb: Optional notify callback, stored as given.
            write_cb: Optional write callback, stored as given.
        """
        super().__init__(uuid, name)
        if read_data is None:
            self.read_cb = None
        else:
            # Encode text once here so the read callback always works on bytes.
            self.data = (
                read_data.encode("utf-8") if isinstance(read_data, str) else read_data
            )
            self.read_cb = self.__read_cb_common
        self.write_cb = write_cb
        self.notify_cb = notify_cb
class PeripheralHandler():
    """Wrapper around ``bluezero.peripheral.Peripheral`` that tracks what it registers.

    Services and characteristics are given sequential integer IDs (the number
    of entries already registered on this handler) and are remembered in
    ``services`` / ``chars`` keyed by that ID.
    """

    adapter_addr: str | None = None
    logger: logging.Logger | None = None
    periph: peripheral.Peripheral | None = None
    # Declared here for type checkers only. The dicts are created per instance
    # in __init__: a class-level ``{}`` would be one object shared by every
    # PeripheralHandler, mixing up IDs and registrations across handlers.
    services: dict[int, BleService]
    chars: dict[int, BleChar]

    def __init__(self, adapter_addr):
        """Create the logger and the underlying peripheral for ``adapter_addr``.

        Args:
            adapter_addr: Address of the local Bluetooth adapter, passed to
                ``peripheral.Peripheral`` as ``adapter_address``.
        """
        self.logger = LogManager().get_logger(self.__class__.__name__)
        self.adapter_addr = adapter_addr
        self.services = {}
        self.chars = {}
        self.logger.debug(f"using local adapter: {self.adapter_addr}")
        self.periph = peripheral.Peripheral(
            adapter_address=self.adapter_addr,
            #local_name=MOBILE_NAME
        )

    def add_service(self, service: BleService):
        """Register ``service`` as a primary service under the next free ID."""
        srv_id = len(self.services)
        self.periph.add_service(
            srv_id=srv_id,
            uuid=service.uuid,
            primary=True
        )
        self.services[srv_id] = service
        self.logger.info(f"service added as #{srv_id} {service}")

    def __calc_flags(self, char: BleChar) -> list[str]:
        """Return the flag names for every callback ``char`` defines.

        The result contains ``"read"``, ``"write"`` and/or ``"notify"``, in
        that order, one for each callback that is not ``None``.

        Raises:
            ValueError: If ``char`` has no callbacks at all.
        """
        candidates = (
            ("read", char.read_cb),
            ("write", char.write_cb),
            ("notify", char.notify_cb),
        )
        flags = [flag for flag, callback in candidates if callback is not None]
        if not flags:
            raise ValueError(f"Calculated empty flags for {char}. Config mistake?")
        return flags

    def add_char(self, service: BleService, char: BleChar):
        """Attach ``char`` to a previously added ``service``.

        Args:
            service: A service already registered through ``add_service``.
            char: The characteristic to register; its callbacks determine
                the flags passed to the peripheral.

        Raises:
            ValueError: If ``service`` was never added to this handler, or if
                ``char`` defines no callbacks.
        """
        for srv_id, known in self.services.items():
            if known == service:
                break
        else:
            raise ValueError(f"could not find a matching service for char {char}!")

        flags = self.__calc_flags(char)
        chr_id = len(self.chars)
        self.periph.add_characteristic(
            srv_id=srv_id,
            chr_id=chr_id,
            uuid=char.uuid,
            value=[],
            notifying=False,
            flags=flags,
            read_callback=char.read_cb,
            notify_callback=char.notify_cb,
            write_callback=char.write_cb
        )
        self.logger.info(f"char {char.name} added to service {known.name} (#{srv_id}) with flags {' '.join(flags)}")
        self.chars[chr_id] = char

    def set_on_connect(self, cb):
        """Assign ``cb`` to the peripheral's ``on_connect`` attribute."""
        self.periph.on_connect = cb

    def set_on_disconnect(self, cb):
        """Assign ``cb`` to the peripheral's ``on_disconnect`` attribute."""
        self.periph.on_disconnect = cb

    def publish(self):
        """Log the start of publishing and call the peripheral's ``publish()``."""
        # NOTE(review): bluezero's publish() presumably runs the event loop and
        # blocks until stopped — confirm against the bluezero documentation.
        self.logger.warning("publishing BLE...")
        self.periph.publish()