-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnotebook_data_collection.py
More file actions
324 lines (266 loc) · 8.82 KB
/
Copy pathnotebook_data_collection.py
File metadata and controls
324 lines (266 loc) · 8.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
"""
Notebook-friendly helpers for simple camera data collection workflows.
Use this file from Jupyter by either:
1. Copying the functions into a notebook cell, or
2. Importing it after adding the repo root to `sys.path`.
Typical workflow:
from alpha_sdk_client import AlphaSDKClient
from notebook_data_collection import (
connect_first_camera,
collect_property_rows,
fetch_live_view_frame,
save_live_view_frame,
)
client = AlphaSDKClient(base_url="http://localhost:8080")
camera = connect_first_camera(client, mode="remote")
rows = collect_property_rows(client, camera.id, count=10, interval_s=1.0)
frame = fetch_live_view_frame(client, camera.id)
save_live_view_frame(frame, "frame.jpg")
This example intentionally stays minimal:
- no pandas dependency
- no plotting dependency
- no server subprocess management
Start the camera server separately, then point the client at it.
"""
from __future__ import annotations
import argparse
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Sequence
import httpx
from alpha_sdk_client import AlphaSDKClient
from alpha_sdk_client.types.property_name import PropertyName
DEFAULT_PROPERTIES: tuple[PropertyName, ...] = (
"battery-remain",
"iso",
"aperture",
"shutter-speed",
"white-balance",
"focus-mode",
"focus-distance",
"zoom-distance",
"media-slot1-remaining-photos",
"media-slot2-remaining-photos",
)
@dataclass(frozen=True)
class ConnectedCamera:
id: str
model: str
mode: str
def list_cameras(client: AlphaSDKClient) -> list[dict[str, str]]:
"""
Return a compact list of discovered cameras for notebook display.
"""
response = client.cameras.list()
return [
{
"id": camera.id,
"model": camera.model,
"connection_type": str(camera.connection_type),
"connected": str(camera.connected),
}
for camera in response.cameras
]
def connect_first_camera(
client: AlphaSDKClient,
*,
mode: str = "remote",
timeout_s: float = 15.0,
poll_interval_s: float = 0.5,
) -> ConnectedCamera:
"""
Discover the first camera and connect to it.
Use `mode="remote-transfer"` if you also need SD card listing/downloads.
"""
cameras = client.cameras.list().cameras
if not cameras:
raise RuntimeError("No cameras discovered. Check the camera USB/network setup and server status.")
camera = cameras[0]
client.cameras.connect(camera.id, mode=mode)
deadline = time.time() + timeout_s
status = None
while time.time() < deadline:
status = client.cameras.get_connection_status(camera.id)
if status.success and status.data is not None:
break
time.sleep(poll_interval_s)
if status is None or not status.success or status.data is None:
message = status.message if status is not None else "Timed out waiting for connection status"
raise RuntimeError(f"Camera did not reach connected state: {message}")
return ConnectedCamera(
id=camera.id,
model=camera.model,
mode=str(status.data.mode or mode),
)
def get_property_snapshot(
client: AlphaSDKClient,
camera_id: str,
*,
properties: Sequence[PropertyName] = DEFAULT_PROPERTIES,
) -> dict[str, object]:
"""
Return a single flat row of selected properties.
Values are the formatted strings that are usually most useful in notebooks.
Raw SDK values and hex values are also included for downstream analysis.
"""
all_properties = client.properties.get_all(camera_id).data.properties
row: dict[str, object] = {
"timestamp": time.time(),
"camera_id": camera_id,
}
for property_name in properties:
prop = all_properties.get(property_name)
if prop is None:
row[property_name] = None
row[f"{property_name}_raw"] = None
row[f"{property_name}_hex"] = None
continue
row[property_name] = prop.current_formatted
row[f"{property_name}_raw"] = prop.current_value
row[f"{property_name}_hex"] = prop.current_hex_value
return row
def collect_property_rows(
client: AlphaSDKClient,
camera_id: str,
*,
count: int = 10,
interval_s: float = 1.0,
properties: Sequence[PropertyName] = DEFAULT_PROPERTIES,
) -> list[dict[str, object]]:
"""
Collect repeated property snapshots into a list of rows.
This shape is ready for `pandas.DataFrame(rows)` if the user wants it,
but pandas is not required.
"""
rows: list[dict[str, object]] = []
for index in range(count):
rows.append(get_property_snapshot(client, camera_id, properties=properties))
if index < count - 1:
time.sleep(interval_s)
return rows
def trigger_af_capture(client: AlphaSDKClient, camera_id: str) -> None:
"""
Run the camera's AF + shutter action once.
"""
client.actions.af_shutter(camera_id)
def enable_live_view(client: AlphaSDKClient, camera_id: str) -> None:
"""
Ensure live view is enabled and streaming before frame fetches.
"""
client.live_view.enable(camera_id)
client.live_view.start(camera_id)
def fetch_live_view_frame(
client: AlphaSDKClient,
camera_id: str,
*,
timeout_s: float = 10.0,
) -> bytes:
"""
Fetch the latest JPEG live-view frame.
Uses the raw frame endpoint directly because it is binary data and easiest
to consume that way in notebooks.
"""
base_url = client._client_wrapper.get_base_url() # type: ignore[attr-defined]
url = f"{base_url}/api/cameras/{camera_id}/live-view/frame"
with httpx.Client(timeout=timeout_s) as http_client:
response = http_client.get(url)
response.raise_for_status()
frame = response.content
if len(frame) < 3 or frame[:3] != b"\xff\xd8\xff":
raise RuntimeError("Live-view response was not a valid JPEG frame.")
return frame
def save_live_view_frame(frame: bytes, output_path: str | Path) -> Path:
"""
Save a JPEG frame to disk.
"""
path = Path(output_path).expanduser().resolve()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(frame)
return path
def list_sd_files(
client: AlphaSDKClient,
camera_id: str,
*,
slot_number: int = 1,
limit: int = 10,
) -> list[dict[str, object]]:
"""
Return a compact preview of SD card files.
Requires the camera to be connected in `remote-transfer` or `contents` mode.
"""
response = client.sd_card.list(camera_id, slot_number).files
files: list[dict[str, object]] = []
for entry in response[:limit]:
files.append(
{
"name": entry.file_path,
"content_id": entry.content_id,
"file_id": entry.file_id,
"size_bytes": entry.file_size,
"slot": slot_number,
}
)
return files
def print_summary(rows: Iterable[dict[str, object]]) -> None:
"""
Lightweight display helper for plain Python sessions.
"""
for row in rows:
print(row)
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Minimal notebook/data-collection camera helper.")
parser.add_argument("--base-url", default="http://localhost:8080", help="Camera server base URL.")
parser.add_argument(
"--mode",
default="remote",
choices=("remote", "remote-transfer", "contents"),
help="Connection mode used when auto-connecting the first camera.",
)
parser.add_argument(
"--count",
type=int,
default=3,
help="Number of property snapshots to collect for the smoke test.",
)
parser.add_argument(
"--interval",
type=float,
default=1.0,
help="Seconds between property snapshots for the smoke test.",
)
parser.add_argument(
"--capture",
action="store_true",
help="Trigger one AF + shutter capture after connecting.",
)
return parser
def main() -> None:
args = _build_parser().parse_args()
client = AlphaSDKClient(base_url=args.base_url)
cameras = list_cameras(client)
print("Discovered cameras:")
print_summary(cameras)
if not cameras:
return
camera = connect_first_camera(client, mode=args.mode)
print(
{
"connected_camera_id": camera.id,
"model": camera.model,
"mode": camera.mode,
}
)
if args.capture:
trigger_af_capture(client, camera.id)
print({"capture": "triggered"})
rows = collect_property_rows(
client,
camera.id,
count=args.count,
interval_s=args.interval,
)
print("Collected property rows:")
print_summary(rows)
if __name__ == "__main__":
main()