Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 71 additions & 59 deletions services/detection/detection.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""
detection.py — YOLOv8/v9 frame-level object detection.

Usage (CLI):
python detection.py --source data/sample_videos/sample.mp4
python detection.py --source 0 # webcam

Usage (API):
from services.detection.detection import Detector
detector = Detector()
results = detector.detect(frame)
"""
Expand Down Expand Up @@ -48,13 +48,20 @@ class DetectionFrame:
logger = logging.getLogger(__name__)


# ─── Detector Class ──────────────────────────────────────────────────────────

class Detector:
"""Wraps a YOLO model for frame-by-frame inference."""
"""YOLOv8/v9 wrapper for frame-level object detection.

Runs inference on individual BGR frames and returns structured
DetectionFrameSchema objects with bounding boxes, labels, confidence
scores, and zone memberships.

PERSON_CLASS_ID = 0 # COCO class ID for 'person'
TARGET_LABELS = { # labels to pass downstream (filter noise)
Attributes:
PERSON_CLASS_ID: YOLO class index for 'person'.
TARGET_LABELS: Set of object labels to retain from YOLO output.
"""

PERSON_CLASS_ID = 0
TARGET_LABELS = {
"person", "backpack", "handbag", "cell phone", "laptop"
}

Expand All @@ -73,16 +80,19 @@ def __init__(
self.conf = confidence_threshold
self.device = device

def detect(self, frame: np.ndarray, frame_id: int = 0) -> DetectionFrame:
"""
Run YOLO inference on a single BGR frame.
def detect(self, frame: np.ndarray, frame_id: int = 0) -> DetectionFrameSchema:
"""Run YOLO inference on a single BGR frame.

Args:
frame: BGR image as numpy array (H, W, 3).
frame: BGR image as numpy array (H, W, 3).
frame_id: Frame index for downstream tracking.

Returns:
DetectionFrame with all detected objects and zone memberships.
DetectionFrameSchema with all detected objects and zone memberships.

Example:
detector = Detector()
det_frame = detector.detect(frame, frame_id=42)
"""
results = self.model(frame, device=self.device, verbose=False)
detections: list[Detection] = []
Expand All @@ -108,43 +118,48 @@ def detect(self, frame: np.ndarray, frame_id: int = 0) -> DetectionFrame:
x1, y1, x2, y2 = box.tolist()
cx, cy = (x1 + x2) / 2, (y1 + y2) / 2

zones = [z.name for z in get_zones_for_point(cx, cy, zones=active_zones)]
_ = [z.name for z in get_zones_for_point(cx, cy)]

detections.append(Detection(
detections.append(DetectionSchema(
label=label,
bbox=BoundingBox(x1=x1, y1=y1, x2=x2, y2=y2),
confidence=float(conf),
center=(cx, cy),
zones_present=zones,
class_id=int(cls_id),
))

return DetectionFrame(
return DetectionFrameSchema(
frame_id=frame_id,
detections=detections,
timestamp_ms=cv2.getTickCount() / cv2.getTickFrequency() * 1000,
)


# ─── Rendering ────────────────────────────────────────────────────────────────

# Per-label colour triples handed to the OpenCV drawing calls. Each label is
# listed exactly once; repeating a key in a dict literal silently keeps only
# the last value, which hides accidental divergence between the copies.
LABEL_COLORS: dict[str, tuple[int, int, int]] = {
    "person": (0, 120, 255),
    "backpack": (255, 165, 0),
    "handbag": (255, 165, 0),
    "cell phone": (0, 200, 200),
    "laptop": (200, 0, 200),
}

def draw_detections(frame: np.ndarray, det_frame: DetectionFrame) -> np.ndarray:
"""Draw bounding boxes, labels, and zone overlays onto frame."""
out = frame.copy()

active_zones = get_zones()
def draw_detections(frame: np.ndarray, det_frame: DetectionFrameSchema) -> np.ndarray:
"""Draw bounding boxes, labels, and zone overlays onto a BGR frame.

Args:
frame: Original BGR image as numpy array (H, W, 3).
det_frame: DetectionFrameSchema containing all detected objects.

# Draw zone polygons
for zone in active_zones:
if not getattr(zone, 'valid', True):
continue
Returns:
Annotated BGR frame with boxes, labels, zones, and HUD overlay.

Example:
annotated = draw_detections(frame, det_frame)
cv2.imshow("Output", annotated)
"""
out = frame.copy()

for zone in DEFAULT_ZONES:
pts = zone.as_array().reshape((-1, 1, 2))
overlay = out.copy()
cv2.fillPoly(overlay, [pts], zone.color_bgr)
Expand All @@ -153,32 +168,41 @@ def draw_detections(frame: np.ndarray, det_frame: DetectionFrame) -> np.ndarray:
cv2.putText(out, zone.name, zone.polygon[0],
cv2.FONT_HERSHEY_SIMPLEX, 0.5, zone.color_bgr, 1)

# Draw detections
for det in det_frame.detections:
x1, y1, x2, y2 = int(det.bbox.x1), int(det.bbox.y1), int(det.bbox.x2), int(det.bbox.y2)
cx, cy = det.bbox.center
color = LABEL_COLORS.get(det.label, (200, 200, 200))
cv2.rectangle(out, (x1, y1), (x2, y2), color, 2)

label_text = f"{det.label} {det.confidence:.2f}"
if det.zones_present:
label_text += f" [{', '.join(det.zones_present)}]"

cv2.putText(out, label_text, (x1, y1 - 8),
cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 2)

# Centroid dot
cv2.circle(out, (int(det.center[0]), int(det.center[1])), 4, color, -1)
cv2.circle(out, (int(cx), int(cy)), 4, color, -1)

# HUD
cv2.putText(out, f"Frame: {det_frame.frame_id} | Detections: {len(det_frame.detections)}",
(10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.65, (255, 255, 255), 2)
cv2.putText(
out,
f"Frame: {det_frame.frame_id} | Detections: {len(det_frame.detections)}",
(10, 28),
cv2.FONT_HERSHEY_SIMPLEX,
0.65,
(255, 255, 255),
2,
)

return out


# ─── CLI Entry Point ─────────────────────────────────────────────────────────

def main() -> None:
"""CLI entry point for running the detection demo on video or webcam.

Parses arguments, initializes the Detector, and runs the inference loop.
Optionally writes annotated output to a video file.

Example:
python detection.py --source data/sample_videos/sample.mp4 --output out.mp4
"""
parser = argparse.ArgumentParser(description="Run Agentic Vision detection demo")
parser.add_argument("--source", default="0", help="Video file path or camera index")
parser.add_argument("--model", default=settings.detector_model, help="YOLO model name")
Expand All @@ -194,7 +218,7 @@ def main() -> None:
raise RuntimeError(f"Cannot open source: {source}")

fps = cap.get(cv2.CAP_PROP_FPS) or 30
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
logger.info(f"Stream: {width}x{height} @ {fps:.1f} FPS")

Expand All @@ -210,21 +234,9 @@ def main() -> None:
break

det_frame = detector.detect(frame, frame_id=frame_id)
builder = SceneGraphBuilder(det_frame)

builder.build_graph()
graph_text = builder.serialize_graph()

if frame_id % 30 == 0 and graph_text:
prompt = build_reasoning_prompt(graph_text)
print("\nLLM PROMPT:\n")
print(prompt)



annotated = draw_detections(frame, det_frame)
annotated = draw_detections(frame, det_frame)

cv2.imshow("Agentic Vision Detection", annotated)
cv2.imshow("Agentic Vision Detection", annotated)
if writer:
writer.write(annotated)

Expand Down
72 changes: 9 additions & 63 deletions services/memory/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,71 +227,27 @@ def _handle_dead(self, event: TrackLifecycleEvent) -> None:

@staticmethod
def _track_key(camera_id: str, track_id: int) -> str:
    """Return the Redis key for a per-track state blob.

    Args:
        camera_id: Camera identifier.
        track_id: Track identifier.

    Returns:
        Key in the format ``track:{camera_id}:{track_id}``.
    """
    return f"track:{camera_id}:{track_id}"

@staticmethod
def _event_key(camera_id: str, frame_id: int) -> str:
    """Return the Redis key for a per-frame event list.

    Args:
        camera_id: Camera identifier.
        frame_id: Frame number.

    Returns:
        Key in the format ``event:{camera_id}:{frame_id}``.
    """
    return f"event:{camera_id}:{frame_id}"

def _load_record(self, camera_id: str, track_id: int) -> Optional[dict]:
    """Load and deserialise a track record from Redis, or return None.

    Args:
        camera_id: Camera identifier.
        track_id: Unique tracking ID.

    Returns:
        The JSON-decoded value stored under the track key, or None when
        nothing (or an empty value) is stored there.
    """
    raw = self._r.get(self._track_key(camera_id, track_id))
    # A missing key and an empty payload are both treated as "no record".
    return json.loads(raw) if raw else None

def _update_record(self, event: TrackLifecycleEvent, state: str) -> None:
"""
Update an existing track record in Redis with new lifecycle state.

This updates:
- Track state (LOST / DEAD / ACTIVE)
- Last seen frame
- Last seen timestamp
- Dwell time
- Zones visited
Update an existing track record's state and timing fields in Redis.

Args:
event (TrackLifecycleEvent): Lifecycle event containing update data.
state (str): New state to assign to the track.

Returns:
None
event: Source lifecycle event supplying updated field values.
state: New state string (e.g. 'LOST', 'DEAD').
"""

record = self._load_record(event.camera_id, event.track_id) or {}
record.update(
{
Expand All @@ -314,22 +270,12 @@ def _append_event(
global_id: Optional[str],
) -> None:
"""
Append a lifecycle event to Redis event history.

Stores per-frame event logs including:
- Event type (BORN / LOST / DEAD)
- Track ID
- Global ID (if available)
- Timestamp and metadata
Append a lifecycle event dict to the per-frame Redis event log.

Args:
event (TrackLifecycleEvent): Source lifecycle event.
global_id (Optional[str]): Global identity assigned to track.

Returns:
None
event: Source lifecycle event.
global_id: Assigned global identity string, or None.
"""

key = self._event_key(event.camera_id, event.frame_id)
raw = self._r.get(key)
evts: list[dict] = json.loads(raw) if raw else []
Expand Down
Loading
Loading