Skip to content

Commit 8b8b8c4

Browse files
authored
Feature: Tap API in simulated broadband source (#141)
* tap interface in synapse-sim * tap_example * migrate to BroadbandFrame from ndtp_types.ElectricalBroadbandData * pass iface_ip to broadband, select random port * set zmq resources to None * resolve review comments
1 parent 76fa430 commit 8b8b8c4

File tree

4 files changed

+185
-13
lines changed

4 files changed

+185
-13
lines changed

synapse/examples/tap_example.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import synapse as syn
2+
import sys
3+
import time
4+
5+
from synapse.client.taps import Tap
6+
7+
SIMULATED_PERIPHERAL_ID = 100
8+
9+
if __name__ == "__main__":
10+
uri = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1:647"
11+
device = syn.Device(uri)
12+
info = device.info()
13+
if info is None:
14+
print("Couldn't get device info")
15+
sys.exit(1)
16+
17+
print("Device info:")
18+
print(info)
19+
20+
channels = [
21+
syn.Channel(
22+
id=channel_num,
23+
electrode_id=channel_num * 2,
24+
reference_id=channel_num * 2 + 1,
25+
)
26+
for channel_num in range(32)
27+
]
28+
29+
broadband = syn.BroadbandSource(
30+
# Use the simulated peripheral (100), or replace with your own
31+
peripheral_id=SIMULATED_PERIPHERAL_ID,
32+
sample_rate_hz=30000,
33+
bit_width=12,
34+
gain=20.0,
35+
signal=syn.SignalConfig(
36+
electrode=syn.ElectrodeConfig(
37+
channels=channels,
38+
low_cutoff_hz=500.0,
39+
high_cutoff_hz=6000.0,
40+
)
41+
),
42+
)
43+
44+
config = syn.Config()
45+
config.add_node(broadband)
46+
47+
device.configure(config)
48+
49+
# export the config to a json file for using with CLI
50+
# from google.protobuf.json_format import MessageToJson
51+
# with open("device_config.json", "w") as f:
52+
# f.write(MessageToJson(config.to_proto()))
53+
# print("Config written to device_config.json")
54+
55+
device.start()
56+
57+
info = device.info()
58+
if info is None:
59+
print("Couldn't get device info")
60+
sys.exit(1)
61+
print("Configured device info:")
62+
print(info)
63+
64+
# stream with tap api
65+
tap_client = Tap(uri)
66+
tap_client.connect("broadband_source_sim")
67+
68+
should_run = True
69+
total_bytes_read = 0
70+
start_time = time.time()
71+
last_update_time = start_time
72+
update_interval_sec = 1
73+
while should_run:
74+
try:
75+
# Wait for data
76+
syn_data = tap_client.read()
77+
bytes_read = len(syn_data)
78+
if syn_data is None or bytes_read == 0:
79+
print("Failed to read data from node")
80+
continue
81+
# Do something with the data
82+
total_bytes_read += bytes_read
83+
84+
current_time = time.time()
85+
if (current_time - last_update_time) >= update_interval_sec:
86+
sys.stdout.write("\r")
87+
sys.stdout.write(
88+
f"{total_bytes_read} bytes in {time.time() - start_time:.2f} sec"
89+
)
90+
last_update_time = current_time
91+
92+
if current_time - start_time > 5:
93+
should_run = False
94+
95+
except KeyboardInterrupt:
96+
print("Keyboard interrupt detected, stopping")
97+
should_run = False
98+
99+
print("Stopping device")
100+
device.stop()
101+

synapse/server/nodes/base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,6 @@ def node_socket(self):
7171
bind=f"{self.socket[0]}:{self.socket[1]}",
7272
type=self.type,
7373
)
74+
75+
def tap_connections(self):
76+
return []

synapse/server/rpc.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from synapse.api.node_pb2 import NodeConnection, NodeType
1010
from synapse.api.logging_pb2 import LogLevel, LogQueryResponse
1111
from synapse.api.query_pb2 import QueryResponse
12+
from synapse.api.tap_pb2 import ListTapsResponse
1213
from synapse.api.status_pb2 import DeviceState, Status, StatusCode
1314
from synapse.api.device_pb2 import DeviceConfiguration, DeviceInfo
1415
from synapse.api.synapse_pb2_grpc import (
@@ -69,6 +70,7 @@ class SynapseServicer(SynapseDeviceServicer):
6970
def __init__(self, name, serial, iface_ip, node_object_map, peripherals):
7071
self.name = name
7172
self.serial = serial
73+
self.iface_ip = iface_ip
7274
self.node_object_map = node_object_map
7375
self.peripherals = peripherals
7476
self.logger = logging.getLogger("server")
@@ -168,6 +170,10 @@ async def Query(self, request, context):
168170

169171
# handle query
170172

173+
taps = []
174+
for node in self.nodes:
175+
taps.extend(node.tap_connections())
176+
171177
return QueryResponse(
172178
data=[1, 2, 3, 4, 5],
173179
status=Status(
@@ -176,6 +182,7 @@ async def Query(self, request, context):
176182
sockets=self._sockets_status_info(),
177183
state=self.state,
178184
),
185+
list_taps_response=ListTapsResponse(taps=taps),
179186
)
180187

181188
async def GetLogs(self, request, context):
@@ -320,7 +327,7 @@ def _reconfigure(self, configuration):
320327
"Creating %s node(%d)" % (NodeType.Name(node.type), node.id)
321328
)
322329
node = self.node_object_map[node.type](node.id)
323-
if node.type in [NodeType.kStreamIn]:
330+
if node.type in [NodeType.kStreamIn, NodeType.kBroadbandSource]:
324331
node.configure_iface_ip(self.iface_ip)
325332

