Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .claude/settings.local.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"permissions": {
"allow": [
"Bash(git log:*)"
]
}
}
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies = [
"matplotlib>=3.10.7",
"numpy>=2.2.6",
"opencv-contrib-python-headless>=4.13.0.92",
"py-gpmf-parser>=0.1.1",
"pydantic>=2.12.5",
"rich>=14.1.0",
"tqdm>=4.67.1",
Expand Down Expand Up @@ -67,3 +68,6 @@ lint.ignore = [
"PLW2901", # For loop variable overwritten.
"PLW0642", # Reassigned self in instance method.
]

[tool.setuptools.package-data]
opai = ["configs/*"]
8 changes: 6 additions & 2 deletions src/opai/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from opai.presentation.facade import (
add_demos,
add_mapping,
add_mapping_video,
browse_session,
calibrate,
calibrate_with_video,
Expand All @@ -11,12 +11,14 @@
main,
plot_video_frames,
register_gopro,
run_extract_trajectories_batch,
run_mapping,
verify_calibrated_parameters,
)

__all__ = [
"add_demos",
"add_mapping",
"add_mapping_video",
"browse_session",
"calibrate",
"calibrate_with_video",
Expand All @@ -26,6 +28,8 @@
"list_sessions",
"main",
"plot_video_frames",
"run_extract_trajectories_batch",
"run_mapping",
"verify_calibrated_parameters",
"register_gopro",
]
5 changes: 5 additions & 0 deletions src/opai/application/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,22 @@
from opai.application.session import (
add_demos,
add_mapping,
add_mapping_video,
browse_session,
list_sessions,
)
from opai.application.slam import run_extract_trajectories_batch, run_mapping

__all__ = [
"add_demos",
"add_mapping",
"add_mapping_video",
"browse_session",
"calibrate",
"generate_charuco_board",
"list_sessions",
"run_extract_trajectories_batch",
"run_mapping",
"verify_calibrated_parameters",
"get_media_list",
"list_downloaded_thumbnails",
Expand Down
37 changes: 36 additions & 1 deletion src/opai/application/gopro.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from opai.domain.context import Context
from opai.domain.gopro import GPMedia, GPMediaList, GPThumbnail, GPThumbnailIndex
from opai.infrastructure.logger import get_logger
from opai.infrastructure.persistence import (
load_gopro_thumbnail_index,
write_gopro_thumbnail_index,
Expand All @@ -23,6 +24,8 @@
DOWNLOAD_CHUNK_SIZE = 1024 * 1024
THUMBNAIL_INDEX_FILENAME = "gopro_thumbnail_index.json"

logger = get_logger(__name__)


def ensure_gopro_connected(ctx: Context) -> None:
_run_async(_ensure_gopro_connected(ctx))
Expand Down Expand Up @@ -69,7 +72,13 @@ def register_gopro(

socket_address_template = "172.2{}.1{}{}.51:8080"
ctx.set_gopro_socket_address(socket_address_template.format(*serial_number[-3:]))
logger.info(
"Registered GoPro for session %s at %s",
ctx.name,
ctx.gopro_socket_address,
)
if download_thumbnails:
logger.info("Downloading GoPro thumbnails for session %s", ctx.name)
_download_thumbnails(ctx)


Expand All @@ -93,6 +102,12 @@ def download_file_from_gopro(
url = f"http://{ctx.gopro_socket_address}/videos/DCIM/{source_directory}/{source_filename}"

output_file = destination / (output_filename or source_filename)
logger.info(
"Downloading GoPro file %s/%s to %s",
source_directory,
source_filename,
output_file,
)
_run_async(
_download_stream_to_file(ctx, url, output_file, progress_label=output_file.name)
)
Expand All @@ -110,10 +125,13 @@ async def _get_media_list(ctx: Context) -> GPMediaList:

url = f"http://{ctx.gopro_socket_address}/gopro/media/list"
try:
logger.info("Fetching GoPro media list from %s", ctx.gopro_socket_address)
async with _create_async_client() as client:
response = await client.get(url)
response.raise_for_status()
return GPMediaList(**response.json())
media_list = GPMediaList(**response.json())
logger.info("Fetched %s GoPro media directories", len(media_list.media))
return media_list
except httpx.HTTPError as exc:
raise OPAIGoProRegistrationError(
"Failed to fetch the GoPro media list. Verify the camera connection and try opai.register_gopro(...) again.",
Expand All @@ -135,14 +153,20 @@ def list_downloaded_thumbnails(ctx: Context) -> list[GPThumbnail]:
raise OPAIValidationError(
"No GoPro thumbnails found. Call opai.register_gopro(..., download_thumbnails=True) first."
)
logger.info("Found %s downloaded GoPro thumbnails", len(items))
return items


def _download_thumbnails(ctx: Context) -> None:
if _thumbnail_index_path(ctx).exists():
logger.info(
"Skipping GoPro thumbnail download because index already exists at %s",
_thumbnail_index_path(ctx),
)
return

destination_root = ctx.session_directory / "gopro_thumbnails"
logger.info("Downloading GoPro thumbnails into %s", destination_root)
thumbnails: list[GPThumbnail] = []
for media in get_media_list(ctx).media:
thumbnails.extend(
Expand All @@ -163,6 +187,9 @@ def _download_thumbnails_for_directory(
destination = destination_root / media.d
destination.mkdir(parents=True, exist_ok=True)
thumbnails: list[GPThumbnail] = []
logger.info(
"Downloading %s thumbnails from GoPro directory %s", len(media.fs), media.d
)

for file in media.fs:
thumbnail_filename = Path(file.n).with_suffix(".jpg").name
Expand Down Expand Up @@ -204,6 +231,7 @@ def _download_thumbnail_from_gopro(

url = f"http://{ctx.gopro_socket_address}/gopro/media/thumbnail?path={media_path}"
output_file = destination / output_filename
logger.info("Downloading GoPro thumbnail %s to %s", media_path, output_file)
_run_async(
_download_stream_to_file(ctx, url, output_file, progress_label=output_file.name)
)
Expand Down Expand Up @@ -243,7 +271,14 @@ async def _download_stream_to_file(
handle.write(chunk)
progress_bar.update(len(chunk))
temp_file.replace(output_file)
logger.info("Downloaded GoPro asset %s to %s", progress_label, output_file)
except httpx.HTTPError as exc:
logger.error(
"Failed to download GoPro asset %s from %s: %s",
progress_label,
url,
exc,
)
raise OPAIGoProRegistrationError(
f"Failed to download '{progress_label}' from the GoPro.",
details={
Expand Down
96 changes: 96 additions & 0 deletions src/opai/application/imu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from __future__ import annotations

import json
from pathlib import Path

from py_gpmf_parser.gopro_telemetry_extractor import GoProTelemetryExtractor

from opai.core.exceptions import OPAIWorkflowError
from opai.domain.imu import (
DEFAULT_IMU_RECORDING_ID,
IMUPayload,
IMURecording,
IMUSample,
IMUStream,
)
from opai.infrastructure.logger import get_logger

SECS_TO_MS = 1e3
DEFAULT_STREAM_TYPES = [
"ACCL",
"GYRO",
"GPS5",
"GPSP",
"GPSU",
"GPSF",
"GRAV",
"MAGN",
"CORI",
"IORI",
"TMPC",
]

logger = get_logger(__name__)


def extract_imu_from_video(
video_path: Path,
dest: Path,
stream_types: list[str] = DEFAULT_STREAM_TYPES,
) -> Path:
"""Extract IMU data from a video using py_gpmf_parser."""

logger.info("Extracting IMU streams from %s to %s", video_path, dest)
extractor = GoProTelemetryExtractor(str(video_path))
try:
extractor.open_source()

streams: dict[str, IMUStream] = {}
for stream in stream_types:
payload = extractor.extract_data(stream)
if payload and len(payload[0]) > 0:
logger.info(
"Extracted %s IMU samples for stream %s",
len(payload[0]),
stream,
)
streams[stream] = IMUStream(
samples=tuple(
IMUSample(
value=data.tolist(),
cts=_coerce_timestamp_millis(ts),
)
for data, ts in zip(*payload)
)
)

output = IMUPayload(
recording_id=DEFAULT_IMU_RECORDING_ID,
recording=IMURecording(streams=streams),
frames_per_second=0.0,
)
dest.write_text(json.dumps(output.to_dict(), indent=2), encoding="utf-8")
logger.info("Wrote IMU payload to %s", dest)

return dest

except Exception as exc:
logger.exception("Failed to extract IMU data from %s", video_path)
raise OPAIWorkflowError(f"Error processing {video_path}: {exc}") from exc

finally:
extractor.close_source()


def _coerce_timestamp_millis(timestamp: object) -> int | float:
millis = timestamp * SECS_TO_MS
if hasattr(millis, "tolist"):
millis = millis.tolist()
if isinstance(millis, list):
if len(millis) != 1:
raise OPAIWorkflowError(
"IMU timestamp payload must contain exactly one scalar value.",
details={"timestamp": str(millis)},
)
return millis[0]
return millis
4 changes: 4 additions & 0 deletions src/opai/application/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ def add_mapping(ctx: Context, video_path: str | Path) -> MappingAsset:
return mapping_asset


def add_mapping_video(ctx: Context, video_path: str | Path) -> MappingAsset:
return add_mapping(ctx, video_path)


def list_sessions() -> list[str]:
return [session.name for session in describe_sessions().sessions]

Expand Down
Loading
Loading