-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsake_handler.py
More file actions
126 lines (99 loc) · 3.96 KB
/
sake_handler.py
File metadata and controls
126 lines (99 loc) · 3.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import threading
import queue
from log_manager import LogManager
from pysake.server import SakeServer
from pysake.constants import KEYDB_PUMP_EXTRACTED, KEYDB_PUMP_HARDCODED
class SakeHandler:
    """Drive a SAKE handshake between a ``SakeServer`` and a notify/write characteristic.

    Incoming notify/write callbacks are not processed on the caller's thread.
    They are placed on a queue and handled by the ``sake-callback`` worker.
    Outgoing payloads are placed on a second queue and written to the
    characteristic by the ``sake-sender`` worker, which is the only place
    that calls ``char.set_value()``.

    Both workers are daemon threads that poll their queue with a 0.5 s
    timeout and exit once ``close()`` has been called.
    """

    # Stage value reported by ``server.get_stage()`` once the handshake is done.
    _STAGE_DONE = 6

    # True while the peer has notifications enabled and the trigger was queued.
    pump_enabled: bool = False
    # Characteristic object used for sending; first seen in _handle_notify().
    # It must expose ``set_value(list_of_ints)``.
    char = None

    def __init__(self):
        """Create the handshake server, the two queues and start both workers."""
        self.logger = LogManager.get_logger(self.__class__.__name__)
        self._sender_queue = queue.Queue()
        self._callback_queue = queue.Queue()
        self._stop_evt = threading.Event()

        # Build the server BEFORE starting the workers: the callback worker
        # dereferences self.server, and if construction fails no thread has
        # been started yet.
        self.server = SakeServer(KEYDB_PUMP_EXTRACTED)

        self._tx_thread = threading.Thread(
            target=self._thread_sender,
            name="sake-sender",
            daemon=True,
        )
        self._cb_thread = threading.Thread(
            target=self._thread_callback,
            name="sake-callback",
            daemon=True,
        )
        self._tx_thread.start()
        self._cb_thread.start()

    # region thread safe apis
    def notify_callback(self, is_notifying: bool, char):
        """Queue a notification start/stop request for the callback worker.

        Args:
            is_notifying: True when notifications were enabled, False when
                they were disabled.
            char: The characteristic the request refers to.
        """
        self._callback_queue.put(("notify", is_notifying, char))

    def write_callback(self, value: bytearray, options: dict):
        """Queue a received write for the callback worker.

        Args:
            value: The written payload; an immutable ``bytes`` copy is queued
                so later mutation by the caller cannot affect processing.
            options: Opaque options passed through to ``_handle_write``.
        """
        self._callback_queue.put(("write", bytes(value), options))

    def _send(self, data: bytes):
        """Queue ``data`` to be written to the characteristic by the sender worker.

        Raises:
            RuntimeError: If no characteristic has been seen yet.
        """
        if self.char is None:
            # NOTE(review): the message mentions set_char(), but this class
            # assigns self.char in _handle_notify(); text kept unchanged.
            raise RuntimeError("Sake char is none! You forgot to call set_char()!")
        self._sender_queue.put(data)

    def is_done(self) -> bool:
        """Return True once the server reports the final handshake stage."""
        return self.server.get_stage() == self._STAGE_DONE

    def close(self, timeout: float = 2.0) -> None:
        """Ask both workers to stop and wait for them to exit.

        No sentinel items are queued: the workers would treat them as real
        work. Instead the stop event is set and each worker notices it on its
        next 0.5 s poll. Items still waiting in the queues are not processed.

        Args:
            timeout: Maximum number of seconds to wait for each worker.
        """
        self._stop_evt.set()
        current = threading.current_thread()
        for worker in (self._tx_thread, self._cb_thread):
            # A worker must not join itself (join() would raise RuntimeError).
            if worker is not current and worker.is_alive():
                worker.join(timeout)

    # region actual logic
    def _handle_notify(self, is_notifying: bool, char):
        """Process a notification start/stop request on the callback worker.

        Remembers the characteristic the first time one is seen. When
        notifications start while the pump is not yet enabled, 20 zero bytes
        are queued to trigger the sake client on the pump.
        """
        self.logger.debug("got a sake notification start/stop request!")
        if self.char is None:
            self.logger.info(f"sake char is first seen as {char}")
            self.char = char

        if is_notifying and not self.pump_enabled:
            self.logger.warning("pump wants to be friends with us!")
            zeroes = bytes(20)
            self._send(zeroes)  # trigger sake client on the pump
            # self.server.handshake(zeroes) DONT feed it here!
            # Only mark the pump enabled once the trigger is queued, so a
            # failed _send() can be retried by the next notify request.
            self.pump_enabled = True

        if not is_notifying:
            self.pump_enabled = False
            self.logger.error("pump disabled notifications!")

    def _handle_write(self, value: bytes, options: dict):
        """Feed a received write into the handshake and queue any reply.

        Args:
            value: Payload received from the peer.
            options: Unused here; accepted to mirror ``write_callback``.
        """
        value = bytes(value)
        self.logger.debug(f"sake write callback received: {value.hex()}")
        output = self.server.handshake(value)

        if output is not None:
            self._send(output)
        elif self.server.get_stage() == self._STAGE_DONE:
            self.logger.info("SAKE HANDSHAKE IS DONE!!! CONGRATULATIONS!")
        else:
            # Never queue None: the sender worker would fail on list(None).
            self.logger.debug("sake handshake produced no output; nothing to send")

    # region worker threads
    def _thread_callback(self):
        """Worker loop: dispatch queued callback items until stopped.

        Any exception raised while handling an item is logged and the loop
        continues with the next item.
        """
        while not self._stop_evt.is_set():
            try:
                item = self._callback_queue.get(timeout=0.5)
            except queue.Empty:
                continue

            try:
                kind = item[0]
                if kind == "notify":
                    _, is_notifying, char = item
                    self._handle_notify(is_notifying, char)
                elif kind == "write":
                    _, value, options = item
                    self._handle_write(value, options)
                else:
                    raise RuntimeError(f"Unknown callback type: {kind}")
            except Exception as e:
                self.logger.exception(f"Callback processing failed: {e}")

    def _thread_sender(self):
        """
        The ONLY place where real char.set_value() is allowed.

        Worker loop: take queued payloads and write them to the
        characteristic as a list of ints. Failures are logged and the loop
        continues.
        """
        while not self._stop_evt.is_set():
            try:
                data = self._sender_queue.get(timeout=0.5)
            except queue.Empty:
                continue

            try:
                self.char.set_value(list(data))
                self.logger.debug(f"sent data on sake port: {data.hex()}")
            except Exception as e:
                self.logger.exception(f"sake tx failed: {e}")