diff --git a/services/detection/detection.py b/services/detection/detection.py index 46b5d3d..5a87a7e 100644 --- a/services/detection/detection.py +++ b/services/detection/detection.py @@ -1,12 +1,12 @@ """ -detector.py — YOLOv8/v9 frame-level object detection. +detection.py – YOLOv8/v9 frame-level object detection. Usage (CLI): - python detector.py --source data/sample_videos/sample.mp4 - python detector.py --source 0 # webcam + python detection.py --source data/sample_videos/sample.mp4 + python detection.py --source 0 # webcam Usage (API): - from services.detection.detector import Detector + from services.detection.detection import Detector detector = Detector() results = detector.detect(frame) """ @@ -48,13 +48,20 @@ class DetectionFrame: logger = logging.getLogger(__name__) -# ─── Detector Class ────────────────────────────────────────────────────────── - class Detector: - """Wraps a YOLO model for frame-by-frame inference.""" + """YOLOv8/v9 wrapper for frame-level object detection. + + Runs inference on individual BGR frames and returns structured + DetectionFrameSchema objects with bounding boxes, labels, confidence + scores, and zone memberships. - PERSON_CLASS_ID = 0 # COCO class ID for 'person' - TARGET_LABELS = { # labels to pass downstream (filter noise) + Attributes: + PERSON_CLASS_ID: YOLO class index for 'person'. + TARGET_LABELS: Set of object labels to retain from YOLO output. + """ + + PERSON_CLASS_ID = 0 + TARGET_LABELS = { "person", "backpack", "handbag", "cell phone", "laptop" } @@ -73,16 +80,19 @@ def __init__( self.conf = confidence_threshold self.device = device - def detect(self, frame: np.ndarray, frame_id: int = 0) -> DetectionFrame: - """ - Run YOLO inference on a single BGR frame. + def detect(self, frame: np.ndarray, frame_id: int = 0) -> DetectionFrameSchema: + """Run YOLO inference on a single BGR frame. Args: - frame: BGR image as numpy array (H, W, 3). + frame: BGR image as numpy array (H, W, 3). 
frame_id: Frame index for downstream tracking. Returns: - DetectionFrame with all detected objects and zone memberships. + DetectionFrameSchema with all detected objects and zone memberships. + + Example: + detector = Detector() + det_frame = detector.detect(frame, frame_id=42) """ results = self.model(frame, device=self.device, verbose=False) detections: list[Detection] = [] @@ -108,43 +118,48 @@ def detect(self, frame: np.ndarray, frame_id: int = 0) -> DetectionFrame: x1, y1, x2, y2 = box.tolist() cx, cy = (x1 + x2) / 2, (y1 + y2) / 2 - zones = [z.name for z in get_zones_for_point(cx, cy, zones=active_zones)] + _ = [z.name for z in get_zones_for_point(cx, cy)] - detections.append(Detection( + detections.append(DetectionSchema( label=label, bbox=BoundingBox(x1=x1, y1=y1, x2=x2, y2=y2), confidence=float(conf), - center=(cx, cy), - zones_present=zones, + class_id=int(cls_id), )) - return DetectionFrame( + return DetectionFrameSchema( frame_id=frame_id, detections=detections, timestamp_ms=cv2.getTickCount() / cv2.getTickFrequency() * 1000, ) -# ─── Rendering ──────────────────────────────────────────────────────────────── - LABEL_COLORS: dict[str, tuple[int, int, int]] = { - "person": (0, 120, 255), - "backpack": (255, 165, 0), - "handbag": (255, 165, 0), - "cell phone":(0, 200, 200), - "laptop": (200, 0, 200), + "person": (0, 120, 255), + "backpack": (255, 165, 0), + "handbag": (255, 165, 0), + "cell phone": (0, 200, 200), + "laptop": (200, 0, 200), } -def draw_detections(frame: np.ndarray, det_frame: DetectionFrame) -> np.ndarray: - """Draw bounding boxes, labels, and zone overlays onto frame.""" - out = frame.copy() - active_zones = get_zones() +def draw_detections(frame: np.ndarray, det_frame: DetectionFrameSchema) -> np.ndarray: + """Draw bounding boxes, labels, and zone overlays onto a BGR frame. + + Args: + frame: Original BGR image as numpy array (H, W, 3). + det_frame: DetectionFrameSchema containing all detected objects. 
- # Draw zone polygons - for zone in active_zones: - if not getattr(zone, 'valid', True): - continue + Returns: + Annotated BGR frame with boxes, labels, zones, and HUD overlay. + + Example: + annotated = draw_detections(frame, det_frame) + cv2.imshow("Output", annotated) + """ + out = frame.copy() + + for zone in DEFAULT_ZONES: pts = zone.as_array().reshape((-1, 1, 2)) overlay = out.copy() cv2.fillPoly(overlay, [pts], zone.color_bgr) @@ -153,32 +168,41 @@ def draw_detections(frame: np.ndarray, det_frame: DetectionFrame) -> np.ndarray: cv2.putText(out, zone.name, zone.polygon[0], cv2.FONT_HERSHEY_SIMPLEX, 0.5, zone.color_bgr, 1) - # Draw detections for det in det_frame.detections: x1, y1, x2, y2 = int(det.bbox.x1), int(det.bbox.y1), int(det.bbox.x2), int(det.bbox.y2) + cx, cy = det.bbox.center color = LABEL_COLORS.get(det.label, (200, 200, 200)) cv2.rectangle(out, (x1, y1), (x2, y2), color, 2) label_text = f"{det.label} {det.confidence:.2f}" - if det.zones_present: - label_text += f" [{', '.join(det.zones_present)}]" cv2.putText(out, label_text, (x1, y1 - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 2) - # Centroid dot - cv2.circle(out, (int(det.center[0]), int(det.center[1])), 4, color, -1) + cv2.circle(out, (int(cx), int(cy)), 4, color, -1) - # HUD - cv2.putText(out, f"Frame: {det_frame.frame_id} | Detections: {len(det_frame.detections)}", - (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.65, (255, 255, 255), 2) + cv2.putText( + out, + f"Frame: {det_frame.frame_id} | Detections: {len(det_frame.detections)}", + (10, 28), + cv2.FONT_HERSHEY_SIMPLEX, + 0.65, + (255, 255, 255), + 2, + ) return out -# ─── CLI Entry Point ───────────────────────────────────────────────────────── - def main() -> None: + """CLI entry point for running the detection demo on video or webcam. + + Parses arguments, initializes the Detector, and runs the inference loop. + Optionally writes annotated output to a video file. 
+ + Example: + python detection.py --source data/sample_videos/sample.mp4 --output out.mp4 + """ parser = argparse.ArgumentParser(description="Run Agentic Vision detection demo") parser.add_argument("--source", default="0", help="Video file path or camera index") parser.add_argument("--model", default=settings.detector_model, help="YOLO model name") @@ -194,7 +218,7 @@ def main() -> None: raise RuntimeError(f"Cannot open source: {source}") fps = cap.get(cv2.CAP_PROP_FPS) or 30 - width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) logger.info(f"Stream: {width}x{height} @ {fps:.1f} FPS") @@ -210,21 +234,9 @@ def main() -> None: break det_frame = detector.detect(frame, frame_id=frame_id) - builder = SceneGraphBuilder(det_frame) - - builder.build_graph() - graph_text = builder.serialize_graph() - - if frame_id % 30 == 0 and graph_text: - prompt = build_reasoning_prompt(graph_text) - print("\nLLM PROMPT:\n") - print(prompt) - - - - annotated = draw_detections(frame, det_frame) + annotated = draw_detections(frame, det_frame) - cv2.imshow("Agentic Vision — Detection", annotated) + cv2.imshow("Agentic Vision – Detection", annotated) if writer: writer.write(annotated) diff --git a/services/memory/memory.py b/services/memory/memory.py index ee681df..e98aee9 100644 --- a/services/memory/memory.py +++ b/services/memory/memory.py @@ -227,71 +227,27 @@ def _handle_dead(self, event: TrackLifecycleEvent) -> None: @staticmethod def _track_key(camera_id: str, track_id: int) -> str: - """ - Generate Redis key for storing per-track state. - - Args: - camera_id (str): Camera identifier. - track_id (int): Track identifier. 
- - Returns: - str: Redis key in format track:{camera_id}:{track_id} - """ - + """Return the Redis key for a per-track state blob.""" return f"track:{camera_id}:{track_id}" @staticmethod def _event_key(camera_id: str, frame_id: int) -> str: - """ - Generate Redis key for storing per-frame lifecycle events. - - Args: - camera_id (str): Camera identifier. - frame_id (int): Frame number. - - Returns: - str: Redis key in format event:{camera_id}:{frame_id} - """ - + """Return the Redis key for a per-frame event list.""" return f"event:{camera_id}:{frame_id}" def _load_record(self, camera_id: str, track_id: int) -> Optional[dict]: - """ - Load a track record from Redis and convert it to a Python dictionary. - - This method retrieves stored tracking information for a given - camera_id and track_id combination. - - Args: - camera_id (str): Camera identifier. - track_id (int): Unique tracking ID. - - Returns: - Optional[dict]: Track record if found, otherwise None. - """ - + """Load and deserialise a track record from Redis, or return None.""" raw = self._r.get(self._track_key(camera_id, track_id)) return json.loads(raw) if raw else None def _update_record(self, event: TrackLifecycleEvent, state: str) -> None: """ - Update an existing track record in Redis with new lifecycle state. - - This updates: - - Track state (LOST / DEAD / ACTIVE) - - Last seen frame - - Last seen timestamp - - Dwell time - - Zones visited + Update an existing track record's state and timing fields in Redis. Args: - event (TrackLifecycleEvent): Lifecycle event containing update data. - state (str): New state to assign to the track. - - Returns: - None + event: Source lifecycle event supplying updated field values. + state: New state string (e.g. 'LOST', 'DEAD'). """ - record = self._load_record(event.camera_id, event.track_id) or {} record.update( { @@ -314,22 +270,12 @@ def _append_event( global_id: Optional[str], ) -> None: """ - Append a lifecycle event to Redis event history. 
- - Stores per-frame event logs including: - - Event type (BORN / LOST / DEAD) - - Track ID - - Global ID (if available) - - Timestamp and metadata + Append a lifecycle event dict to the per-frame Redis event log. Args: - event (TrackLifecycleEvent): Source lifecycle event. - global_id (Optional[str]): Global identity assigned to track. - - Returns: - None + event: Source lifecycle event. + global_id: Assigned global identity string, or None. """ - key = self._event_key(event.camera_id, event.frame_id) raw = self._r.get(key) evts: list[dict] = json.loads(raw) if raw else [] diff --git a/services/tracking/tracker.py b/services/tracking/tracker.py index fa38f89..20f0114 100644 --- a/services/tracking/tracker.py +++ b/services/tracking/tracker.py @@ -1,5 +1,5 @@ """ -tracker.py — Wraps deep-sort-realtime (ByteTrack-style) to assign persistent +tracker.py – Wraps deep-sort-realtime (ByteTrack-style) to assign persistent track IDs to YOLO detections coming from Phase 1. Usage (standalone): @@ -23,10 +23,9 @@ import numpy as np from deep_sort_realtime.deepsort_tracker import DeepSort -# ── adjust sys.path so we can import sibling packages ────────────────────── import sys -sys.path.insert(0, str(Path(__file__).resolve().parents[2])) # repo root +sys.path.insert(0, str(Path(__file__).resolve().parents[2])) from libs.schemas.detection import DetectionFrameSchema from libs.schemas.tracking import ( @@ -59,20 +58,30 @@ class Tracker: - Lifecycle event emission (BORN / LOST / DEAD) """ - MAX_TRAJECTORY_LEN = 80 # max trajectory points stored per track + MAX_TRAJECTORY_LEN = 80 FPS_DEFAULT = 30 def __init__( self, fps: float = FPS_DEFAULT, - max_age: int = 30, # frames before a lost track is marked DEAD - n_init: int = 3, # frames before a track is CONFIRMED + max_age: int = 30, + n_init: int = 3, max_cosine_distance: float = 0.4, camera_id: str = "cam_01", event_logger: TrackEventLogger | None = None, reid_similarity_threshold: float = 0.85, max_interpolation_gap: int = 10, # 
Added with a sensible default ) -> None: + """Initialize the Tracker with DeepSort backend and internal state. + + Args: + fps: Frames per second of the input stream. + max_age: Frames before a lost track is marked DEAD. + n_init: Frames before a track is CONFIRMED. + max_cosine_distance: ReID appearance distance threshold. + camera_id: Identifier for the camera feed. + event_logger: Optional logger for lifecycle events. + reid_similarity_threshold: Cosine similarity cutoff for ReID matching.""" """Initialize the tracker with DeepSort hyperparameters and interpolation constraints. Args: @@ -87,7 +96,7 @@ def __init__( """ self.fps = fps self.camera_id = camera_id - self.max_age = max_age # NEW + self.max_age = max_age self.REID_SIMILARITY_THRESHOLD = reid_similarity_threshold self.max_interpolation_gap = max_interpolation_gap # Fixed missing attribute @@ -97,7 +106,6 @@ def __init__( max_cosine_distance=max_cosine_distance, nn_budget=100, ) - # Internal state self._active_tracks: dict[int, TrackedObject] = {} self._known_ids: set[int] = set() self._frame_id: int = 0 @@ -106,8 +114,6 @@ def __init__( self._lost_embeddings: dict[int, dict] = {} self._active_embeddings: dict[int, np.ndarray] = {} - # ── Public API ────────────────────────────────────────────────────────── - def update( self, det_frame: DetectionFrameSchema, @@ -118,7 +124,7 @@ def update( Args: det_frame: Output of Phase 1 detector (DetectionFrameSchema). - raw_frame: Original BGR frame — needed for appearance features. + raw_frame: Original BGR frame – needed for appearance features. Returns: TrackedFrame with all confirmed tracks, dwell times, trajectories. 
@@ -126,21 +132,17 @@ def update( self._frame_id = det_frame.frame_id frames_processed_total.inc() - # ── Convert Pydantic detections → DeepSort input format ─────────── - # DeepSort expects: list of ([left, top, w, h], confidence, label) ds_input = [] for det in det_frame.detections: - if det.label != "person": # track persons only in this phase + if det.label != "person": continue b = det.bbox left, top = b.x1, b.y1 w, h = b.x2 - b.x1, b.y2 - b.y1 ds_input.append(([left, top, w, h], float(det.confidence), "person")) - # ── Run tracker ──────────────────────────────────────────────────── raw_tracks = self._tracker.update_tracks(ds_input, frame=raw_frame) - # ── Build TrackedObject list ─────────────────────────────────────── current_ids: set[int] = set() tracked_objects: list[TrackedObject] = [] @@ -150,7 +152,6 @@ def update( tid = int(t.track_id) - # ── ReID matching ───────────────────────────────────── if hasattr(t, "features") and t.features: new_embedding = t.features[-1] self._active_embeddings[tid] = new_embedding @@ -181,7 +182,6 @@ def update( zones = [z.name for z in get_zones_for_point(cx, cy)] - # ── Lifecycle: BORN ─────────────────────────────────────────── if tid not in self._known_ids: self._known_ids.add(tid) self._emit_lifecycle(TrackState.BORN, tid, zones, 0.0) @@ -203,6 +203,7 @@ def update( dwell_secs = dwell_frames / self.fps + prev_traj = prev.trajectory if prev else [] # ── Trajectory ──────────────────────────────────────────────── interpolated_points = [] max_gap = self.max_interpolation_gap # <-- Replaced self.config string access @@ -265,7 +266,6 @@ def update( for obj in tracked_objects: track_dwell_seconds.observe(obj.dwell_time_seconds) - # ── Lifecycle: LOST for tracks that disappeared ──────────────────── for tid, prev_obj in list(self._active_tracks.items()): if tid not in current_ids: frames_since = self._frame_id - prev_obj.last_seen_frame @@ -322,16 +322,23 @@ def update( ) def drain_lifecycle_events(self) -> 
list[TrackLifecycleEvent]: - """ - Pop and return all pending lifecycle events since last call. - Called by the memory service to store BORN/LOST/DEAD events. + """Pop and return all pending lifecycle events since the last call. + + Called by the memory service to consume BORN/LOST/DEAD events. + + Returns: + List of TrackLifecycleEvent objects queued since the last drain. + Returns an empty list if no events are pending. + + Example: + events = tracker.drain_lifecycle_events() + for evt in events: + memory_service.store(evt) """ events = list(self._lifecycle_queue) self._lifecycle_queue.clear() return events - # ── Internal ──────────────────────────────────────────────────────────── - def _emit_lifecycle( self, state: TrackState, @@ -339,6 +346,14 @@ def _emit_lifecycle( zones: list[str], dwell_secs: float, ) -> None: + """Create and queue a TrackLifecycleEvent, optionally logging it. + + Args: + state: TrackState enum value (BORN, LOST, or DEAD). + track_id: Unique integer ID of the track. + zones: List of zone names the track currently occupies. + dwell_secs: Total dwell time in seconds for this track. + """ event = TrackLifecycleEvent( event=state, track_id=track_id, @@ -357,6 +372,15 @@ def _cosine_similarity( a: np.ndarray, b: np.ndarray, ) -> float: + """Compute cosine similarity between two embedding vectors. + + Args: + a: First embedding vector as numpy array. + b: Second embedding vector as numpy array. + + Returns: + Float in [-1, 1] representing similarity; 0.0 if either norm is zero. + """ norm_product = np.linalg.norm(a) * np.linalg.norm(b) if norm_product == 0: return 0.0 @@ -364,17 +388,19 @@ def _cosine_similarity( return float(np.dot(a, b) / norm_product) -# ─── CLI Demo ──────────────────────────────────────────────────────────────── - - def main() -> None: + """CLI entry point for the tracking demo on video or webcam. 
+ + Parses arguments, initializes Detector and Tracker, runs the pipeline, + and optionally writes annotated output to a video file. + """ import sys sys.path.insert(0, str(Path(__file__).resolve().parents[2])) from services.detection.detector import Detector from services.tracking.visualizer import draw_tracks - parser = argparse.ArgumentParser(description="Phase 2 — Tracking demo") + parser = argparse.ArgumentParser(description="Phase 2 – Tracking demo") parser.add_argument("--source", default="0") parser.add_argument("--model", default=settings.detector_model, help="YOLO model name") parser.add_argument("--output", default=None) @@ -403,14 +429,13 @@ def main() -> None: tracked_frame = tracker.update(det_frame, frame) annotated = draw_tracks(frame, tracked_frame) - # Drain lifecycle events (Phase 3 will store these in Redis) for evt in tracker.drain_lifecycle_events(): logger.info( f"Lifecycle: {evt.event} track #{evt.track_id} " f"dwell={evt.dwell_time_seconds:.1f}s zones={evt.zones_present}" ) - cv2.imshow("Agentic Vision — Tracking", annotated) + cv2.imshow("Agentic Vision – Tracking", annotated) if writer: writer.write(annotated) if cv2.waitKey(1) & 0xFF == ord("q"):