326333
status = node.configure(config)

synapse/simulator/nodes/broadband_source.py

Lines changed: 73 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@
22
import random
33
import time
44

5+
import zmq
6+
57
from synapse.api.node_pb2 import NodeType
68
from synapse.api.nodes.broadband_source_pb2 import BroadbandSourceConfig
79
from synapse.server.nodes.base import BaseNode
10+
from synapse.api.tap_pb2 import TapConnection, TapType
11+
from synapse.api.datatype_pb2 import BroadbandFrame
812
from synapse.server.status import Status
913
from synapse.utils.ndtp_types import ElectricalBroadbandData
1014

@@ -16,6 +20,11 @@ class BroadbandSource(BaseNode):
1620
def __init__(self, id):
1721
super().__init__(id, NodeType.kBroadbandSource)
1822
self.__config: BroadbandSourceConfig = None
23+
self.zmq_context = None
24+
self.zmq_socket = None
25+
self.seq_number = 0
26+
self.iface_ip = None
27+
self.port = None
1928

2029
def config(self):
2130
c = super().config()
@@ -39,37 +48,89 @@ async def run(self):
3948
if not c.HasField("signal") or not c.signal:
4049
self.logger.error("node signal not configured")
4150
return
42-
51+
4352
if not c.signal.HasField("electrode") or not c.signal.electrode:
4453
self.logger.error("node signal electrode not configured")
4554
return
46-
55+
4756
e = c.signal.electrode
4857
if not e.channels:
4958
self.logger.error("node signal electrode channels not configured")
5059
return
5160

61+
if not self.zmq_context:
62+
if not self.iface_ip:
63+
self.logger.error("iface_ip not configured")
64+
return
65+
66+
self.zmq_context = zmq.Context()
67+
self.zmq_socket = self.zmq_context.socket(zmq.PUB)
68+
self.port = self.zmq_socket.bind_to_random_port(f"tcp://{self.iface_ip}")
69+
5270
channels = e.channels
5371
bit_width = c.bit_width if c.bit_width else 4
5472
sample_rate_hz = c.sample_rate_hz if c.sample_rate_hz else 16000
5573

5674
t_last_ns = time.time_ns()
5775
while self.running:
5876
await asyncio.sleep(0.01)
59-
77+
6078
now = time.time_ns()
6179
elapsed_ns = now - t_last_ns
6280
n_samples = int(sample_rate_hz * elapsed_ns / 1e9)
63-
6481
samples = [[ch.id, [r_sample(bit_width) for _ in range(n_samples)]] for ch in channels]
6582

66-
data = ElectricalBroadbandData(
67-
bit_width=bit_width,
68-
is_signed=False,
69-
sample_rate=sample_rate_hz,
70-
t0=t_last_ns,
71-
samples=samples
83+
try:
84+
# for backwards compatibility
85+
data = ElectricalBroadbandData(
86+
bit_width=bit_width,
87+
is_signed=False,
88+
sample_rate=sample_rate_hz,
89+
t0=t_last_ns,
90+
samples=samples
91+
)
92+
await self.emit_data(data)
93+
94+
# send data over tap
95+
for i in range(n_samples):
96+
frame = BroadbandFrame(
97+
timestamp_ns = t_last_ns + int(i * 1e9 / sample_rate_hz),
98+
sequence_number = self.seq_number,
99+
frame_data = [chan_samples[i] for _, chan_samples in samples],
100+
sample_rate_hz = sample_rate_hz,
101+
)
102+
try:
103+
self.zmq_socket.send(frame.SerializeToString())
104+
self.seq_number += 1
105+
except Exception as e:
106+
self.logger.error(f"Error sending data: {e}")
107+
108+
t_last_ns = now
109+
except Exception as e:
110+
print(f"Error sending data: {e}")
111+
112+
def stop(self):
113+
"""Clean up ZMQ resources."""
114+
if self.zmq_socket:
115+
self.zmq_socket.close()
116+
self.zmq_socket = None
117+
118+
if self.zmq_context:
119+
self.zmq_context.destroy()
120+
self.zmq_context = None
121+
122+
return super().stop()
123+
124+
def configure_iface_ip(self, iface_ip):
125+
self.iface_ip = iface_ip
126+
127+
def tap_connections(self):
128+
return [
129+
TapConnection(
130+
name="broadband_source_sim",
131+
endpoint=f"tcp://{self.iface_ip}:{self.port}",
132+
message_type="synapse.BroadbandFrame",
133+
tap_type=TapType.TAP_TYPE_PRODUCER,
72134
)
135+
]
73136

74-
await self.emit_data(data)
75-
t_last_ns = now

0 commit comments

Comments
 (0)