-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
executable file
·159 lines (120 loc) · 4.74 KB
/
main.py
File metadata and controls
executable file
·159 lines (120 loc) · 4.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#!/usr/bin/env python3
from utils import *
add_submodule_to_path() # bit of hacking ;)
import logging
import threading
import argparse
import time
from bluezero import adapter
from bluezero.device import Device
from bluezero.central import Central
from log_manager import LogManager
LogManager.init(level=logging.DEBUG)
from pump_advertiser import PumpAdvertiser
from peripheral_handler import PeripheralHandler, BleService, BleChar
from sake_handler import SakeHandler
from sg_reader import SGReader
# Module-level handles shared between main() and the main_logic() thread.
# All start as None; main() assigns ph, pa and sh, and the on_connect
# callback defined inside main() assigns device.
# main_logic() polls sh and device and stays idle while they are unset.
ph:PeripheralHandler = None
pa:PumpAdvertiser = None
sh:SakeHandler = None
device:Device = None
def main_logic():
    """Poll loop that reads the sensor glucose (SG) value about once a minute.

    ``main`` starts this function on a daemon thread. Every 0.1 s it checks
    the module-level ``sh`` and ``device`` globals and stays idle until the
    SAKE handshake reports done and the connected device has its services
    resolved. On the first pass where both hold, it builds a ``Central`` for
    the device, loads its GATT table and wraps it in an ``SGReader``. After
    that it calls ``sg_reader.get_value(sh)`` whenever more than 60 s have
    elapsed since the last scheduled read.

    The function never returns. A failed read is logged and the loop goes on.
    """
    read_interval = 60  # seconds between SG reads
    sg_reader: SGReader = None
    last_read = time.monotonic()

    while True:
        # Use time.sleep explicitly rather than relying on a bare `sleep`
        # name leaking in through `from utils import *`.
        time.sleep(0.1)

        # SAKE handshake must have been completed
        if sh is None or not sh.is_done():
            continue

        # Snapshot the global once per pass: the connect callback may rebind
        # `device` while this thread is running.
        dev = device

        # connection to pump must have been established
        # GATT discovery must have been completed
        if not dev or not dev.services_resolved:
            continue

        # One-time setup, done the first time all preconditions are met.
        if sg_reader is None:
            logging.info("welcome from the main logic!")
            pump = Central(dev.address, dev.adapter)
            pump.load_gatt()
            sg_reader = SGReader(pump)
            logging.debug("sg reader created")

        # try to read the SG every minute
        elapsed = time.monotonic() - last_read
        if elapsed > read_interval:
            # Advance by every whole interval that has passed. Stepping by a
            # single interval would trigger a burst of back-to-back reads
            # whenever the loop fell behind (slow handshake, blocking read).
            last_read += read_interval * int(elapsed // read_interval)
            try:
                sg = sg_reader.get_value(sh)
                logging.info(f"read sg = {sg} ({sg_reader.mgdl_to_mmolL(sg)} mmol/L)")
            except Exception as e:
                logging.error(f"failed to read sg: {e}")

        # TODO: put some ipython here for testing or something
def main():
    """Entry point: set up the BLE peripheral and start the pump connector.

    Steps, in order:

    1. Parse the CLI options ``-p/--advertise_paired`` and
       ``-a/--adapter-address``.
    2. Abort if ``is_bluetooth_active()`` reports that Bluetooth is off.
    3. Pick the adapter address: the one given on the command line, else the
       first adapter yielded by ``adapter.Adapter.available()``.
    4. Create the module-level ``sh``, ``ph`` and ``pa`` objects and wire the
       connect / disconnect callbacks.
    5. Register the "Device Info" and "Sake Service" services with their
       characteristics.
    6. Start advertising, start ``main_logic`` on a daemon thread and call
       ``ph.publish()``.

    Raises:
        Exception: if Bluetooth is not active.
        RuntimeError: if no adapter address was given and none is available.
    """
    global ph, pa, sh, device

    # parse CLI args
    parser = argparse.ArgumentParser(description="Python Pump Connector")
    parser.add_argument(
        '-p', '--advertise_paired',
        help='Mobile name to use if this device has already been paired with a pump. In a format of 6 number digits.',
        default=None,
    )
    parser.add_argument(
        '-a', '--adapter-address',
        help='MAC address of the Bluetooth adapter to use',
    )
    args = parser.parse_args()

    # check if bt is even on
    if not is_bluetooth_active():
        raise Exception("you need to have bluetooth running!")

    # ask for pw
    logging.warning("Enter sudo password if asked: (we need this for the low level btmgmt tool)")
    # NOTE(review): this only works if `from utils import *` provides a
    # command-running `exec` that shadows the builtin; the builtin would try
    # to run "sudo echo" as Python source. Confirm against utils.
    exec("sudo echo")

    if args.adapter_address:
        adapter_addr = args.adapter_address
    else:
        # use first Bluetooth adapter found; fail with a readable message
        # instead of a bare StopIteration when there is none
        first_adapter = next(adapter.Adapter.available(), None)
        if first_adapter is None:
            raise RuntimeError(
                "no Bluetooth adapter found; attach one or pass --adapter-address"
            )
        adapter_addr = first_adapter.address

    sh = SakeHandler()
    ph = PeripheralHandler(adapter_addr)

    # if user did not provide an already-paired name, start from fresh
    if not args.advertise_paired:
        forget_pump_devices()
        mobile_name = None
        paired = False
    else:
        mobile_name = "Mobile " + args.advertise_paired
        paired = True
    pa = PumpAdvertiser(mobile_name, paired)

    def on_connect(dev:Device):
        # Publish the connected device for main_logic, then forward the event
        # to the advertiser's own connect handler.
        global device
        device = dev
        pa.on_connect_cb(dev)

    ph.set_on_connect(on_connect)
    ph.set_on_disconnect(pa.on_disconnect_cb)

    # create the services
    service_info_serv = BleService("00000900-0000-1000-0000-009132591325", "Device Info")
    sake_serv = BleService("FE82", "Sake Service")
    ph.add_service(service_info_serv)
    ph.add_service(sake_serv)

    # create the characteristics
    device_info_chars = [
        BleChar("2A29", "Manufacturer Name", "Google"),
        BleChar("2A24", "Model Number", "Nexus 5x"),
        BleChar("2A25", "Serial Number", "12345678"),
        BleChar("2A27", "Hardware Revision", "HW 1.0"),
        BleChar("2A26", "Firmware Revision", "FW 1.0"),
        BleChar("2A28", "Software Revision", "2.9.0 f1093d1"),  # actual application version with commit hash
        BleChar("2A23", "System ID", bytes(8)),
        BleChar("2A50", "PNP ID", bytes(7)),
        BleChar("2A2A", "Certification Data List", bytes(0)),
    ]
    sake_port = BleChar(
        "0000FE82-0000-1000-0000-009132591325",
        "Sake Port",
        None,
        sh.notify_callback,
        sh.write_callback,
    )

    # add all chars
    for char in device_info_chars:
        ph.add_char(service_info_serv, char)
    ph.add_char(sake_serv, sake_port)

    # finally before calling bluezero, start our advertisement and main logic thread
    pa.start_adv()
    logic_thread = threading.Thread(
        target=main_logic,
        name="logic_thread",
        daemon=True,
    )
    logic_thread.start()

    # The logic thread is a daemon, so it ends when this call returns and the
    # process exits. Presumably publish() blocks in the bluezero main loop.
    ph.publish()
# Run the connector only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()