diff --git a/.gitignore b/.gitignore index 8947e99a..7e931939 100644 --- a/.gitignore +++ b/.gitignore @@ -160,3 +160,4 @@ astro_data/comets.txt # users GPS data test_ubx test_ubx/* +python/telemetry_analysis/ diff --git a/default_config.json b/default_config.json index 6b188339..d80a4139 100644 --- a/default_config.json +++ b/default_config.json @@ -177,5 +177,7 @@ "active_telescope_index": 0, "active_eyepiece_index": 0 }, - "imu_threshold_scale": 1 + "imu_threshold_scale": 1, + "telemetry_record": false, + "telemetry_images": false } diff --git a/python/PiFinder/camera_interface.py b/python/PiFinder/camera_interface.py index 9d469564..ffeb45e1 100644 --- a/python/PiFinder/camera_interface.py +++ b/python/PiFinder/camera_interface.py @@ -423,7 +423,17 @@ def get_image_loop( f"Exposure saved and auto-exposure disabled: {self.exposure_time}µs" ) - if command.startswith("save"): + if command.startswith("save_image:"): + # Save current camera frame to specified path + save_path = command.split(":", 1)[1] + try: + img = camera_image.copy() + img.save(save_path, "PNG", compress_level=6) + logger.debug("Telemetry image saved: %s", save_path) + except Exception as e: + logger.error("Failed to save telemetry image: %s", e) + + if command.startswith("save:"): # Set flag to save next capture to this file self._save_next_to = command.split(":")[1] console_queue.put("CAM: Save flag set") diff --git a/python/PiFinder/imu_pi.py b/python/PiFinder/imu_pi.py index e62bd6dc..1554d69b 100644 --- a/python/PiFinder/imu_pi.py +++ b/python/PiFinder/imu_pi.py @@ -175,6 +175,13 @@ def imu_monitor(shared_state, console_queue, log_queue): imu.update() imu_sample.status = imu.calibration + # Read raw sensor data for telemetry + try: + imu_sample.gyro = imu.sensor.gyro + imu_sample.accel = imu.sensor.linear_acceleration + except Exception: + pass + if imu.moving(): if not imu_sample.moving: logger.debug("IMU: move start") diff --git a/python/PiFinder/integrator.py b/python/PiFinder/integrator.py index 19ae627c..cd552ff6 100644 --- a/python/PiFinder/integrator.py +++ b/python/PiFinder/integrator.py @@ -26,6 +26,11 @@ prediction during dead-reckoning. The IDR remains a math primitive (``RaDecRoll`` in, ``RaDecRoll`` out); this module bridges between it and :class:`PointingEstimate`. + +Telemetry record/replay is handled by :class:`TelemetryManager` +(``telemetry.py``). Replayed sessions are converted back into +:class:`SolveResult` / :class:`ImuSample` messages and fed through the +same ``_apply_*`` / ``_advance_with_imu`` paths as live data. """ from __future__ import annotations @@ -45,6 +50,7 @@ from PiFinder.multiproclogging import MultiprocLogging from PiFinder.pointing_model.imu_dead_reckoning import ImuDeadReckoning import PiFinder.pointing_model.quaternion_transforms as qt +from PiFinder.telemetry import TelemetryManager from PiFinder.types.positioning import ( FailedSolve, ImuSample, @@ -62,12 +68,21 @@ IMU_MOVED_ANG_THRESHOLD = np.deg2rad(0.06) -def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=False): +def integrator( + shared_state, + solver_queue, + console_queue, + log_queue, + is_debug=False, + command_queue=None, + camera_command_queue=None, +): MultiprocLogging.configurer(log_queue) if is_debug: logger.setLevel(logging.DEBUG) logger.debug("Starting Integrator") + telemetry = None try: cfg = config.Config() screen_direction = cfg.get_option("screen_direction") @@ -82,22 +97,59 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa # Epoch of the last estimate we published; gate re-publishing on it. last_published_time = time.time() + was_replaying = False + telemetry = TelemetryManager( + cfg, shared_state, console_queue, camera_command_queue + ) + while True: state_utils.sleep_for_framerate(shared_state) + telemetry.poll_commands(command_queue) + pointing_updated = False - # 1. Pull any pending solve result from the queue. + # 1. Pull the next message — from the replay stream when + # replaying, otherwise from the solver queue. solve_result: Optional[SolveResult] = None - try: - solve_result = solver_queue.get(block=False) - except queue.Empty: - pass + replay_imu: Optional[ImuSample] = None + + if telemetry.replaying: + if not was_replaying: + was_replaying = True + # Recorded epochs are in the past; rewind the publish + # gate so replayed estimates pass the newer-than check. + last_published_time = 0.0 + # The solver keeps running during replay; discard its output. + _drain_queue(solver_queue) + message = telemetry.next_replay_message() + if isinstance(message, (SuccessfulSolve, FailedSolve)): + solve_result = message + elif isinstance(message, ImuSample): + replay_imu = message + else: + if was_replaying: + # Replay ended — reset to a clean unanchored state. + was_replaying = False + estimate = PointingEstimate() + idr.reset() + last_published_time = time.time() + logger.info("Replay ended, integrator state reset") + try: + solve_result = solver_queue.get(block=False) + except queue.Empty: + pass if isinstance(solve_result, SuccessfulSolve): + telemetry.record_solve( + solve_result, predicted=estimate.pointing.aligned.estimate + ) estimate = _apply_successful_solve(estimate, solve_result, idr) pointing_updated = True elif isinstance(solve_result, FailedSolve): + telemetry.record_solve( + solve_result, predicted=estimate.pointing.aligned.estimate + ) estimate = _apply_failed_solve(estimate, solve_result) # Publish unconditionally so auto-exposure sees the failed # attempt (Matches=0, fresh last_solve_attempt). The estimate @@ -107,14 +159,16 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa shared_state.set_solution(copy.deepcopy(estimate)) # 2. If we have an anchor and didn't just do a fresh plate-solve, - # try to advance the estimate via IMU dead-reckoning. + # try to advance the estimate via IMU dead-reckoning. The IMU + # sample comes from the replay stream when replaying. if ( not pointing_updated and idr.is_initialized() and estimate.imu_anchor is not None ): - imu = shared_state.imu() + imu = replay_imu if telemetry.replaying else shared_state.imu() if imu: + telemetry.record_imu(imu) if _advance_with_imu(estimate, idr, imu): pointing_updated = True @@ -137,8 +191,13 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa shared_state.set_solution(copy.deepcopy(estimate)) last_published_time = estimate.estimate_time + telemetry.flush() + except EOFError: logger.error("Main no longer running for integrator") + finally: + if telemetry is not None: + telemetry.stop() def _apply_successful_solve( @@ -263,3 +322,12 @@ def _get_alt_az(ra_deg, dec_deg, location, dt) -> tuple[float | None, float | No return None, None calc_utils.sf_utils.set_location(location.lat, location.lon, location.altitude) return calc_utils.sf_utils.radec_to_altaz(ra_deg, dec_deg, dt) + + +def _drain_queue(q): + """Discard all pending items from a queue.""" + try: + while True: + q.get(block=False) + except queue.Empty: + pass diff --git a/python/PiFinder/main.py b/python/PiFinder/main.py index 2eff6be9..271d3795 100644 --- a/python/PiFinder/main.py +++ b/python/PiFinder/main.py @@ -124,6 +124,7 @@ def setup_dirs(): utils.create_path(Path(utils.data_dir, "screenshots")) utils.create_path(Path(utils.data_dir, "solver_debug_dumps")) utils.create_path(Path(utils.data_dir, "logs")) + utils.create_path(Path(utils.data_dir, "telemetry")) os.chmod(Path(utils.data_dir), 0o777) @@ -384,6 +385,8 @@ def main( logger.info("PiFinder running on %s, %s, %s", os_detail, platform, arch) # init UI Modes + integrator_command_queue: Queue = Queue() + command_queues = { "camera": camera_command_queue, "console": console_queue, @@ -391,6 +394,7 @@ def main( "align_command": alignment_command_queue, "align_response": alignment_response_queue, "gps": gps_queue, + "integrator": integrator_command_queue, } cfg = config.Config() @@ -545,6 +549,10 @@ def main( integrator_logqueque, verbose, ), + kwargs={ + "command_queue": integrator_command_queue, + "camera_command_queue": camera_command_queue, + }, ) integrator_process.start() diff --git a/python/PiFinder/telemetry.py b/python/PiFinder/telemetry.py new file mode 100644 index 00000000..aae857b4 --- /dev/null +++ b/python/PiFinder/telemetry.py @@ -0,0 +1,613 @@ +""" +Telemetry recording and replay for the integrator. + +Records IMU samples and plate solves with accurate timing to JSONL files +in ~/PiFinder_data/telemetry/. Replay mode converts recorded events back +into :class:`SolveResult` / :class:`ImuSample` messages that the +integrator feeds through its normal apply/advance paths for bench +testing. +""" + +import json +import logging +import queue +import threading +import time +from collections import deque +from datetime import datetime +from pathlib import Path + +import quaternion as quaternion_module + +from PiFinder import calc_utils +from PiFinder import utils +from PiFinder.types.positioning import ( + FailedSolve, + ImuSample, + Pointing, + SolveDiagnostics, + SuccessfulSolve, +) + +logger = logging.getLogger("Telemetry") + +TELEMETRY_DIR = Path(utils.data_dir) / "telemetry" + + +_R = 5 # decimal places for float rounding (BNO055 is ~14-bit) + +# Stationary IMU downsampling: record every Nth sample when not moving +_STATIONARY_DECIMATION = 10 + + +def _rf(v): + """Round a float for compact serialization.""" + return round(v, _R) + + +def _serialize_quat(q): + """Serialize a quaternion to a list [w, x, y, z].""" + if q is None: + return None + try: + return [_rf(q.w), _rf(q.x), _rf(q.y), _rf(q.z)] + except (AttributeError, TypeError): + return None + + +def _serialize_vec(v): + """Serialize a 3-tuple/list to rounded list, or None.""" + if v is None: + return None + try: + return [_rf(v[0]), _rf(v[1]), _rf(v[2])] + except (TypeError, IndexError): + return None + + +class TelemetryRecorder: + """ + Records IMU and solve events to a JSONL file. + + Uses a deque buffer flushed every 5 seconds by a background thread. + When disabled, all methods are no-ops. + """ + + def __init__(self): + self.enabled = False + self.images_enabled = False + self._buffer = deque(maxlen=300) + self._file = None + self._flush_thread = None + self._stop_event = threading.Event() + self._session_dir = None + self._last_flush = 0.0 + self._imu_skip_count = 0 + self._last_target_id = None + + def start(self, cfg, shared_state): + """Start a new recording session.""" + if self.enabled: + self.stop() + + TELEMETRY_DIR.mkdir(parents=True, exist_ok=True) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + self._session_dir = TELEMETRY_DIR / timestamp + self._session_dir.mkdir(parents=True, exist_ok=True) + session_file = self._session_dir / "session.jsonl" + + self._file = open(session_file, "a") + self.enabled = True + self._last_flush = time.time() + + # Write header (no location — written to separate .location file) + dt = shared_state.datetime() + header = { + "t": time.time(), + "e": "hdr", + "dt": dt.isoformat() if dt else None, + "cfg": { + "screen_direction": cfg.get_option("screen_direction"), + "mount_type": cfg.get_option("mount_type"), + }, + } + self._buffer.append(json.dumps(header) + "\n") + + # Write location to a separate file to avoid leaking it in shared recordings + location = shared_state.location() + if location: + loc_file = self._session_dir / "session.location" + loc_data = { + "lat": location.lat, + "lon": location.lon, + "altitude": location.altitude, + } + loc_file.write_text(json.dumps(loc_data)) + + # Start flush thread + self._stop_event.clear() + self._flush_thread = threading.Thread( + target=self._flush_loop, daemon=True, name="telemetry-flush" + ) + self._flush_thread.start() + logger.info("Telemetry recording started: %s", session_file) + + def stop(self): + """Stop the current recording session.""" + if not self.enabled: + return + self.enabled = False + self._stop_event.set() + if self._flush_thread: + self._flush_thread.join(timeout=2) + self._flush_thread = None + self._do_flush() + if self._file: + self._file.close() + self._file = None + self._session_dir = None + logger.info("Telemetry recording stopped") + + def record_imu(self, imu): + """Record an :class:`ImuSample`. No-op if disabled. + + When stationary, only records every _STATIONARY_DECIMATION-th sample + to reduce file size during long sessions. + """ + if not self.enabled or imu is None: + return + moving = imu.moving + if not moving: + self._imu_skip_count += 1 + if self._imu_skip_count < _STATIONARY_DECIMATION: + return + self._imu_skip_count = 0 + else: + self._imu_skip_count = 0 + record = { + "t": _rf(time.time()), + "e": "imu", + "q": _serialize_quat(imu.quat), + "mv": moving, + "st": imu.status, + "gyro": _serialize_vec(imu.gyro), + "accel": _serialize_vec(imu.accel), + } + self._buffer.append(json.dumps(record) + "\n") + + def record_solve(self, solve_result, predicted=None): + """Record a :class:`SolveResult`. No-op if disabled. + + ``predicted`` is the integrator's current aligned-axis estimate + (the IMU-progressed :class:`Pointing`) just before the solve was + applied, enabling drift measurement. + + Returns the timestamp used for the record, or None if not recorded. + """ + if not self.enabled or solve_result is None: + return None + t = time.time() + success = isinstance(solve_result, SuccessfulSolve) + record = { + "t": _rf(t), + "e": "solve", + "ra": _rf(solve_result.aligned.RA) if success else None, + "dec": _rf(solve_result.aligned.Dec) if success else None, + "roll": _rf(solve_result.aligned.Roll) if success else None, + "pred_ra": _rf(predicted.RA) if predicted is not None else None, + "pred_dec": _rf(predicted.Dec) if predicted is not None else None, + "cam_ra": _rf(solve_result.camera.RA) if success else None, + "cam_dec": _rf(solve_result.camera.Dec) if success else None, + "cam_roll": _rf(solve_result.camera.Roll) if success else None, + "iq": _serialize_quat(solve_result.imu_anchor) if success else None, + "matches": solve_result.diagnostics.Matches, + "rmse": _rf(solve_result.diagnostics.RMSE) + if solve_result.diagnostics.RMSE is not None + else None, + "lsa": solve_result.last_solve_attempt, + "lss": solve_result.last_solve_success, + "src": "CAM" if success else "CAM_FAILED", + } + self._buffer.append(json.dumps(record) + "\n") + return t + + def record_target(self, target, alt=None, az=None): + """Record a target change event. Pass None when target is cleared.""" + if not self.enabled: + return + if target is None: + target_id = None + else: + target_id = getattr(target, "object_id", None) + + if target_id == self._last_target_id: + return + self._last_target_id = target_id + + if target is None: + record = { + "t": _rf(time.time()), + "e": "tgt", + "name": None, + "ra": None, + "dec": None, + "alt": None, + "az": None, + } + else: + record = { + "t": _rf(time.time()), + "e": "tgt", + "name": getattr(target, "display_name", None), + "ra": _rf(target.ra) if target.ra is not None else None, + "dec": _rf(target.dec) if target.dec is not None else None, + "alt": _rf(alt) if alt is not None else None, + "az": _rf(az) if az is not None else None, + } + self._buffer.append(json.dumps(record) + "\n") + + def get_session_dir(self): + """Return current session directory path, or None.""" + return self._session_dir + + def flush(self): + """Time-gated flush - only actually flushes every 5 seconds.""" + if not self.enabled: + return + now = time.time() + if now - self._last_flush >= 5.0: + self._do_flush() + self._last_flush = now + + def _do_flush(self): + """Flush the buffer to disk.""" + if not self._file or not self._buffer: + return + lines = [] + while self._buffer: + try: + lines.append(self._buffer.popleft()) + except IndexError: + break + if lines: + self._file.writelines(lines) + self._file.flush() + + def _flush_loop(self): + """Background thread that flushes every 5 seconds.""" + while not self._stop_event.is_set(): + self._stop_event.wait(5.0) + self._do_flush() + + +class TelemetryPlayer: + """ + Reads a recorded JSONL session and replays events with original timing. + """ + + def __init__(self, path): + self.path = Path(path) + self.events = [] + self.header = None + self._index = 0 + self._base_time = None + self._replay_start = None + self._load() + + def _load(self): + """Load all events from the JSONL file.""" + file_path = self.path + if file_path.is_dir(): + file_path = file_path / "session.jsonl" + + with open(file_path) as f: + for line in f: + line = line.strip() + if not line: + continue + event = json.loads(line) + if event.get("e") == "hdr": + self.header = event + else: + self.events.append(event) + + if self.events: + self._base_time = self.events[0]["t"] + logger.info("Loaded telemetry: %d events from %s", len(self.events), file_path) + + def reset(self): + """Reset replay to the beginning.""" + self._index = 0 + self._replay_start = None + + def get_next_event(self): + """ + Return the next event if its relative timestamp has elapsed, + otherwise return None. Call this in a loop. + + Returns (event_dict, done_bool). + """ + if self._index >= len(self.events): + return None, True + + if self._replay_start is None: + self._replay_start = time.time() + + event = self.events[self._index] + event_offset = event["t"] - self._base_time + elapsed = time.time() - self._replay_start + + if elapsed >= event_offset: + self._index += 1 + return event, self._index >= len(self.events) + + return None, False + + @property + def progress(self): + """Return replay progress as a fraction 0.0-1.0.""" + if not self.events: + return 1.0 + return self._index / len(self.events) + + @property + def total_events(self): + return len(self.events) + + @property + def current_index(self): + return self._index + + @staticmethod + def event_to_solve_result(event): + """Convert a recorded solve event back into a SolveResult message. + + A recorded ``ra`` of None means the original attempt failed — + rebuild it as a :class:`FailedSolve`; otherwise as a + :class:`SuccessfulSolve`. + """ + t = event["t"] + diagnostics = SolveDiagnostics( + Matches=event.get("matches") or 0, + RMSE=event.get("rmse"), + ) + + if event.get("ra") is None: + return FailedSolve( + diagnostics=diagnostics, + last_solve_attempt=event.get("lsa") or t, + last_solve_success=event.get("lss"), + ) + + aligned = Pointing( + RA=event["ra"], + Dec=event["dec"], + Roll=event.get("roll") or 0.0, + ) + if event.get("cam_ra") is not None: + camera = Pointing( + RA=event["cam_ra"], + Dec=event["cam_dec"], + Roll=event.get("cam_roll") or 0.0, + ) + else: + # Recordings without camera-axis data: fall back to aligned. + camera = aligned + + imu_anchor = None + iq = event.get("iq") + if iq: + imu_anchor = quaternion_module.quaternion(iq[0], iq[1], iq[2], iq[3]) + + return SuccessfulSolve( + camera=camera, + aligned=aligned, + imu_anchor=imu_anchor, + last_solve_attempt=event.get("lsa") or t, + last_solve_success=event.get("lss") or t, + diagnostics=diagnostics, + ) + + @staticmethod + def event_to_imu_sample(event): + """Convert a recorded IMU event into an ImuSample, or None if no quat.""" + q = event.get("q") + if not q: + return None + gyro = event.get("gyro") + accel = event.get("accel") + return ImuSample( + quat=quaternion_module.quaternion(q[0], q[1], q[2], q[3]), + timestamp=event["t"], + status=event.get("st", 0), + moving=event.get("mv", False), + gyro=tuple(gyro) if gyro else None, + accel=tuple(accel) if accel else None, + ) + + +class TelemetryManager: + """ + Facade over TelemetryRecorder and TelemetryPlayer. + + Owns all telemetry I/O: command dispatch, recording, replay state, + image saving, and console/camera queue messaging. The integrator + only needs to call a handful of one-liners and feed replayed + messages through its normal apply/advance paths. + """ + + def __init__(self, cfg, shared_state, console_queue, camera_command_queue=None): + self._cfg = cfg + self._shared_state = shared_state + self._console_queue = console_queue + self._camera_command_queue = camera_command_queue + self._recorder = TelemetryRecorder() + self._recorder.images_enabled = bool(cfg.get_option("telemetry_images")) + self._player = None + if cfg.get_option("telemetry_record"): + self._recorder.start(cfg, shared_state) + + @property + def replaying(self): + return self._player is not None + + def poll_commands(self, command_queue): + """Check for and dispatch any pending telemetry commands.""" + if command_queue is None: + return + try: + cmd = command_queue.get(block=False) + if isinstance(cmd, tuple): + self._handle_command(cmd[0], cmd[1]) + except queue.Empty: + pass + + def _handle_command(self, cmd_name, cmd_arg): + """Dispatch a telemetry command.""" + if cmd_name == "telemetry_record_on": + self._recorder.images_enabled = bool( + self._cfg.get_option("telemetry_images") + ) + self._recorder.start(self._cfg, self._shared_state) + self._console_queue.put("Telemetry: Recording") + + elif cmd_name == "telemetry_record_off": + self._recorder.stop() + self._console_queue.put("Telemetry: Stopped") + + elif cmd_name == "replay": + logger.info("Entering replay mode: %s", cmd_arg) + self._player = TelemetryPlayer(cmd_arg) + if self._player.header: + self._apply_replay_header(self._player.header, self._shared_state) + self._console_queue.put("Telemetry: Replay started") + + elif cmd_name == "replay_stop": + logger.info("Exiting replay mode") + self._player = None + self._restart_camera() + self._console_queue.put("Telemetry: Replay stopped") + + def next_replay_message(self): + """Return the next replayed message, or None. + + Converts the next due recorded event into a + :class:`SuccessfulSolve` / :class:`FailedSolve` / :class:`ImuSample` + for the integrator to feed through its normal paths. Returns None + when no event is due yet. Automatically clears replay state and + restarts the camera when the session is exhausted. + """ + if self._player is None: + return None + event, done = self._player.get_next_event() + if done and event is None: + self._player = None + self._restart_camera() + self._console_queue.put("Telemetry: Replay finished") + logger.info("Replay finished") + return None + if event is None: + return None + + event_type = event.get("e") + if event_type == "imu": + return TelemetryPlayer.event_to_imu_sample(event) + elif event_type == "solve": + return TelemetryPlayer.event_to_solve_result(event) + return None + + def record_solve(self, solve_result, predicted=None): + """Record a solve event and send save_image command if enabled.""" + if self.replaying: + return + t = self._recorder.record_solve(solve_result, predicted) + if ( + t is not None + and self._recorder.images_enabled + and self._camera_command_queue is not None + ): + session_dir = self._recorder.get_session_dir() + if session_dir: + self._camera_command_queue.put( + f"save_image:{session_dir / f'img_{t:.3f}.png'}" + ) + + def record_imu(self, imu): + if self.replaying: + return + self._recorder.record_imu(imu) + + def flush(self): + self._poll_target() + self._recorder.flush() + + def _poll_target(self): + """Check if the user's target changed and record it.""" + if not self._recorder.enabled: + return + try: + target = self._shared_state.ui_state().target() + except Exception: + return + alt, az = None, None + if target is not None and target.ra is not None: + try: + location = self._shared_state.location() + dt = self._shared_state.datetime() + if location and dt: + calc_utils.sf_utils.set_location( + location.lat, location.lon, location.altitude + ) + alt, az = calc_utils.sf_utils.radec_to_altaz( + target.ra, target.dec, dt + ) + except Exception: + pass + self._recorder.record_target(target, alt=alt, az=az) + + def stop(self): + self._recorder.stop() + + def _restart_camera(self): + if self._camera_command_queue is not None: + self._camera_command_queue.put("start") + + def _apply_replay_header(self, hdr, shared_state): + """Apply location/datetime from a replay session header.""" + loc_data = self._load_replay_location() + if loc_data: + loc = shared_state.location() + loc.lat = loc_data["lat"] + loc.lon = loc_data["lon"] + loc.altitude = loc_data["altitude"] + loc.lock = True + loc.source = "replay" + shared_state.set_location(loc) + if hdr.get("dt"): + shared_state.set_datetime(datetime.fromisoformat(hdr["dt"])) + + cfg = hdr.get("cfg", {}) + recorded_mount = cfg.get("mount_type") + current_mount = self._cfg.get_option("mount_type") + if recorded_mount and current_mount and recorded_mount != current_mount: + logger.warning( + "Replay mount_type '%s' differs from current config '%s'", + recorded_mount, + current_mount, + ) + + def _load_replay_location(self): + """Load location from the .location sidecar file, or None.""" + if self._player is None: + return None + loc_file = self._player.path + if loc_file.is_file(): + loc_file = loc_file.parent / "session.location" + else: + loc_file = loc_file / "session.location" + if loc_file.exists(): + try: + return json.loads(loc_file.read_text()) + except (json.JSONDecodeError, OSError): + return None + return None diff --git a/python/PiFinder/types/positioning.py b/python/PiFinder/types/positioning.py index c555af4b..344a33bd 100644 --- a/python/PiFinder/types/positioning.py +++ b/python/PiFinder/types/positioning.py @@ -433,12 +433,20 @@ class ImuSample: process sampled this orientation — the IMU-side input to :attr:`PointingEstimate.estimate_time`. It is the sample epoch, not the (later) moment a consumer reads the sample. + + ``gyro`` / ``accel`` are the raw sensor readings at the same sample, + recorded for telemetry. ``None`` when the sensor doesn't expose them + (e.g. the fake IMU). """ quat: quaternion.quaternion timestamp: float status: int = 0 # 3 == fully calibrated (BNO055) moving: bool = False + # Raw gyroscope angular velocity (rad/s) and linear acceleration + # (m/s², gravity removed) — captured for telemetry recording. + gyro: Optional[Tuple[float, float, float]] = None + accel: Optional[Tuple[float, float, float]] = None def is_calibrated(self) -> bool: return self.status == 3 @@ -451,6 +459,8 @@ def to_dict(self) -> dict: "timestamp": self.timestamp, "status": self.status, "moving": self.moving, + "gyro": list(self.gyro) if self.gyro is not None else None, + "accel": list(self.accel) if self.accel is not None else None, } diff --git a/python/PiFinder/ui/callbacks.py b/python/PiFinder/ui/callbacks.py index 71c5190b..5bedeec6 100644 --- a/python/PiFinder/ui/callbacks.py +++ b/python/PiFinder/ui/callbacks.py @@ -484,6 +484,20 @@ def generate_custom_object_name(ui_module: UIModule) -> str: return f"CUSTOM {max_num + 1}" +def telemetry_record_toggle(ui_module: UIModule) -> None: + """Toggle telemetry recording on/off via integrator command queue.""" + enabled = ui_module.config_object.get_option("telemetry_record") + if "integrator" in ui_module.command_queues: + if enabled: + ui_module.command_queues["integrator"].put(("telemetry_record_on", None)) + ui_module.message("Telemetry\nRecording", 2) + else: + ui_module.command_queues["integrator"].put(("telemetry_record_off", None)) + ui_module.message("Telemetry\nStopped", 2) + else: + ui_module.message("No integrator\nqueue", 2) + + def update_gpsd_baud_rate(ui_module: UIModule) -> None: """ Updates the GPSD configuration with the current baud rate setting. diff --git a/python/PiFinder/ui/menu_structure.py b/python/PiFinder/ui/menu_structure.py index 96b349db..0fc77477 100644 --- a/python/PiFinder/ui/menu_structure.py +++ b/python/PiFinder/ui/menu_structure.py @@ -15,6 +15,7 @@ from PiFinder.ui.location_list import UILocationList from PiFinder.ui.locationentry import UILocationEntry from PiFinder.ui.radec_entry import UIRADecEntry +from PiFinder.ui.telemetry_list import UITelemetryList import PiFinder.ui.callbacks as callbacks @@ -1188,6 +1189,57 @@ def _(key: str) -> Any: }, ], }, + { + "name": _("Dev Tools"), + "class": UITextMenu, + "select": "single", + "items": [ + { + "name": _("Telemetry"), + "class": UITextMenu, + "select": "single", + "items": [ + { + "name": _("Record"), + "class": UITextMenu, + "select": "single", + "config_option": "telemetry_record", + "post_callback": callbacks.telemetry_record_toggle, + "items": [ + { + "name": _("Off"), + "value": False, + }, + { + "name": _("On"), + "value": True, + }, + ], + }, + { + "name": _("Images"), + "class": UITextMenu, + "select": "single", + "config_option": "telemetry_images", + "items": [ + { + "name": _("Off"), + "value": False, + }, + { + "name": _("On"), + "value": True, + }, + ], + }, + { + "name": _("Load"), + "class": UITelemetryList, + }, + ], + }, + ], + }, ], }, { diff --git a/python/PiFinder/ui/telemetry_list.py b/python/PiFinder/ui/telemetry_list.py new file mode 100644 index 00000000..b5aefcd2 --- /dev/null +++ b/python/PiFinder/ui/telemetry_list.py @@ -0,0 +1,104 @@ +""" +UI screen for listing and loading telemetry recording sessions. + +Lists .jsonl files from ~/PiFinder_data/telemetry/ with filename and size. +Selecting a file triggers replay via the integrator command queue. +""" + +import logging + +from PiFinder.ui.text_menu import UITextMenu +from PiFinder.telemetry import TELEMETRY_DIR + +from typing import Any, TYPE_CHECKING + +if TYPE_CHECKING: + + def _(a) -> Any: + return a + + +logger = logging.getLogger("UI.TelemetryList") + + +class UITelemetryList(UITextMenu): + """File picker for telemetry sessions.""" + + __title__ = "Telemetry" + + def __init__(self, *args, **kwargs): + self._sessions = self._scan_sessions() + kwargs["item_definition"] = self._create_menu_definition() + super().__init__(*args, **kwargs) + + def _scan_sessions(self): + """Scan telemetry directory for session files.""" + sessions = [] + if not TELEMETRY_DIR.exists(): + return sessions + + # Look for session dirs (contain session.jsonl) and standalone .jsonl files + for entry in sorted(TELEMETRY_DIR.iterdir(), reverse=True): + jsonl_path = None + if entry.is_dir(): + candidate = entry / "session.jsonl" + if candidate.exists(): + jsonl_path = candidate + elif entry.suffix == ".jsonl": + jsonl_path = entry + + if jsonl_path: + size_kb = jsonl_path.stat().st_size / 1024 + label = entry.name + if size_kb >= 1024: + size_str = f"{size_kb / 1024:.1f}MB" + else: + size_str = f"{size_kb:.0f}KB" + sessions.append( + { + "label": label, + "size_str": size_str, + "path": str(entry), + } + ) + return sessions + + def _create_menu_definition(self): + items = [] + for s in self._sessions: + items.append( + { + "name": f"{s['label']} ({s['size_str']})", + "value": s["path"], + } + ) + if not items: + items.append({"name": "No sessions found", "value": None}) + return {"name": "Telemetry", "select": "single", "items": items} + + def key_right(self): + """Select a session to replay.""" + if not self._sessions: + self.message("No sessions", 2) + return False + + idx = self._current_item_index + items = self.item_definition["items"] + if idx >= len(items): + return False + + session_path = items[idx].get("value") + if session_path is None: + return False + + # Send replay command to integrator + if "integrator" in self.command_queues: + self.command_queues["camera"].put("stop") + self.command_queues["integrator"].put(("replay", session_path)) + self.message("Replay\nstarted", 2) + logger.info("Starting telemetry replay: %s", session_path) + else: + self.message("No integrator\nqueue", 2) + logger.warning("Integrator command queue not available") + + return True diff --git a/python/tests/test_integrator_drift.py b/python/tests/test_integrator_drift.py new file mode 100644 index 00000000..47db75e1 --- /dev/null +++ b/python/tests/test_integrator_drift.py @@ -0,0 +1,419 @@ +""" +Integration tests for IMU dead-reckoning drift through the real integrator +apply/advance functions. Replays synthetic telemetry through ImuDeadReckoning +and measures pointing error vs ground truth. + +Catches regressions in: +- Quaternion math (drift explodes) +- Solve incorporation (positions don't update) +- Dead-reckoning (IMU movements not reflected) +- RaDecRoll / quaternion transform correctness +""" + +from dataclasses import dataclass +from typing import List, Union + +import numpy as np +import pytest +import quaternion + +from PiFinder.integrator import _advance_with_imu, _apply_successful_solve +from PiFinder.pointing_model.imu_dead_reckoning import ImuDeadReckoning +from PiFinder.pointing_model.quaternion_transforms import axis_angle2quat, radec2q_eq +from PiFinder.types.positioning import ( + ImuSample, + Pointing, + PointingEstimate, + SuccessfulSolve, +) + + +# ── Synthetic telemetry generation ────────────────────────────────── + + +@dataclass +class SolveEvent: + """A plate-solve event with true RA/Dec and IMU quaternion.""" + + timestamp: float + ra_deg: float + dec_deg: float + roll_deg: float + imu_quat: quaternion.quaternion + + +@dataclass +class ImuEvent: + """An IMU-only reading between solves, with ground truth for error measurement.""" + + timestamp: float + imu_quat: quaternion.quaternion + moving: bool + true_ra_deg: float + true_dec_deg: float + + +@dataclass +class Measurement: + """Dead-reckoned vs ground-truth angular error.""" + + error_arcsec: float + timestamp: float + + +def angular_separation_deg(ra1, dec1, ra2, dec2): + """Great-circle angular separation in degrees between two (RA, Dec) pairs in degrees.""" + ra1_r, dec1_r = np.deg2rad(ra1), np.deg2rad(dec1) + ra2_r, dec2_r = np.deg2rad(ra2), np.deg2rad(dec2) + cos_sep = np.sin(dec1_r) * np.sin(dec2_r) + np.cos(dec1_r) * np.cos( + dec2_r + ) * np.cos(ra1_r - ra2_r) + cos_sep = np.clip(cos_sep, -1.0, 1.0) + return np.rad2deg(np.arccos(cos_sep)) + + +def _make_imu_quat_for_radec( + imu_dr: ImuDeadReckoning, ra_rad: float, dec_rad: float, roll_rad: float +) -> quaternion.quaternion: + """ + Compute the IMU quaternion q_x2imu consistent with a given (RA, Dec, Roll), + assuming q_eq2x is identity. + + From: q_eq2cam = q_eq2x * q_x2imu * q_imu2cam + With q_eq2x = I: q_x2imu = q_eq2cam * q_imu2cam.conj() + """ + q_eq2cam = radec2q_eq(ra_rad, dec_rad, roll_rad) + q_x2imu = q_eq2cam * imu_dr.q_imu2cam.conj() + return q_x2imu.normalized() + + +def generate_stationary_session( + seed: int = 42, + duration_s: float = 10.0, + solve_interval_s: float = 2.0, + imu_rate_hz: float = 10.0, + noise_arcsec: float = 1.0, +) -> list: + """ + Generate a stationary session: scope doesn't move, IMU has small noise. + Returns list of SolveEvent and ImuEvent in chronological order. + """ + rng = np.random.default_rng(seed) + ra_deg, dec_deg, roll_deg = 180.0, 45.0, 0.0 + ra_rad, dec_rad = np.deg2rad(ra_deg), np.deg2rad(dec_deg) + roll_rad = np.deg2rad(0.0) + + tmp_dr = ImuDeadReckoning("flat") + base_quat = _make_imu_quat_for_radec(tmp_dr, ra_rad, dec_rad, roll_rad) + + events: List[Union[SolveEvent, ImuEvent]] = [] + t = 0.0 + imu_dt = 1.0 / imu_rate_hz + next_solve = 0.0 + + while t <= duration_s: + noise_rad = np.deg2rad(noise_arcsec / 3600.0) + axis = rng.normal(size=3) + axis /= np.linalg.norm(axis) + angle = rng.normal(scale=noise_rad) + q_noise = axis_angle2quat(axis, angle) + noisy_quat = (base_quat * q_noise).normalized() + + if t >= next_solve: + events.append( + SolveEvent( + timestamp=t, + ra_deg=ra_deg, + dec_deg=dec_deg, + roll_deg=roll_deg, + imu_quat=noisy_quat, + ) + ) + next_solve = t + solve_interval_s + else: + events.append( + ImuEvent( + timestamp=t, + imu_quat=noisy_quat, + moving=False, + true_ra_deg=ra_deg, + true_dec_deg=dec_deg, + ) + ) + + t += imu_dt + + return events + + +def generate_slew_session( + seed: int = 123, + duration_s: float = 30.0, + solve_interval_s: float = 3.0, + imu_rate_hz: float = 10.0, + slew_speed_deg_s: float = 2.0, + drift_arcsec_s: float = 5.0, +) -> list: + """ + Generate a slewing session: scope moves in RA at constant rate, + IMU has a slow systematic drift added on top. + """ + rng = np.random.default_rng(seed) + start_ra_deg, dec_deg, roll_deg = 90.0, 30.0, 0.0 + + tmp_dr = ImuDeadReckoning("flat") + events: List[Union[SolveEvent, ImuEvent]] = [] + t = 0.0 + imu_dt = 1.0 / imu_rate_hz + next_solve = 0.0 + last_solve_time = 0.0 + + drift_axis = rng.normal(size=3) + drift_axis /= np.linalg.norm(drift_axis) + drift_rate_rad_s = np.deg2rad(drift_arcsec_s / 3600.0) + + while t <= duration_s: + true_ra_deg = start_ra_deg + slew_speed_deg_s * t + true_ra_rad = np.deg2rad(true_ra_deg) + dec_rad = np.deg2rad(dec_deg) + roll_rad = np.deg2rad(roll_deg) + + base_quat = _make_imu_quat_for_radec(tmp_dr, true_ra_rad, dec_rad, roll_rad) + + time_since_solve = t - last_solve_time + drift_angle = drift_rate_rad_s * time_since_solve + q_drift = axis_angle2quat(drift_axis, drift_angle) + + noise_rad = np.deg2rad(1.0 / 3600.0) + noise_axis = rng.normal(size=3) + noise_axis /= np.linalg.norm(noise_axis) + q_noise = axis_angle2quat(noise_axis, rng.normal(scale=noise_rad)) + + drifted_quat = (base_quat * q_drift * q_noise).normalized() + + if t >= next_solve: + events.append( + SolveEvent( + timestamp=t, + ra_deg=true_ra_deg, + dec_deg=dec_deg, + roll_deg=roll_deg, + imu_quat=drifted_quat, + ) + ) + last_solve_time = t + next_solve = t + solve_interval_s + else: + events.append( + ImuEvent( + timestamp=t, + imu_quat=drifted_quat, + moving=True, + true_ra_deg=true_ra_deg, + true_dec_deg=dec_deg, + ) + ) + + t += imu_dt + + return events + + +# ── Replay engine ─────────────────────────────────────────────────── + + +def _solve_event_to_message(event: SolveEvent) -> SuccessfulSolve: + """Build the solver→integrator message for a synthetic solve event, + mirroring the real solver (camera == aligned: no alignment offset).""" + pointing = Pointing(RA=event.ra_deg, Dec=event.dec_deg, Roll=event.roll_deg) + return SuccessfulSolve( + camera=pointing, + aligned=pointing, + imu_anchor=event.imu_quat, + last_solve_attempt=event.timestamp, + last_solve_success=event.timestamp, + ) + + +def _imu_event_to_sample(event: ImuEvent) -> ImuSample: + """Build the shared-state IMU sample for a synthetic IMU event.""" + return ImuSample( + quat=event.imu_quat, + timestamp=event.timestamp, + status=3, + moving=event.moving, + ) + + +def _aligned_estimate_error_arcsec( + estimate: PointingEstimate, event: ImuEvent +) -> float: + """Angular error of the published aligned estimate vs ground truth.""" + aligned = estimate.pointing.aligned.estimate + assert aligned is not None, "callers only measure populated estimates" + error_deg = angular_separation_deg( + aligned.RA, + aligned.Dec, + event.true_ra_deg, + event.true_dec_deg, + ) + return error_deg * 3600.0 + + +def replay_imu_drift(events: list) -> List[Measurement]: + """ + Replay telemetry and measure dead-reckoning error at each IMU update + by comparing the integrator's published estimate to ground truth. + """ + idr = ImuDeadReckoning("flat") + estimate = PointingEstimate() + measurements: List[Measurement] = [] + + for event in events: + if isinstance(event, SolveEvent): + estimate = _apply_successful_solve( + estimate, _solve_event_to_message(event), idr + ) + + elif isinstance(event, ImuEvent): + if not idr.is_initialized() or estimate.imu_anchor is None: + continue + + # May not advance (below deadband); the estimate then stays at + # the last applied value, which is still the published answer. + _advance_with_imu(estimate, idr, _imu_event_to_sample(event)) + + if estimate.pointing.aligned.estimate is not None: + measurements.append( + Measurement( + error_arcsec=_aligned_estimate_error_arcsec(estimate, event), + timestamp=event.timestamp, + ) + ) + + return measurements + + +def replay_post_solve_errors(events: list, max_readings: int = 3) -> List[Measurement]: + """ + Measure dead-reckoning error of the first few IMU readings after each solve + vs ground truth. Verifies that solve incorporation resets drift. + """ + idr = ImuDeadReckoning("flat") + estimate = PointingEstimate() + measurements: List[Measurement] = [] + readings_since_solve = max_readings + + for event in events: + if isinstance(event, SolveEvent): + estimate = _apply_successful_solve( + estimate, _solve_event_to_message(event), idr + ) + readings_since_solve = 0 + + elif isinstance(event, ImuEvent): + if not idr.is_initialized() or estimate.imu_anchor is None: + continue + + _advance_with_imu(estimate, idr, _imu_event_to_sample(event)) + + if ( + readings_since_solve < max_readings + and estimate.pointing.aligned.estimate is not None + ): + measurements.append( + Measurement( + error_arcsec=_aligned_estimate_error_arcsec(estimate, event), + timestamp=event.timestamp, + ) + ) + readings_since_solve += 1 + + return measurements + + +# ── Tests ─────────────────────────────────────────────────────────── + + +@pytest.mark.integration +class TestIntegratorDrift: + """ + Integration tests that replay synthetic telemetry through the real + ImuDeadReckoning and integrator apply/advance functions, measuring + drift vs ground truth. + """ + + def test_stationary_drift(self): + """ + Scope is stationary, IMU has only tiny noise. + Dead-reckoned position should stay very close to truth. + """ + events = generate_stationary_session(seed=42, duration_s=10.0) + measurements = replay_imu_drift(events) + + assert len(measurements) > 0, "Should have at least one measurement" + errors = [m.error_arcsec for m in measurements] + mean_error = np.mean(errors) + max_error = np.max(errors) + + # Stationary scope with 1 arcsec noise: drift should be tiny + # Baseline: ~0 arcsec (noise below measurement precision) + assert ( + mean_error < 5 + ), f"Stationary mean drift {mean_error:.1f} arcsec exceeds 5 arcsec threshold" + assert ( + max_error < 10 + ), f"Stationary max drift {max_error:.1f} arcsec exceeds 10 arcsec threshold" + + def test_slew_tracking_accuracy(self): + """ + Scope slewing at 2 deg/s with 5 arcsec/s simulated IMU drift. + Dead-reckoning should track the true position with bounded error. + Error should not grow without bound over the session. + """ + events = generate_slew_session(seed=123, duration_s=30.0) + measurements = replay_imu_drift(events) + + assert len(measurements) > 0, "Should have at least one measurement" + errors = [m.error_arcsec for m in measurements] + mean_error = np.mean(errors) + + # With 5 arcsec/s drift and 3s solve intervals, accumulated drift + # between solves is bounded. Baseline: mean ~6, max ~13 arcsec. + assert ( + mean_error < 15 + ), f"Slew mean drift {mean_error:.1f} arcsec exceeds 15 arcsec threshold" + + # Verify errors don't grow without bound across the session + if len(errors) >= 20: + first_quarter = np.mean(errors[: len(errors) // 4]) + last_quarter = np.mean(errors[-len(errors) // 4 :]) + assert last_quarter < first_quarter * 3, ( + f"Drift growing over time: first quarter {first_quarter:.1f}, " + f"last quarter {last_quarter:.1f} arcsec" + ) + + def test_solve_correction_resets_drift(self): + """ + After each solve, the first few IMU dead-reckoned positions should + have near-zero error vs ground truth (solve corrects drift). + """ + events = generate_slew_session( + seed=456, + duration_s=20.0, + solve_interval_s=4.0, + slew_speed_deg_s=1.0, + drift_arcsec_s=2.0, + ) + measurements = replay_post_solve_errors(events, max_readings=3) + + assert len(measurements) > 0, "Should have post-solve measurements" + errors = [m.error_arcsec for m in measurements] + mean_error = np.mean(errors) + + # Right after a solve, error should be just noise + tiny drift. + # Baseline: mean ~7, max ~11 arcsec. + assert mean_error < 20, ( + f"Post-solve mean error {mean_error:.1f} arcsec exceeds 20 arcsec threshold. " + "Solve correction may not be resetting drift properly." + ) diff --git a/python/tests/test_telemetry.py b/python/tests/test_telemetry.py new file mode 100644 index 00000000..fb299332 --- /dev/null +++ b/python/tests/test_telemetry.py @@ -0,0 +1,996 @@ +""" +Unit tests for telemetry recording, replay, and the TelemetryManager facade. +""" + +import json +import queue +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +import quaternion as quaternion_module + +from PiFinder.telemetry import ( + TelemetryManager, + TelemetryPlayer, + TelemetryRecorder, + _serialize_quat, +) +from PiFinder.types.positioning import ( + FailedSolve, + ImuSample, + Pointing, + SolveDiagnostics, + SuccessfulSolve, +) + + +# ── Helpers ────────────────────────────────────────────────────────── + + +def _make_quat(w=1.0, x=0.0, y=0.0, z=0.0): + return quaternion_module.quaternion(w, x, y, z) + + +def _make_imu_sample( + quat=None, timestamp=1000.0, status=3, moving=False, gyro=None, accel=None +): + return ImuSample( + quat=quat or _make_quat(), + timestamp=timestamp, + status=status, + moving=moving, + gyro=gyro, + accel=accel, + ) + + +def _make_successful_solve( + ra=180.0, + dec=45.0, + roll=10.0, + cam_ra=180.1, + cam_dec=44.9, + cam_roll=10.0, + imu_anchor=None, + last_solve_attempt=1000.4, + last_solve_success=1000.5, + matches=15, + rmse=0.5, +): + return SuccessfulSolve( + camera=Pointing(RA=cam_ra, Dec=cam_dec, Roll=cam_roll), + aligned=Pointing(RA=ra, Dec=dec, Roll=roll), + imu_anchor=imu_anchor, + last_solve_attempt=last_solve_attempt, + last_solve_success=last_solve_success, + diagnostics=SolveDiagnostics(Matches=matches, RMSE=rmse), + ) + + +def _make_failed_solve(last_solve_attempt=1000.4, last_solve_success=None): + return FailedSolve( + diagnostics=SolveDiagnostics(Matches=0), + last_solve_attempt=last_solve_attempt, + last_solve_success=last_solve_success, + ) + + +def _make_location(lat=40.0, lon=-74.0, alt=100.0): + loc = MagicMock() + loc.lat = lat + loc.lon = lon + loc.altitude = alt + loc.lock = False + loc.source = "gps" + return loc + + +def _make_shared_state(location=None, dt=None): + ss = MagicMock() + ss.location.return_value = location or _make_location() + ss.datetime.return_value = dt + return ss + + +def _make_cfg( + telemetry_record=False, + telemetry_images=False, + screen_direction="flat", + mount_type="Alt/Az", +): + cfg = MagicMock() + + def get_option(key): + return { + "telemetry_record": telemetry_record, + "telemetry_images": telemetry_images, + "screen_direction": screen_direction, + "mount_type": mount_type, + }.get(key) + + cfg.get_option = get_option + return cfg + + +def _write_session_jsonl(path, events, header=None): + """Write a list of event dicts to a JSONL file.""" + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "w") as f: + if header: + f.write(json.dumps(header) + "\n") + for ev in events: + f.write(json.dumps(ev) + "\n") + + +# ── _serialize_quat ────────────────────────────────────────────────── + + +@pytest.mark.unit +class TestSerializeQuat: + def test_none(self): + assert _serialize_quat(None) is None + + def test_valid(self): + q = _make_quat(1.0, 2.0, 3.0, 4.0) + result = _serialize_quat(q) + assert result == [1.0, 2.0, 3.0, 4.0] + + def test_non_quaternion(self): + assert _serialize_quat("not a quat") is None + + def test_identity(self): + q = _make_quat(1.0, 0.0, 0.0, 0.0) + assert _serialize_quat(q) == [1.0, 0.0, 0.0, 0.0] + + +# ── TelemetryRecorder ─────────────────────────────────────────────── + + +@pytest.mark.unit +class TestTelemetryRecorder: + def test_disabled_by_default(self): + rec = TelemetryRecorder() + assert not rec.enabled + + def test_record_imu_noop_when_disabled(self): + rec = TelemetryRecorder() + rec.record_imu(_make_imu_sample()) + assert len(rec._buffer) == 0 + + def test_record_solve_noop_when_disabled(self): + rec = TelemetryRecorder() + result = rec.record_solve(_make_successful_solve()) + assert result is None + assert len(rec._buffer) == 0 + + def test_record_solve_noop_for_none(self): + rec = TelemetryRecorder() + rec.enabled = True + result = rec.record_solve(None) + assert result is None + + def test_start_creates_session(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + cfg = _make_cfg(screen_direction="flat") + ss = _make_shared_state() + rec.start(cfg, ss) + try: + assert rec.enabled + assert rec._session_dir is not None + assert rec._session_dir.exists() + assert len(rec._buffer) == 1 # header + finally: + rec.stop() + + def test_stop_flushes_and_closes(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + rec.start(_make_cfg(), _make_shared_state()) + session_dir = rec._session_dir + session_file = session_dir / "session.jsonl" + rec.stop() + assert not rec.enabled + assert rec._file is None + assert rec._session_dir is None + content = session_file.read_text() + assert '"e": "hdr"' in content + # Location should be in separate file, not in header + assert '"loc"' not in content + loc_file = session_dir / "session.location" + assert loc_file.exists() + loc_data = json.loads(loc_file.read_text()) + assert loc_data["lat"] == 40.0 + + def test_record_imu_when_enabled(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + rec.start(_make_cfg(), _make_shared_state()) + try: + rec.record_imu( + _make_imu_sample( + quat=_make_quat(1, 0, 0, 0), + moving=True, + status=3, + gyro=(0.01, -0.02, 0.03), + accel=(0.1, 0.2, -0.3), + ) + ) + assert len(rec._buffer) == 2 # header + imu + line = json.loads(rec._buffer[-1]) + assert line["e"] == "imu" + assert line["q"] == [1.0, 0.0, 0.0, 0.0] + assert line["mv"] is True + assert line["gyro"] == [0.01, -0.02, 0.03] + assert line["accel"] == [0.1, 0.2, -0.3] + finally: + rec.stop() + + def test_record_imu_stationary_decimation(self, tmp_path): + """Stationary samples are decimated; moving samples are not.""" + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + rec.start(_make_cfg(), _make_shared_state()) + try: + for _ in range(10): + rec.record_imu(_make_imu_sample(moving=False)) + # Only every 10th stationary sample is recorded + assert len(rec._buffer) == 2 # header + 1 imu + for _ in range(3): + rec.record_imu(_make_imu_sample(moving=True)) + assert len(rec._buffer) == 5 # + 3 moving samples + finally: + rec.stop() + + def test_record_solve_when_enabled(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + rec.start(_make_cfg(), _make_shared_state()) + try: + t = rec.record_solve( + _make_successful_solve( + ra=180.0, + dec=45.0, + roll=10.0, + cam_ra=180.1, + cam_dec=44.9, + cam_roll=10.0, + imu_anchor=_make_quat(1, 0, 0, 0), + last_solve_attempt=1000.4, + last_solve_success=1000.5, + ), + predicted=Pointing(RA=179.5, Dec=44.8, Roll=0.0), + ) + assert t is not None + assert len(rec._buffer) == 2 # header + solve + line = json.loads(rec._buffer[-1]) + assert line["e"] == "solve" + assert line["ra"] == 180.0 + assert line["pred_ra"] == 179.5 + assert line["cam_ra"] == 180.1 + assert line["iq"] == [1.0, 0.0, 0.0, 0.0] + assert line["lsa"] == 1000.4 + assert line["lss"] == 1000.5 + assert line["src"] == "CAM" + finally: + rec.stop() + + def test_record_failed_solve(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + rec.start(_make_cfg(), _make_shared_state()) + try: + t = rec.record_solve(_make_failed_solve(last_solve_attempt=1000.4)) + assert t is not None + line = json.loads(rec._buffer[-1]) + assert line["e"] == "solve" + assert line["ra"] is None + assert line["cam_ra"] is None + assert line["iq"] is None + assert line["matches"] == 0 + assert line["lsa"] == 1000.4 + assert line["src"] == "CAM_FAILED" + finally: + rec.stop() + + def test_flush_time_gated(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + rec.start(_make_cfg(), _make_shared_state()) + try: + rec._last_flush = time.time() + rec.flush() + # Buffer should NOT be flushed (< 5s elapsed) + assert len(rec._buffer) == 1 # header still in buffer + finally: + rec.stop() + + def test_do_flush_writes_to_file(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + rec.start(_make_cfg(), _make_shared_state()) + try: + rec.record_imu(_make_imu_sample(moving=True)) + rec._do_flush() + assert len(rec._buffer) == 0 + content = (rec._session_dir / "session.jsonl").read_text() + lines = [line for line in content.strip().split("\n") if line] + assert len(lines) == 2 # header + imu + finally: + rec.stop() + + def test_get_session_dir(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + assert rec.get_session_dir() is None + rec.start(_make_cfg(), _make_shared_state()) + try: + assert rec.get_session_dir() is not None + finally: + rec.stop() + assert rec.get_session_dir() is None + + def test_stop_idempotent(self): + rec = TelemetryRecorder() + rec.stop() # no-op + rec.stop() # still no-op + + def test_record_target(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + rec.start(_make_cfg(), _make_shared_state()) + try: + target = MagicMock() + target.object_id = 42 + target.display_name = "NGC 7331" + target.ra = 339.267 + target.dec = 34.416 + rec.record_target(target) + assert len(rec._buffer) == 2 # header + target + line = json.loads(rec._buffer[-1]) + assert line["e"] == "tgt" + assert line["name"] == "NGC 7331" + assert line["ra"] == 339.267 + assert line["dec"] == 34.416 + assert "alt" in line + assert "az" in line + finally: + rec.stop() + + def test_record_target_dedup(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + rec.start(_make_cfg(), _make_shared_state()) + try: + target = MagicMock() + target.object_id = 42 + target.display_name = "NGC 7331" + target.ra = 339.267 + target.dec = 34.416 + rec.record_target(target) + rec.record_target(target) # same target, should be deduped + assert len(rec._buffer) == 2 # header + one target + finally: + rec.stop() + + def test_record_target_change(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + rec.start(_make_cfg(), _make_shared_state()) + try: + t1 = MagicMock() + t1.object_id = 42 + t1.display_name = "NGC 7331" + t1.ra = 339.267 + t1.dec = 34.416 + t2 = MagicMock() + t2.object_id = 99 + t2.display_name = "M 31" + t2.ra = 10.684 + t2.dec = 41.269 + rec.record_target(t1) + rec.record_target(t2) + assert len(rec._buffer) == 3 # header + 2 targets + finally: + rec.stop() + + def test_record_target_cleared(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + rec.start(_make_cfg(), _make_shared_state()) + try: + target = MagicMock() + target.object_id = 42 + target.display_name = "NGC 7331" + target.ra = 339.267 + target.dec = 34.416 + rec.record_target(target) + rec.record_target(None) + assert len(rec._buffer) == 3 # header + target + clear + line = json.loads(rec._buffer[-1]) + assert line["e"] == "tgt" + assert line["name"] is None + assert line["ra"] is None + assert line["alt"] is None + assert line["az"] is None + finally: + rec.stop() + + +# ── TelemetryPlayer ───────────────────────────────────────────────── + + +@pytest.mark.unit +class TestTelemetryPlayer: + def test_load_from_file(self, tmp_path): + events = [ + {"t": 1000.0, "e": "imu", "q": [1, 0, 0, 0], "mv": False}, + {"t": 1001.0, "e": "solve", "ra": 180.0, "dec": 45.0}, + ] + hdr = {"t": 999.0, "e": "hdr", "loc": [40.0, -74.0, 100.0]} + session_file = tmp_path / "session.jsonl" + _write_session_jsonl(session_file, events, header=hdr) + + player = TelemetryPlayer(session_file) + assert player.header is not None + assert player.header["e"] == "hdr" + assert len(player.events) == 2 + assert player.total_events == 2 + + def test_load_from_directory(self, tmp_path): + events = [{"t": 1000.0, "e": "imu", "q": [1, 0, 0, 0]}] + _write_session_jsonl(tmp_path / "session.jsonl", events) + player = TelemetryPlayer(tmp_path) + assert len(player.events) == 1 + + def test_progress(self, tmp_path): + events = [ + {"t": 1000.0, "e": "imu", "q": [1, 0, 0, 0]}, + {"t": 1001.0, "e": "imu", "q": [1, 0, 0, 0]}, + ] + _write_session_jsonl(tmp_path / "session.jsonl", events) + player = TelemetryPlayer(tmp_path) + assert player.progress == 0.0 + assert player.current_index == 0 + + def test_progress_empty(self, tmp_path): + _write_session_jsonl(tmp_path / "session.jsonl", []) + player = TelemetryPlayer(tmp_path) + assert player.progress == 1.0 + + def test_get_next_event_timing(self, tmp_path): + now = time.time() + events = [ + {"t": now, "e": "imu", "q": [1, 0, 0, 0]}, + {"t": now + 100.0, "e": "imu", "q": [1, 0, 0, 0]}, + ] + _write_session_jsonl(tmp_path / "session.jsonl", events) + player = TelemetryPlayer(tmp_path) + + event, done = player.get_next_event() + assert event is not None + assert event["e"] == "imu" + assert not done + + # Second event is 100s in the future, should not be ready + event2, done2 = player.get_next_event() + assert event2 is None + assert not done2 + + def test_get_next_event_done(self, tmp_path): + now = time.time() + events = [{"t": now, "e": "imu", "q": [1, 0, 0, 0]}] + _write_session_jsonl(tmp_path / "session.jsonl", events) + player = TelemetryPlayer(tmp_path) + + event, done = player.get_next_event() + assert event is not None + assert done # last event + + event2, done2 = player.get_next_event() + assert event2 is None + assert done2 + + def test_reset(self, tmp_path): + now = time.time() + events = [{"t": now, "e": "imu", "q": [1, 0, 0, 0]}] + _write_session_jsonl(tmp_path / "session.jsonl", events) + player = TelemetryPlayer(tmp_path) + + player.get_next_event() + assert player.current_index == 1 + player.reset() + assert player.current_index == 0 + assert player._replay_start is None + + def test_event_to_solve_result_success(self): + event = { + "t": 1000.5, + "ra": 180.0, + "dec": 45.0, + "roll": 10.0, + "cam_ra": 180.1, + "cam_dec": 44.9, + "cam_roll": 10.0, + "matches": 15, + "rmse": 0.5, + "iq": [1.0, 0.0, 0.0, 0.0], + "lsa": 1000.4, + "lss": 1000.5, + "src": "CAM", + } + result = TelemetryPlayer.event_to_solve_result(event) + assert isinstance(result, SuccessfulSolve) + assert result.aligned.RA == 180.0 + assert result.aligned.Dec == 45.0 + assert result.aligned.Roll == 10.0 + assert result.camera.RA == 180.1 + assert result.diagnostics.Matches == 15 + assert result.diagnostics.RMSE == 0.5 + assert result.last_solve_attempt == 1000.4 + assert result.last_solve_success == 1000.5 + assert isinstance(result.imu_anchor, quaternion_module.quaternion) + assert result.imu_anchor.w == 1.0 + + def test_event_to_solve_result_no_imu_quat(self): + event = {"t": 1000.0, "ra": 180.0, "dec": 45.0} + result = TelemetryPlayer.event_to_solve_result(event) + assert isinstance(result, SuccessfulSolve) + assert result.aligned.RA == 180.0 + assert result.imu_anchor is None + # Missing lsa/lss fall back to the event timestamp + assert result.last_solve_attempt == 1000.0 + assert result.last_solve_success == 1000.0 + + def test_event_to_solve_result_no_camera_falls_back_to_aligned(self): + event = {"t": 1000.0, "ra": 180.0, "dec": 45.0, "roll": 10.0} + result = TelemetryPlayer.event_to_solve_result(event) + assert isinstance(result, SuccessfulSolve) + assert result.camera.RA == 180.0 + assert result.camera.Dec == 45.0 + + def test_event_to_solve_result_failed(self): + event = { + "t": 1000.0, + "ra": None, + "dec": None, + "matches": 0, + "lsa": 1000.0, + } + result = TelemetryPlayer.event_to_solve_result(event) + assert isinstance(result, FailedSolve) + assert result.diagnostics.Matches == 0 + assert result.last_solve_attempt == 1000.0 + assert result.last_solve_success is None + + def test_event_to_imu_sample(self): + event = {"t": 1000.0, "q": [1.0, 0.0, 0.0, 0.0], "mv": True, "st": 3} + result = TelemetryPlayer.event_to_imu_sample(event) + assert result is not None + assert isinstance(result, ImuSample) + assert isinstance(result.quat, quaternion_module.quaternion) + assert result.timestamp == 1000.0 + assert result.moving is True + assert result.status == 3 + + def test_event_to_imu_sample_no_quat(self): + event = {"t": 1000.0, "mv": False} + assert TelemetryPlayer.event_to_imu_sample(event) is None + + def test_event_to_imu_sample_defaults(self): + event = {"t": 1000.0, "q": [1.0, 0.0, 0.0, 0.0]} + result = TelemetryPlayer.event_to_imu_sample(event) + assert result.moving is False + assert result.status == 0 + assert result.gyro is None + assert result.accel is None + + def test_event_to_imu_sample_with_raw_sensors(self): + event = { + "t": 1000.0, + "q": [1.0, 0.0, 0.0, 0.0], + "gyro": [0.01, -0.02, 0.03], + "accel": [0.1, 0.2, -0.3], + } + result = TelemetryPlayer.event_to_imu_sample(event) + assert result.gyro == (0.01, -0.02, 0.03) + assert result.accel == (0.1, 0.2, -0.3) + + +# ── TelemetryManager ──────────────────────────────────────────────── + + +@pytest.mark.unit +class TestTelemetryManager: + def test_init_no_auto_record(self): + cfg = _make_cfg(telemetry_record=False) + ss = _make_shared_state() + cq = queue.Queue() + mgr = TelemetryManager(cfg, ss, cq) + assert not mgr.replaying + assert not mgr._recorder.enabled + + def test_init_auto_record(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + cfg = _make_cfg(telemetry_record=True) + ss = _make_shared_state() + cq = queue.Queue() + mgr = TelemetryManager(cfg, ss, cq) + try: + assert mgr._recorder.enabled + finally: + mgr.stop() + + def test_poll_commands_none_queue(self): + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), queue.Queue()) + mgr.poll_commands(None) # no-op + + def test_poll_commands_empty_queue(self): + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), queue.Queue()) + cmd_q = queue.Queue() + mgr.poll_commands(cmd_q) # no-op + + def test_handle_command_record_on(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + cq = queue.Queue() + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), cq) + mgr._handle_command("telemetry_record_on", None) + try: + assert mgr._recorder.enabled + assert cq.get_nowait() == "Telemetry: Recording" + finally: + mgr.stop() + + def test_handle_command_record_off(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + cq = queue.Queue() + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), cq) + mgr._handle_command("telemetry_record_on", None) + cq.get_nowait() # drain "Recording" msg + mgr._handle_command("telemetry_record_off", None) + assert not mgr._recorder.enabled + assert cq.get_nowait() == "Telemetry: Stopped" + + def test_handle_command_replay(self, tmp_path): + events = [{"t": 1000.0, "e": "imu", "q": [1, 0, 0, 0]}] + _write_session_jsonl(tmp_path / "session.jsonl", events) + + cq = queue.Queue() + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), cq) + mgr._handle_command("replay", str(tmp_path)) + assert mgr.replaying + assert cq.get_nowait() == "Telemetry: Replay started" + + def test_handle_command_replay_with_header(self, tmp_path): + hdr = {"t": 999.0, "e": "hdr", "dt": "2024-01-15T22:30:00"} + events = [{"t": 1000.0, "e": "imu", "q": [1, 0, 0, 0]}] + _write_session_jsonl(tmp_path / "session.jsonl", events, header=hdr) + loc_file = tmp_path / "session.location" + loc_file.write_text(json.dumps({"lat": 35.0, "lon": -120.0, "altitude": 200.0})) + + ss = _make_shared_state() + cq = queue.Queue() + mgr = TelemetryManager(_make_cfg(), ss, cq) + mgr._handle_command("replay", str(tmp_path)) + + ss.set_location.assert_called_once() + loc_arg = ss.set_location.call_args[0][0] + assert loc_arg.lat == 35.0 + assert loc_arg.source == "replay" + ss.set_datetime.assert_called_once() + + def test_handle_command_replay_stop(self, tmp_path): + events = [{"t": 1000.0, "e": "imu", "q": [1, 0, 0, 0]}] + _write_session_jsonl(tmp_path / "session.jsonl", events) + + cq = queue.Queue() + cam_q = queue.Queue() + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), cq, cam_q) + mgr._handle_command("replay", str(tmp_path)) + cq.get_nowait() # drain "Replay started" + + mgr._handle_command("replay_stop", None) + assert not mgr.replaying + assert cq.get_nowait() == "Telemetry: Replay stopped" + assert cam_q.get_nowait() == "start" + + def test_next_replay_message_not_replaying(self): + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), queue.Queue()) + assert mgr.next_replay_message() is None + + def test_next_replay_message_imu_event(self, tmp_path): + now = time.time() + events = [{"t": now, "e": "imu", "q": [1, 0, 0, 0], "mv": True, "st": 3}] + _write_session_jsonl(tmp_path / "session.jsonl", events) + + cq = queue.Queue() + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), cq) + mgr._handle_command("replay", str(tmp_path)) + cq.get_nowait() # drain "Replay started" + + message = mgr.next_replay_message() + assert isinstance(message, ImuSample) + assert message.moving is True + + def test_next_replay_message_solve_event(self, tmp_path): + now = time.time() + events = [ + { + "t": now, + "e": "solve", + "ra": 180.0, + "dec": 45.0, + "roll": 10.0, + "cam_ra": 180.1, + "cam_dec": 44.9, + "cam_roll": 10.0, + } + ] + _write_session_jsonl(tmp_path / "session.jsonl", events) + + cq = queue.Queue() + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), cq) + mgr._handle_command("replay", str(tmp_path)) + cq.get_nowait() # drain "Replay started" + + message = mgr.next_replay_message() + assert isinstance(message, SuccessfulSolve) + assert message.aligned.RA == 180.0 + + def test_next_replay_message_failed_solve_event(self, tmp_path): + now = time.time() + events = [{"t": now, "e": "solve", "ra": None, "dec": None, "matches": 0}] + _write_session_jsonl(tmp_path / "session.jsonl", events) + + cq = queue.Queue() + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), cq) + mgr._handle_command("replay", str(tmp_path)) + cq.get_nowait() # drain "Replay started" + + message = mgr.next_replay_message() + assert isinstance(message, FailedSolve) + + def test_next_replay_message_auto_finish(self, tmp_path): + now = time.time() + events = [{"t": now, "e": "imu", "q": [1, 0, 0, 0]}] + _write_session_jsonl(tmp_path / "session.jsonl", events) + + cq = queue.Queue() + cam_q = queue.Queue() + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), cq, cam_q) + mgr._handle_command("replay", str(tmp_path)) + cq.get_nowait() # drain "Replay started" + + # First call returns the event + message = mgr.next_replay_message() + assert message is not None + + # Next call: done → auto-cleanup + message2 = mgr.next_replay_message() + assert message2 is None + assert not mgr.replaying + assert cq.get_nowait() == "Telemetry: Replay finished" + assert cam_q.get_nowait() == "start" + + def test_record_solve_delegates(self): + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), queue.Queue()) + mgr._recorder = MagicMock() + mgr._recorder.record_solve.return_value = None + mgr.record_solve(_make_successful_solve()) + mgr._recorder.record_solve.assert_called_once() + + def test_record_solve_sends_image_command(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + cam_q = queue.Queue() + cfg = _make_cfg(telemetry_images=True) + mgr = TelemetryManager(cfg, _make_shared_state(), queue.Queue(), cam_q) + mgr._recorder.start(_make_cfg(), _make_shared_state()) + mgr._recorder.images_enabled = True + try: + mgr.record_solve(_make_successful_solve()) + msg = cam_q.get_nowait() + assert msg.startswith("save_image:") + finally: + mgr.stop() + + def test_record_imu_delegates(self): + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), queue.Queue()) + mgr._recorder = MagicMock() + mgr.record_imu(_make_imu_sample()) + mgr._recorder.record_imu.assert_called_once() + + def test_record_noop_while_replaying(self, tmp_path): + """Recording is suppressed while replaying to avoid re-recording + the replayed session.""" + events = [{"t": 1000.0, "e": "imu", "q": [1, 0, 0, 0]}] + _write_session_jsonl(tmp_path / "session.jsonl", events) + + cq = queue.Queue() + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), cq) + mgr._recorder = MagicMock() + mgr._handle_command("replay", str(tmp_path)) + + mgr.record_solve(_make_successful_solve()) + mgr.record_imu(_make_imu_sample()) + mgr._recorder.record_solve.assert_not_called() + mgr._recorder.record_imu.assert_not_called() + + def test_flush_delegates(self): + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), queue.Queue()) + mgr._recorder = MagicMock() + mgr._recorder.enabled = False + mgr.flush() + mgr._recorder.flush.assert_called_once() + + def test_stop_delegates(self): + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), queue.Queue()) + mgr._recorder = MagicMock() + mgr.stop() + mgr._recorder.stop.assert_called_once() + + def test_poll_commands_dispatches_tuple(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + cq = queue.Queue() + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), cq) + cmd_q = queue.Queue() + cmd_q.put(("telemetry_record_on", None)) + mgr.poll_commands(cmd_q) + try: + assert mgr._recorder.enabled + finally: + mgr.stop() + + def test_poll_commands_ignores_non_tuple(self): + cq = queue.Queue() + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), cq) + cmd_q = queue.Queue() + cmd_q.put("not a tuple") + mgr.poll_commands(cmd_q) # should not crash + + def test_restart_camera_no_queue(self): + mgr = TelemetryManager(_make_cfg(), _make_shared_state(), queue.Queue(), None) + mgr._restart_camera() # no-op, no crash + + def test_replay_header_mount_type_mismatch_warns(self, tmp_path): + """Replay should warn when header mount_type differs from config.""" + hdr = { + "t": 999.0, + "e": "hdr", + "dt": "2024-01-15T22:30:00", + "cfg": {"screen_direction": "flat", "mount_type": "EQ"}, + } + events = [{"t": 1000.0, "e": "imu", "q": [1, 0, 0, 0]}] + _write_session_jsonl(tmp_path / "session.jsonl", events, header=hdr) + + ss = _make_shared_state() + cq = queue.Queue() + cfg = _make_cfg(mount_type="Alt/Az") + mgr = TelemetryManager(cfg, ss, cq) + + with patch("PiFinder.telemetry.logger") as mock_logger: + mgr._handle_command("replay", str(tmp_path)) + mock_logger.warning.assert_called_once() + assert "EQ" in mock_logger.warning.call_args[0][1] + assert "Alt/Az" in mock_logger.warning.call_args[0][2] + + def test_replay_header_mount_type_match_no_warn(self, tmp_path): + """No warning when header mount_type matches config.""" + hdr = { + "t": 999.0, + "e": "hdr", + "cfg": {"screen_direction": "flat", "mount_type": "Alt/Az"}, + } + events = [{"t": 1000.0, "e": "imu", "q": [1, 0, 0, 0]}] + _write_session_jsonl(tmp_path / "session.jsonl", events, header=hdr) + + cfg = _make_cfg(mount_type="Alt/Az") + mgr = TelemetryManager(cfg, _make_shared_state(), queue.Queue()) + + with patch("PiFinder.telemetry.logger") as mock_logger: + mgr._handle_command("replay", str(tmp_path)) + mock_logger.warning.assert_not_called() + + def test_flush_polls_target(self, tmp_path): + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + ss = _make_shared_state() + target = MagicMock() + target.object_id = 42 + target.display_name = "M 31" + target.ra = 10.684 + target.dec = 41.269 + ui_state = MagicMock() + ui_state.target.return_value = target + ss.ui_state.return_value = ui_state + + mgr = TelemetryManager(_make_cfg(telemetry_record=True), ss, queue.Queue()) + try: + mgr.flush() + assert mgr._recorder._last_target_id == 42 + # Check a target event was buffered (header + target) + assert len(mgr._recorder._buffer) >= 2 + lines = [json.loads(line) for line in mgr._recorder._buffer] + tgt_lines = [line for line in lines if line.get("e") == "tgt"] + assert len(tgt_lines) == 1 + assert tgt_lines[0]["name"] == "M 31" + finally: + mgr.stop() + + def test_poll_target_no_ui_state(self): + """_poll_target should not crash if ui_state() raises.""" + ss = _make_shared_state() + ss.ui_state.side_effect = Exception("no ui state") + mgr = TelemetryManager(_make_cfg(), ss, queue.Queue()) + mgr._recorder.enabled = True + mgr._poll_target() # should not raise + + def test_header_includes_mount_type(self, tmp_path): + """Recording header should include mount_type from config.""" + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + cfg = _make_cfg(mount_type="EQ") + ss = _make_shared_state() + rec.start(cfg, ss) + try: + line = json.loads(rec._buffer[0]) + assert line["cfg"]["mount_type"] == "EQ" + finally: + rec.stop() + + +# ── Recording → replay round trip ─────────────────────────────────── + + +@pytest.mark.unit +class TestRecordReplayRoundTrip: + def test_recorded_solve_replays_as_equivalent_message(self, tmp_path): + """A recorded SuccessfulSolve comes back as an equivalent message.""" + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + rec.start(_make_cfg(), _make_shared_state()) + original = _make_successful_solve(imu_anchor=_make_quat(1, 0, 0, 0)) + rec.record_solve(original) + rec._do_flush() + session_dir = rec._session_dir + rec.stop() + + player = TelemetryPlayer(session_dir) + assert player.total_events == 1 + replayed = TelemetryPlayer.event_to_solve_result(player.events[0]) + assert isinstance(replayed, SuccessfulSolve) + assert replayed.aligned.RA == original.aligned.RA + assert replayed.aligned.Dec == original.aligned.Dec + assert replayed.camera.RA == original.camera.RA + assert replayed.imu_anchor == original.imu_anchor + assert replayed.last_solve_attempt == original.last_solve_attempt + assert replayed.last_solve_success == original.last_solve_success + assert replayed.diagnostics.Matches == original.diagnostics.Matches + + def test_recorded_imu_replays_as_equivalent_sample(self, tmp_path): + """A recorded ImuSample comes back as an equivalent sample.""" + with patch("PiFinder.telemetry.TELEMETRY_DIR", tmp_path / "telemetry"): + rec = TelemetryRecorder() + rec.start(_make_cfg(), _make_shared_state()) + original = _make_imu_sample( + quat=_make_quat(0.5, 0.5, 0.5, 0.5), + moving=True, + status=3, + gyro=(0.01, -0.02, 0.03), + accel=(0.1, 0.2, -0.3), + ) + rec.record_imu(original) + rec._do_flush() + session_dir = rec._session_dir + rec.stop() + + player = TelemetryPlayer(session_dir) + assert player.total_events == 1 + replayed = TelemetryPlayer.event_to_imu_sample(player.events[0]) + assert isinstance(replayed, ImuSample) + assert replayed.quat == original.quat + assert replayed.moving is True + assert replayed.status == 3 + assert replayed.gyro == original.gyro + assert replayed.accel == original.accel