Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,4 @@ astro_data/comets.txt
# users GPS data
test_ubx
test_ubx/*
python/telemetry_analysis/
4 changes: 3 additions & 1 deletion default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -177,5 +177,7 @@
"active_telescope_index": 0,
"active_eyepiece_index": 0
},
"imu_threshold_scale": 1
"imu_threshold_scale": 1,
"telemetry_record": false,
"telemetry_images": false
}
12 changes: 11 additions & 1 deletion python/PiFinder/camera_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,17 @@ def get_image_loop(
f"Exposure saved and auto-exposure disabled: {self.exposure_time}µs"
)

if command.startswith("save"):
if command.startswith("save_image:"):
# Save current camera frame to specified path
save_path = command.split(":", 1)[1]
try:
img = camera_image.copy()
img.save(save_path, "PNG", compress_level=6)
logger.debug("Telemetry image saved: %s", save_path)
except Exception as e:
logger.error("Failed to save telemetry image: %s", e)

if command.startswith("save:"):
# Set flag to save next capture to this file
self._save_next_to = command.split(":")[1]
console_queue.put("CAM: Save flag set")
Expand Down
7 changes: 7 additions & 0 deletions python/PiFinder/imu_pi.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ def imu_monitor(shared_state, console_queue, log_queue):
imu.update()
imu_sample.status = imu.calibration

# Read raw sensor data for telemetry

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it make more sense to do this in imu.update() when the quaternions are read out?

try:
imu_sample.gyro = imu.sensor.gyro
imu_sample.accel = imu.sensor.linear_acceleration
except Exception:
pass

if imu.moving():
if not imu_sample.moving:
logger.debug("IMU: move start")
Expand Down
84 changes: 76 additions & 8 deletions python/PiFinder/integrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
prediction during dead-reckoning. The IDR remains a math primitive
(``RaDecRoll`` in, ``RaDecRoll`` out); this module bridges between it
and :class:`PointingEstimate`.

Telemetry record/replay is handled by :class:`TelemetryManager`
(``telemetry.py``). Replayed sessions are converted back into
:class:`SolveResult` / :class:`ImuSample` messages and fed through the
same ``_apply_*`` / ``_advance_with_imu`` paths as live data.
"""

from __future__ import annotations
Expand All @@ -45,6 +50,7 @@
from PiFinder.multiproclogging import MultiprocLogging
from PiFinder.pointing_model.imu_dead_reckoning import ImuDeadReckoning
import PiFinder.pointing_model.quaternion_transforms as qt
from PiFinder.telemetry import TelemetryManager
from PiFinder.types.positioning import (
FailedSolve,
ImuSample,
Expand All @@ -62,12 +68,21 @@
IMU_MOVED_ANG_THRESHOLD = np.deg2rad(0.06)


def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=False):
def integrator(
shared_state,
solver_queue,
console_queue,
log_queue,
is_debug=False,
command_queue=None,
camera_command_queue=None,
):
MultiprocLogging.configurer(log_queue)
if is_debug:
logger.setLevel(logging.DEBUG)
logger.debug("Starting Integrator")

telemetry = None
try:
cfg = config.Config()
screen_direction = cfg.get_option("screen_direction")
Expand All @@ -82,22 +97,59 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa
# Epoch of the last estimate we published; gate re-publishing on it.
last_published_time = time.time()

was_replaying = False
telemetry = TelemetryManager(
cfg, shared_state, console_queue, camera_command_queue
)

while True:
state_utils.sleep_for_framerate(shared_state)

telemetry.poll_commands(command_queue)

pointing_updated = False

# 1. Pull any pending solve result from the queue.
# 1. Pull the next message — from the replay stream when
# replaying, otherwise from the solver queue.
solve_result: Optional[SolveResult] = None
try:
solve_result = solver_queue.get(block=False)
except queue.Empty:
pass
replay_imu: Optional[ImuSample] = None

if telemetry.replaying:
if not was_replaying:
was_replaying = True
# Recorded epochs are in the past; rewind the publish
# gate so replayed estimates pass the newer-than check.
last_published_time = 0.0
# The solver keeps running during replay; discard its output.
_drain_queue(solver_queue)
message = telemetry.next_replay_message()
if isinstance(message, (SuccessfulSolve, FailedSolve)):
solve_result = message
elif isinstance(message, ImuSample):
replay_imu = message
else:
if was_replaying:
# Replay ended — reset to a clean unanchored state.
was_replaying = False
estimate = PointingEstimate()
idr.reset()
last_published_time = time.time()
logger.info("Replay ended, integrator state reset")
try:
solve_result = solver_queue.get(block=False)
except queue.Empty:
pass

if isinstance(solve_result, SuccessfulSolve):
telemetry.record_solve(
solve_result, predicted=estimate.pointing.aligned.estimate
)
estimate = _apply_successful_solve(estimate, solve_result, idr)
pointing_updated = True
elif isinstance(solve_result, FailedSolve):
telemetry.record_solve(
solve_result, predicted=estimate.pointing.aligned.estimate
)
estimate = _apply_failed_solve(estimate, solve_result)
# Publish unconditionally so auto-exposure sees the failed
# attempt (Matches=0, fresh last_solve_attempt). The estimate
Expand All @@ -107,14 +159,16 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa
shared_state.set_solution(copy.deepcopy(estimate))

# 2. If we have an anchor and didn't just do a fresh plate-solve,
# try to advance the estimate via IMU dead-reckoning.
# try to advance the estimate via IMU dead-reckoning. The IMU
# sample comes from the replay stream when replaying.
if (
not pointing_updated
and idr.is_initialized()
and estimate.imu_anchor is not None
):
imu = shared_state.imu()
imu = replay_imu if telemetry.replaying else shared_state.imu()
if imu:
telemetry.record_imu(imu)
if _advance_with_imu(estimate, idr, imu):
pointing_updated = True

Expand All @@ -137,8 +191,13 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa
shared_state.set_solution(copy.deepcopy(estimate))
last_published_time = estimate.estimate_time

telemetry.flush()

except EOFError:
logger.error("Main no longer running for integrator")
finally:
if telemetry is not None:
telemetry.stop()


def _apply_successful_solve(
Expand Down Expand Up @@ -263,3 +322,12 @@ def _get_alt_az(ra_deg, dec_deg, location, dt) -> tuple[float | None, float | No
return None, None
calc_utils.sf_utils.set_location(location.lat, location.lon, location.altitude)
return calc_utils.sf_utils.radec_to_altaz(ra_deg, dec_deg, dt)


def _drain_queue(q):
"""Discard all pending items from a queue."""
try:
while True:
q.get(block=False)
except queue.Empty:
pass
8 changes: 8 additions & 0 deletions python/PiFinder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def setup_dirs():
utils.create_path(Path(utils.data_dir, "screenshots"))
utils.create_path(Path(utils.data_dir, "solver_debug_dumps"))
utils.create_path(Path(utils.data_dir, "logs"))
utils.create_path(Path(utils.data_dir, "telemetry"))
os.chmod(Path(utils.data_dir), 0o777)


Expand Down Expand Up @@ -384,13 +385,16 @@ def main(
logger.info("PiFinder running on %s, %s, %s", os_detail, platform, arch)

# init UI Modes
integrator_command_queue: Queue = Queue()

command_queues = {
"camera": camera_command_queue,
"console": console_queue,
"ui_queue": ui_queue,
"align_command": alignment_command_queue,
"align_response": alignment_response_queue,
"gps": gps_queue,
"integrator": integrator_command_queue,
}
cfg = config.Config()

Expand Down Expand Up @@ -545,6 +549,10 @@ def main(
integrator_logqueque,
verbose,
),
kwargs={
"command_queue": integrator_command_queue,
"camera_command_queue": camera_command_queue,
},
)
integrator_process.start()

Expand Down
Loading
Loading