diff --git a/services/memory/memory.py b/services/memory/memory.py index 1f1bf73..33cf579 100644 --- a/services/memory/memory.py +++ b/services/memory/memory.py @@ -1,428 +1,390 @@ -""" -memory.py — Persistent memory layer for Eagle surveillance. - -Stores lifecycle events (BORN / LOST / DEAD) in Redis and orchestrates -cross-camera ReID so every track gets a ``global_id`` alongside its local -``track_id``. - -Redis key schema (extended for global IDs) ------------------------------------------- -- ``track:{camera_id}:{track_id}`` → JSON blob ← per-track state -- ``event:{camera_id}:{frame_id}`` → JSON list ← lifecycle history -- ``embed:{camera_id}:{track_id}`` → JSON blob ← ReID embedding (TTL 7 s) -- ``identity:{global_id}`` → JSON list ← cross-cam tokens (TTL 1 h) - -Usage ------ - import redis - from services.memory.memory import MemoryService - from services.tracking.cross_camera_reid import CrossCameraReID - - r = redis.Redis() - reid = CrossCameraReID(r) - mem = MemoryService(r, reid) - - # In your tracking loop: - for event in tracker.drain_lifecycle_events(): - global_id = mem.handle_lifecycle_event(event, embedding=vec) -""" - -from __future__ import annotations - -import json -import logging -from typing import Optional - -import numpy as np - -from libs.observability.metrics import redis_write_latency -from libs.schemas.tracking import TrackLifecycleEvent, TrackState -from libs.schemas.memory import TrackEvent, TrackSequence, ActionHint -from services.tracking.cross_camera_reid import CrossCameraReID - -logger = logging.getLogger(__name__) - -# ── Redis TTLs ──────────────────────────────────────────────────────────────── -TRACK_TTL_SECONDS = 86_400 # 24 h — keep per-track state for a full day -EVENT_TTL_SECONDS = 86_400 - - -class MemoryService: - """ - Writes tracking lifecycle events to Redis and assigns global identities - via ReID. - - Parameters - ---------- - redis_client: - Connected ``redis.Redis`` (or FakeRedis for tests). - reid: - ``CrossCameraReID`` instance sharing the same Redis client. - """ - - def __init__(self, redis_client, reid: CrossCameraReID) -> None: - """ - Initialise MemoryService with a Redis client and ReID engine. - - Args: - redis_client: Connected redis.Redis or FakeRedis instance. - reid: CrossCameraReID instance for global ID assignment. - """ - self._r = redis_client - self._reid = reid - - # ── Public API ──────────────────────────────────────────────────────────── - - def handle_lifecycle_event( - self, - event: TrackLifecycleEvent, - embedding: Optional[np.ndarray] = None, - ) -> Optional[str]: - """ - Process a single lifecycle event and return the assigned global_id. - - - BORN → attempt ReID match; mint or reuse a global_id; store track record. - - LOST → store embedding for future cross-camera matching; update record. - - DEAD → mark track as dead; update record. - - Args: - event: TrackLifecycleEvent from Tracker.drain_lifecycle_events(). - embedding: Appearance feature vector. Required for BORN/LOST events - if cross-camera ReID is desired; may be None in tests. - - Returns: - The global_id string if one was assigned, else None. - """ - global_id: Optional[str] = None - - if event.event == TrackState.BORN: - global_id = self._handle_born(event, embedding) - - elif event.event == TrackState.LOST: - global_id = self._handle_lost(event, embedding) - - elif event.event == TrackState.DEAD: - self._handle_dead(event) - - # Always append the raw event to the event log - self._append_event(event, global_id) - return global_id - - def get_track_record(self, camera_id: str, track_id: int) -> Optional[dict]: - """Retrieve the stored track record, or None if not found.""" - raw = self._r.get(self._track_key(camera_id, track_id)) - return json.loads(raw) if raw else None - - def get_identity(self, global_id: str) -> list[str]: - """Proxy to CrossCameraReID.get_identity.""" - return self._reid.get_identity(global_id) - - # ── Event handlers ──────────────────────────────────────────────────────── - - def _handle_born( - self, - event: TrackLifecycleEvent, - embedding: Optional[np.ndarray], - ) -> str: - """ - Handle a BORN lifecycle event. - - Attempts ReID match to reuse an existing global_id; mints a new - UUID if no embedding is available. - - Args: - event: The BORN TrackLifecycleEvent. - embedding: Appearance feature vector, or None. - - Returns: - Assigned global_id string. - """ - if embedding is not None: - reid_result = self._reid.match_or_create( - camera_id=event.camera_id, - track_id=event.track_id, - embedding=embedding, - ) - global_id = reid_result.global_id - else: - # No embedding available → mint a placeholder global_id - import uuid - - global_id = str(uuid.uuid4()) - logger.warning( - "BORN event for cam=%s track=%d has no embedding; " - "cross-camera ReID disabled for this track.", - event.camera_id, - event.track_id, - ) - - record = { - "camera_id": event.camera_id, - "track_id": event.track_id, - "global_id": global_id, - "state": TrackState.ACTIVE.value, - "born_frame": event.frame_id, - "born_timestamp_ms": event.timestamp_ms, - "last_seen_frame": event.frame_id, - "last_seen_ms": event.timestamp_ms, - "dwell_time_seconds": event.dwell_time_seconds, - "zones_present": event.zones_present, - } - with redis_write_latency.time(): - self._r.setex( - self._track_key(event.camera_id, event.track_id), - TRACK_TTL_SECONDS, - json.dumps(record), - ) - logger.info("BORN cam=%s track=%d gid=%s", event.camera_id, event.track_id, global_id) - return global_id - - def _handle_lost( - self, - event: TrackLifecycleEvent, - embedding: Optional[np.ndarray], - ) -> Optional[str]: - """ - Handle a LOST lifecycle event. - - Stores the appearance embedding so another camera can match against - it within the ReID TTL window. - - Args: - event: The LOST TrackLifecycleEvent. - embedding: Appearance feature vector, or None. - - Returns: - The existing global_id if found, else None. - """ - record = self._load_record(event.camera_id, event.track_id) - global_id = record.get("global_id") if record else None - - # Store embedding so another camera can match against it within 5 s - if embedding is not None: - self._reid.store_embedding( - camera_id=event.camera_id, - track_id=event.track_id, - embedding=embedding, - global_id=global_id, - ) - - self._update_record(event, TrackState.LOST.value) - logger.info("LOST cam=%s track=%d gid=%s", event.camera_id, event.track_id, global_id) - return global_id - - def _handle_dead(self, event: TrackLifecycleEvent) -> None: - """ - Handle a DEAD lifecycle event. - - Marks the track record as DEAD in Redis. - - Args: - event: The DEAD TrackLifecycleEvent. - """ - self._update_record(event, TrackState.DEAD.value) - logger.info("DEAD cam=%s track=%d", event.camera_id, event.track_id) - - # ── Redis helpers ───────────────────────────────────────────────────────── - - @staticmethod - def _track_key(camera_id: str, track_id: int) -> str: - """Return the Redis key for a per-track state blob.""" - return f"track:{camera_id}:{track_id}" - - @staticmethod - def _event_key(camera_id: str, frame_id: int) -> str: - """Return the Redis key for a per-frame event list.""" - return f"event:{camera_id}:{frame_id}" - - def _load_record(self, camera_id: str, track_id: int) -> Optional[dict]: - """Load and deserialise a track record from Redis, or return None.""" - raw = self._r.get(self._track_key(camera_id, track_id)) - return json.loads(raw) if raw else None - - def _update_record(self, event: TrackLifecycleEvent, state: str) -> None: - """ - Update an existing track record's state and timing fields in Redis. - - Args: - event: Source lifecycle event supplying updated field values. - state: New state string (e.g. 'LOST', 'DEAD'). - """ - record = self._load_record(event.camera_id, event.track_id) or {} - record.update( - { - "state": state, - "last_seen_frame": event.frame_id, - "last_seen_ms": event.timestamp_ms, - "dwell_time_seconds": event.dwell_time_seconds, - "zones_present": event.zones_present, - } - ) - self._r.setex( - self._track_key(event.camera_id, event.track_id), - TRACK_TTL_SECONDS, - json.dumps(record), - ) - - def _append_event( - self, - event: TrackLifecycleEvent, - global_id: Optional[str], - ) -> None: - """ - Append a lifecycle event dict to the per-frame Redis event log. - - Args: - event: Source lifecycle event. - global_id: Assigned global identity string, or None. - """ - key = self._event_key(event.camera_id, event.frame_id) - raw = self._r.get(key) - evts: list[dict] = json.loads(raw) if raw else [] - evts.append( - { - "event": event.event.value, - "track_id": event.track_id, - "global_id": global_id, - "frame_id": event.frame_id, - "timestamp_ms": event.timestamp_ms, - "dwell_time_seconds": event.dwell_time_seconds, - "zones_present": event.zones_present, - } - ) - with redis_write_latency.time(): - self._r.setex( - key, - EVENT_TTL_SECONDS, - json.dumps(evts), - ) - - -# Compatibility layer: lightweight event store used by tests and the pipeline. -MAX_EVENTS_PER_TRACK = 50 - - -class MemoryStore: - """Simple Redis-backed ring buffer for TrackEvent objects. - - This is intentionally minimal: it stores JSON-serialised events in a - Redis list (oldest -> newest), trims to `MAX_EVENTS_PER_TRACK`, and - exposes the methods used by unit tests and the pipeline. - """ - - def __init__(self, redis_client=None, prefix: str = "mem") -> None: - """ - Initialise MemoryStore. - - Args: - redis_client: Connected redis.Redis instance, or None to create one. - prefix: Key prefix used for all Redis keys (default: 'mem'). - """ - import redis - - self._r = redis_client or redis.Redis() - self._prefix = prefix - - def _events_key(self, track_id: int) -> str: - """Return the Redis list key for a track's event history.""" - return f"{self._prefix}:events:{track_id}" - - def _active_key(self, camera_id: str) -> str: - """Return the Redis set key for active track IDs on a camera.""" - return f"{self._prefix}:active:{camera_id}" - - def _track_camera_key(self, track_id: int) -> str: - """Return the Redis key mapping a track_id to its camera_id.""" - return f"{self._prefix}:track_camera:{track_id}" - - def store_event(self, evt: TrackEvent) -> None: - """ - Persist a TrackEvent to Redis and maintain the active-tracks set. - - Trims the event list to MAX_EVENTS_PER_TRACK after each write. - - Args: - evt: TrackEvent instance to store. - """ - key = self._events_key(evt.track_id) - payload = evt.model_dump() if hasattr(evt, "model_dump") else evt.dict() - self._r.rpush(key, json.dumps(payload)) - self._r.ltrim(key, -MAX_EVENTS_PER_TRACK, -1) - self._r.sadd(self._active_key(evt.camera_id), str(evt.track_id)) - self._r.set(self._track_camera_key(evt.track_id), evt.camera_id) - self._r.expire(key, TRACK_TTL_SECONDS) - - def get_sequence(self, track_id: int, last_n: Optional[int] = None) -> TrackSequence: - """ - Retrieve the event sequence for a track from Redis. - - Args: - track_id: Integer track identifier. - last_n: If given, return only the most recent N events. - - Returns: - TrackSequence containing the requested events. - """ - key = self._events_key(track_id) - raw = self._r.lrange(key, 0, -1) - events: list[TrackEvent] = [] - for item in raw: - data = json.loads(item) - events.append(TrackEvent(**data)) - if last_n is not None: - events = events[-last_n:] - zones = list(dict.fromkeys(e.zone for e in events if e.zone is not None)) - return TrackSequence(track_id=track_id, events=events, zones_visited=zones) - - def get_zone_entry_count(self, track_id: int, zone: str) -> int: - """ - Count how many times a track entered a specific zone. - - Args: - track_id: Integer track identifier. - zone: Zone name string to filter on. - - Returns: - Integer count of ZONE_ENTRY events for the given zone. - """ - seq = self.get_sequence(track_id) - return sum( - 1 for e in seq.events - if e.zone == zone and e.action_hint == ActionHint.ZONE_ENTRY - ) - - def get_active_track_ids(self, camera_id: str) -> set[int]: - """ - Return the set of currently active track IDs for a camera. - - Args: - camera_id: Camera identifier string. - - Returns: - Set of integer track IDs active on that camera. - """ - members = self._r.smembers(self._active_key(camera_id)) - result: set[int] = set() - for m in members: - try: - result.add(int(m)) - except Exception: - continue - return result - - def expire_track(self, track_id: int) -> None: - """ - Remove all Redis state for a track and drop it from the active set. - - Args: - track_id: Integer track identifier to expire. - """ - cam = self._r.get(self._track_camera_key(track_id)) - if cam: - try: - cam = cam if isinstance(cam, str) else cam.decode() - except Exception: - pass - self._r.srem(self._active_key(cam), str(track_id)) - self._r.delete(self._events_key(track_id)) - self._r.delete(self._track_camera_key(track_id)) +""" +memory.py — Persistent memory layer for Eagle surveillance. + +Stores lifecycle events (BORN / LOST / DEAD) in Redis and orchestrates +cross-camera ReID so every track gets a ``global_id`` alongside its local +``track_id``. + +Redis key schema (extended for global IDs) +------------------------------------------ +- ``track:{camera_id}:{track_id}`` → JSON blob ← per-track state +- ``event:{camera_id}:{frame_id}`` → JSON list ← lifecycle history +- ``embed:{camera_id}:{track_id}`` → JSON blob ← ReID embedding (TTL 7 s) +- ``identity:{global_id}`` → JSON list ← cross-cam tokens (TTL 1 h) + +Usage +----- + import redis + from services.memory.memory import MemoryService + from services.tracking.cross_camera_reid import CrossCameraReID + + r = redis.Redis() + reid = CrossCameraReID(r) + mem = MemoryService(r, reid) + + # In your tracking loop: + for event in tracker.drain_lifecycle_events(): + global_id = mem.handle_lifecycle_event(event, embedding=vec) +""" + +from __future__ import annotations + +import json +import logging +from typing import Optional + +import numpy as np + +from libs.observability.metrics import redis_write_latency +from libs.schemas.memory import ( + TrackEvent, + TrackSequence, +) +from libs.schemas.tracking import TrackLifecycleEvent, TrackState +from services.tracking.cross_camera_reid import CrossCameraReID +from services.memory.baseline import ZoneBaseline + +logger = logging.getLogger(__name__) + +# ── Redis TTLs ──────────────────────────────────────────────────────────────── +TRACK_TTL_SECONDS = 86_400 # 24 h — keep per-track state for a full day +EVENT_TTL_SECONDS = 86_400 + +# ── MemoryStore constants ───────────────────────────────────────────────────── +MAX_EVENTS_PER_TRACK = 50 # ring-buffer cap per track_id + + +class MemoryService: + """ + Writes tracking lifecycle events to Redis and assigns global identities + via ReID. + + Parameters + ---------- + redis_client: + Connected ``redis.Redis`` (or FakeRedis for tests). + reid: + ``CrossCameraReID`` instance sharing the same Redis client. + """ + + def __init__(self, redis_client, reid: CrossCameraReID) -> None: + self._r = redis_client + self._reid = reid + + # ── Public API ──────────────────────────────────────────────────────────── + + def handle_lifecycle_event( + self, + event: TrackLifecycleEvent, + embedding: Optional[np.ndarray] = None, + ) -> Optional[str]: + """ + Process a single lifecycle event and return the assigned global_id. + + - BORN → attempt ReID match; mint or reuse a global_id; store track record. + - LOST → store embedding for future cross-camera matching; update record. + - DEAD → mark track as dead; update record. + + Args: + event: TrackLifecycleEvent from Tracker.drain_lifecycle_events(). + embedding: Appearance feature vector. Required for BORN/LOST events + if cross-camera ReID is desired; may be None in tests. + + Returns: + The global_id string if one was assigned, else None. + """ + global_id: Optional[str] = None + zone_anomalous: bool = False + + if event.event == TrackState.BORN: + global_id = self._handle_born(event, embedding) + + elif event.event == TrackState.LOST: + global_id, zone_anomalous = self._handle_lost(event, embedding) + + elif event.event == TrackState.DEAD: + self._handle_dead(event) + + # Always append the raw event to the event log + self._append_event(event, global_id, zone_anomalous) + return global_id + + def get_track_record(self, camera_id: str, track_id: int) -> Optional[dict]: + """Retrieve the stored track record, or None if not found.""" + raw = self._r.get(self._track_key(camera_id, track_id)) + return json.loads(raw) if raw else None + + def get_identity(self, global_id: str) -> list[str]: + """Proxy to CrossCameraReID.get_identity.""" + return self._reid.get_identity(global_id) + + # ── Event handlers ──────────────────────────────────────────────────────── + + def _handle_born( + self, + event: TrackLifecycleEvent, + embedding: Optional[np.ndarray], + ) -> str: + if embedding is not None: + reid_result = self._reid.match_or_create( + camera_id=event.camera_id, + track_id=event.track_id, + embedding=embedding, + ) + global_id = reid_result.global_id + else: + # No embedding available → mint a placeholder global_id + import uuid + + global_id = str(uuid.uuid4()) + logger.warning( + "BORN event for cam=%s track=%d has no embedding; " + "cross-camera ReID disabled for this track.", + event.camera_id, + event.track_id, + ) + + record = { + "camera_id": event.camera_id, + "track_id": event.track_id, + "global_id": global_id, + "state": TrackState.ACTIVE.value, + "born_frame": event.frame_id, + "born_timestamp_ms": event.timestamp_ms, + "last_seen_frame": event.frame_id, + "last_seen_ms": event.timestamp_ms, + "dwell_time_seconds": event.dwell_time_seconds, + "zones_present": event.zones_present, + } + with redis_write_latency.time(): + self._r.setex( + self._track_key(event.camera_id, event.track_id), + TRACK_TTL_SECONDS, + json.dumps(record), + ) + logger.info("BORN cam=%s track=%d gid=%s", event.camera_id, event.track_id, global_id) + return global_id + + def _handle_lost( + self, + event: TrackLifecycleEvent, + embedding: Optional[np.ndarray], + ) -> tuple[Optional[str], bool]: + record = self._load_record(event.camera_id, event.track_id) + global_id = record.get("global_id") if record else None + + # Store embedding so another camera can match against it within 5 s + if embedding is not None: + self._reid.store_embedding( + camera_id=event.camera_id, + track_id=event.track_id, + embedding=embedding, + global_id=global_id, + ) + + # Detect anomaly BEFORE updating baseline (avoid contaminating with outlier) + # then update baseline for each zone this track visited + zone_anomalous = False + for zone in event.zones_present: + baseline = ZoneBaseline(self._r, zone) + if baseline.is_anomalous(event.dwell_time_seconds): + zone_anomalous = True + baseline.update(event.dwell_time_seconds) + + self._update_record(event, TrackState.LOST.value, zone_anomalous) + logger.info("LOST cam=%s track=%d gid=%s anomalous=%s", + event.camera_id, event.track_id, global_id, zone_anomalous) + return global_id, zone_anomalous + + def _handle_dead(self, event: TrackLifecycleEvent) -> None: + self._update_record(event, TrackState.DEAD.value) + logger.info("DEAD cam=%s track=%d", event.camera_id, event.track_id) + + # ── Redis helpers ───────────────────────────────────────────────────────── + + @staticmethod + def _track_key(camera_id: str, track_id: int) -> str: + return f"track:{camera_id}:{track_id}" + + @staticmethod + def _event_key(camera_id: str, frame_id: int) -> str: + return f"event:{camera_id}:{frame_id}" + + def _load_record(self, camera_id: str, track_id: int) -> Optional[dict]: + raw = self._r.get(self._track_key(camera_id, track_id)) + return json.loads(raw) if raw else None + + def _update_record(self, event: TrackLifecycleEvent, state: str, anomalous: bool = False) -> None: + record = self._load_record(event.camera_id, event.track_id) or {} + record.update( + { + "state": state, + "last_seen_frame": event.frame_id, + "last_seen_ms": event.timestamp_ms, + "dwell_time_seconds": event.dwell_time_seconds, + "zones_present": event.zones_present, + "anomalous": anomalous, + } + ) + self._r.setex( + self._track_key(event.camera_id, event.track_id), + TRACK_TTL_SECONDS, + json.dumps(record), + ) + + def _append_event( + self, + event: TrackLifecycleEvent, + global_id: Optional[str], + anomalous: bool = False, + ) -> None: + key = self._event_key(event.camera_id, event.frame_id) + raw = self._r.get(key) + evts: list[dict] = json.loads(raw) if raw else [] + evts.append( + { + "event": event.event.value, + "track_id": event.track_id, + "global_id": global_id, + "frame_id": event.frame_id, + "timestamp_ms": event.timestamp_ms, + "dwell_time_seconds": event.dwell_time_seconds, + "zones_present": event.zones_present, + "anomalous": anomalous, + } + ) + with redis_write_latency.time(): + self._r.setex( + key, + EVENT_TTL_SECONDS, + json.dumps(evts), + ) + + +# ── MemoryStore ─────────────────────────────────────────────────────────────── + +class MemoryStore: + """ + Lightweight ring-buffer event store for per-track behavioural sequences. + + Stores ``TrackEvent`` objects (Phase 3 schema) in Redis lists capped at + ``MAX_EVENTS_PER_TRACK`` entries. Designed for the action-classifier → + VLM/LLM reasoning pipeline. + + Redis key schema + ---------------- + - ``seq:{camera_id}:{track_id}`` → JSON list of TrackEvent dicts + - ``zones:{camera_id}:{track_id}`` → Redis set of zone names visited + - ``zone_count:{camera_id}:{track_id}:{zone}`` → integer entry count + - ``active:{camera_id}`` → Redis set of active track_ids + + Parameters + ---------- + redis_client: + Connected ``redis.Redis`` (or FakeRedis for tests). + camera_id: + Default camera identifier used when none is supplied per-event. + """ + + def __init__(self, redis_client, camera_id: str = "cam_01") -> None: + self._r = redis_client + self._camera_id = camera_id + + # ── Key helpers ─────────────────────────────────────────────────────────── + + def _seq_key(self, track_id: int) -> str: + return f"seq:{self._camera_id}:{track_id}" + + def _zones_key(self, track_id: int) -> str: + return f"zones:{self._camera_id}:{track_id}" + + def _zone_count_key(self, track_id: int, zone: str) -> str: + return f"zone_count:{self._camera_id}:{track_id}:{zone}" + + def _active_key(self) -> str: + return f"active:{self._camera_id}" + + def store_event(self, event) -> None: + """ + Append a ``TrackEvent`` to the ring buffer for its track. + + Enforces the ``MAX_EVENTS_PER_TRACK`` cap by trimming the oldest + entry whenever the list exceeds the limit. Also maintains the + zones-visited set, per-zone entry counts, and the active-tracks set. + + Args: + event: ``TrackEvent`` instance (from ``libs.schemas.memory``). + """ + + key = self._seq_key(event.track_id) + serialised = event.model_dump_json() + + pipe = self._r.pipeline() + pipe.rpush(key, serialised) + pipe.ltrim(key, -MAX_EVENTS_PER_TRACK, -1) + pipe.sadd(self._active_key(), str(event.track_id)) + + if event.zone: + pipe.sadd(self._zones_key(event.track_id), event.zone) + if event.action_hint == ActionHint.ZONE_ENTRY: + pipe.incr(self._zone_count_key(event.track_id, event.zone)) + + pipe.execute() + + def get_sequence(self, track_id: int, last_n: Optional[int] = None, camera_id: Optional[str] = None): + """ + Return a ``TrackSequence`` for the given track. + + Args: + track_id: Track identifier. + last_n: If given, return only the most recent *n* events. + + Returns: + ``TrackSequence`` (empty if the track has no stored events). + """ + from libs.schemas.memory import TrackEvent + + key = self._seq_key(track_id) + raw_list = self._r.lrange(key, -last_n, -1) if last_n else self._r.lrange(key, 0, -1) + + def get_active_track_ids(self, camera_id: str) -> set[int]: + members = self._r.smembers(self._active_key(camera_id)) + result: set[int] = set() + for m in members: + try: + data = json.loads(raw if isinstance(raw, str) else raw.decode()) + events.append(TrackEvent(**data)) + except Exception: + continue + + zones_raw = self._r.smembers(self._zones_key(track_id)) + zones_visited = [z if isinstance(z, str) else z.decode() for z in zones_raw] + total_dwell = sum(e.dwell_time_seconds for e in events) + + return TrackSequence( + track_id=track_id, + camera_id=self._camera_id, + events=events, + zones_visited=zones_visited, + total_dwell=total_dwell, + ) + + def get_zone_entry_count(self, track_id: int, zone: str, camera_id: Optional[str] = None) -> int: + """Return the number of times *track_id* has entered *zone*.""" + raw = self._r.get(self._zone_count_key(track_id, zone)) + if raw is None: + return 0 + return int(raw if isinstance(raw, (int, str)) else raw.decode()) + + def get_active_track_ids(self, camera_id: str) -> set[int]: + """Return the set of track IDs currently marked active for *camera_id*.""" + members = self._r.smembers(f"active:{camera_id}") + return {int(m if isinstance(m, (int, str)) else m.decode()) for m in members} + + def expire_track(self, track_id: int, camera_id: Optional[str] = None) -> None: + """Remove all stored data for *track_id* and deregister it as active.""" + pipe = self._r.pipeline() + pipe.delete(self._seq_key(track_id)) + pipe.delete(self._zones_key(track_id)) + pipe.srem(self._active_key(), str(track_id)) + pipe.execute() diff --git a/services/tracking/tracker.py b/services/tracking/tracker.py index ffd6af6..d7fd6e7 100644 --- a/services/tracking/tracker.py +++ b/services/tracking/tracker.py @@ -177,68 +177,47 @@ def update( y1 = float(ltwh[1]) x2 = x1 + float(ltwh[2]) y2 = y1 + float(ltwh[3]) - cx, cy = (x1 + x2) / 2, (y1 + y2) / 2 - zones = [z.name for z in get_zones_for_point(cx, cy)] + cx = (x1 + x2) / 2 + cy = (y1 + y2) / 2 + + matched_zones = get_zones_for_point(cx, cy) + + ZONE_PRIORITY = { + "keypad_area": 2, + "restricted_door": 1, + } + + matched_zones.sort( + key=lambda z: ZONE_PRIORITY.get(z.name, 0), + reverse=True, + ) + + zones = [z.name for z in matched_zones] - # ── Lifecycle: BORN ─────────────────────────────────────────── if tid not in self._known_ids: self._known_ids.add(tid) - self._emit_lifecycle(TrackState.BORN, tid, zones, 0.0) - logger.info(f"Track BORN: #{tid} in zones={zones}") - # ── Base Setup & Gap Calculation ────────────────────────────── + self._emit_lifecycle( + TrackState.BORN, + tid, + zones, + 0.0, + ) + + logger.info( + f"Track BORN: #{tid} in zones={zones}" + ) + prev = self._active_tracks.get(tid) - prev_traj = prev.trajectory if prev else [] - - # Compute gap_frames early so both Dwell Time and Trajectory can use it - gap_frames = max(0, self._frame_id - prev.last_seen_frame - 1) if prev is not None else 0 - - # ── Dwell time ──────────────────────────────────────────────── - if prev: - # Add historic frames, the current frame, and the occlusion gap - dwell_frames = prev.dwell_time_frames + 1 + gap_frames - else: - dwell_frames = 1 - + + dwell_frames = ( + prev.dwell_time_frames + 1 + ) if prev else 1 + dwell_secs = dwell_frames / self.fps - # ── Trajectory ──────────────────────────────────────────────── - interpolated_points = [] - max_gap = self.max_interpolation_gap # <-- Replaced self.config string access - - if prev is not None and 0 < gap_frames <= max_gap: - # Added guard condition below to prevent IndexError crashes - if prev.trajectory: - last_pos = {"x": prev.trajectory[-1].x, "y": prev.trajectory[-1].y} - else: - last_pos = {"x": cx, "y": cy} # Fallback to current center coordinates - - new_pos = {"x": cx, "y": cy} - - # Check if previous data contains w and h bounding box metrics - if hasattr(prev, 'bbox') and len(prev.bbox) == 4: - # Calculate old width and height from bbox: [x1, y1, x2, y2] - last_pos["w"] = prev.bbox[2] - prev.bbox[0] - last_pos["h"] = prev.bbox[3] - prev.bbox[1] - # Current width and height - new_pos["w"] = x2 - x1 - new_pos["h"] = y2 - y1 - - # Synthesize intermediate points and wrap them into TrajectoryPoint instances - interpolated_points = [ - TrajectoryPoint( - x=p["x"], - y=p["y"], - frame_id=p["frame_id"], - interpolated=True, - w=p.get("w"), - h=p.get("h") - ) - for p in _interpolate_trajectory(last_pos, new_pos, gap_frames, prev.last_seen_frame + 1) - ] - - # Generate the current frame real point + prev_traj = prev.trajectory if prev else [] new_point = TrajectoryPoint(x=cx, y=cy, frame_id=self._frame_id) # Merge old history, calculated mid-gap points, and current point cleanly @@ -257,21 +236,30 @@ def update( zones_present=zones, last_seen_frame=self._frame_id, ) + self._active_tracks[tid] = obj - current_ids.add(tid) tracked_objects.append(obj) + current_ids.add(tid) + active_tracks.set(len(tracked_objects)) for obj in tracked_objects: track_dwell_seconds.observe(obj.dwell_time_seconds) # ── Lifecycle: LOST for tracks that disappeared ──────────────────── for tid, prev_obj in list(self._active_tracks.items()): + if tid not in current_ids: frames_since = self._frame_id - prev_obj.last_seen_frame track = None if frames_since == 1: - track = next((t for t in raw_tracks if int(t.track_id) == tid), None) + track = next( + ( + t for t in raw_tracks + if int(t.track_id) == tid + ), + None, + ) embedding = None if frames_since == 1: