Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
2106c47
fix(video-status): changed port from 8081 to 8085 to avoid port confl…
Vchen7629 Apr 13, 2026
c9780b7
refactor(transcoder-worker): extracted kv related logic into handler …
Vchen7629 Apr 14, 2026
11cc1b6
feat(transcoder-worker): updated subscriber.go to update the job stat…
Vchen7629 Apr 14, 2026
94b98f6
refactor(transcoder-worker): extracted structured logger into observa…
Vchen7629 Apr 14, 2026
c741f10
fix(video-upload): change the kv creation to connect since video-stat…
Vchen7629 Apr 14, 2026
aacd2b9
feat(video-recombiner): added job_status_kv.go so it can update the j…
Vchen7629 Apr 14, 2026
90e10e8
feat(video-recombiner): added http service for health checks to check…
Vchen7629 Apr 14, 2026
423e965
feat(scene-detector): updated subscriber.py to write the job status k…
Vchen7629 Apr 14, 2026
c7fc438
feat(scene-detector): created http_server.py to initialize an health …
Vchen7629 Apr 14, 2026
30c5047
chore(scene-detector): updated job.py import
Vchen7629 Apr 14, 2026
0919422
feat(scene-detector): updated service.py to register the health endpo…
Vchen7629 Apr 14, 2026
e969ac4
feat(video-recombiner): updated subscriber.go to update the job statu…
Vchen7629 Apr 14, 2026
a468dd3
feat(video-status): added http health checks on each service and adde…
Vchen7629 Apr 14, 2026
29be71f
fix(video-status): added missing err checks
Vchen7629 Apr 14, 2026
d857ec8
tests(video-status): created/updated unit/integration tests for the h…
Vchen7629 Apr 14, 2026
202a449
tests(video-status): created/updated unit/integration tests for the h…
Vchen7629 Apr 14, 2026
9e65058
Merge branch 'phase-4' of https://github.com/Vchen7629/Splice into ph…
Vchen7629 Apr 14, 2026
82a868e
refactor(transcoder-worker): moved startHttpServer to handler package…
Vchen7629 Apr 14, 2026
bbdfa8a
fix(transcoder-worker): changed logger name to be correct
Vchen7629 Apr 14, 2026
0d40e06
fix(transcoder-worker): added missing return to ConnectJobStatusKV an…
Vchen7629 Apr 14, 2026
c3cc080
tests(transcoder-worker): added additional unit/integration tests
Vchen7629 Apr 14, 2026
6c26d4a
refactor(video-recombiner): moved http server start and shutdown to h…
Vchen7629 Apr 15, 2026
1c90422
tests(video-recombiner): updated unit/integration tests to test updat…
Vchen7629 Apr 15, 2026
343e477
tests(scene-detector): updated unit/integration tests to work with up…
Vchen7629 Apr 15, 2026
81dc3e0
feat(frontend): updated video upload list to show the progress bar wi…
Vchen7629 Apr 15, 2026
359aae6
fix(video-upload): wrong error msg in video.go
Vchen7629 Apr 15, 2026
2f3e391
tests(video-upload): removed old test verifying old behavior
Vchen7629 Apr 15, 2026
285ea82
fix(video-upload): lint errors
Vchen7629 Apr 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/scene-detector/src/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class Settings(BaseSettings):
# general config
LOG_LEVEL: str = "DEBUG"
LOG_FORMAT: str = "json"
HTTP_PORT: int = 9098

# Nats config
NATS_URL: str = "nats://localhost:4222"
Expand Down
25 changes: 25 additions & 0 deletions backend/scene-detector/src/handler/http_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from http.server import HTTPServer
from http.server import BaseHTTPRequestHandler
import threading
import json


class HealthEnpointHandler(BaseHTTPRequestHandler):
def do_GET(self) -> None:
if self.path == "/health":
body = json.dumps({"status": "Healthy"}).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(body)
else:
self.send_response(404)
self.end_headers()


def start_health_server(port: int) -> HTTPServer:
server = HTTPServer(("", port), HealthEnpointHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()

return server
70 changes: 70 additions & 0 deletions backend/scene-detector/src/handler/subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from nats.js.errors import KeyNotFoundError
from nats.js.kv import KeyValue
from nats.js.api import ConsumerConfig
from nats.aio.msg import Msg
from ..core.logging import logger
from ..core.settings import settings
from ..processing.job import process_job
from .messages import SceneSplitMessage
from .publisher import scene_video_chunks
from nats.js.client import JetStreamContext
import json


async def raw_videos(
js: JetStreamContext, msg_processed_kv: KeyValue, job_status_kv: KeyValue
) -> None:
"""Nats jetstream consumer that subscribes to subject to process videos"""
sub = await js.subscribe(
subject=settings.SCENE_SPLIT_SUBJECT,
durable=settings.NATS_SUB_QUEUE_NAME,
queue=settings.NATS_SUB_QUEUE_NAME,
config=ConsumerConfig(
max_deliver=settings.MAX_DELIVER_ATTEMPTS, ack_wait=settings.ACK_WAIT_S
),
)

async for msg in sub.messages:
await _process_msg(js, msg_processed_kv, job_status_kv, msg)


async def _process_msg(
js: JetStreamContext, msg_processed_kv: KeyValue, job_status_kv: KeyValue, msg: Msg
) -> None:
"""Processes a single scene-split message"""
try:
metadata = SceneSplitMessage.model_validate_json(msg.data.decode())

if await _is_already_processed(msg_processed_kv, metadata.job_id):
logger.debug("job already processed, skipping", job_id=metadata.job_id)
await msg.ack()
return

await _update_job_status(job_status_kv, metadata.job_id)

chunk_messages = await process_job(metadata)

await scene_video_chunks(js, chunk_messages)
await msg_processed_kv.put(metadata.job_id, b"done")
await msg.ack()
except Exception as e:
logger.error("unexpected error processing job", err=str(e))
await msg.nak()


async def _is_already_processed(kv: KeyValue, job_id: str) -> bool:
"""Checks if the job_id exists in the scene-split-processed so it doesnt reprocess"""
try:
await kv.get(job_id)
return True
except KeyNotFoundError:
return False


async def _update_job_status(job_status_kv: KeyValue, job_id: str) -> None:
"""Writes PROCESSING:scene-detector stage to the job-status KV bucket"""
try:
status = json.dumps({"state": "PROCESSING", "stage": "scene-detector"}).encode()
await job_status_kv.put(job_id, status)
except Exception as e:
logger.error("failed to update job status stage", job_id=job_id, err=str(e))
45 changes: 0 additions & 45 deletions backend/scene-detector/src/nats/subscriber.py

This file was deleted.

4 changes: 2 additions & 2 deletions backend/scene-detector/src/processing/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from ..storage.queries import fetch_video
from ..storage.queries import upload_video_chunks
from .video import split_into_chunks
from ..nats.messages import SceneSplitMessage
from ..nats.messages import VideoChunkMessage
from ..handler.messages import SceneSplitMessage
from ..handler.messages import VideoChunkMessage
from scenedetect import VideoOpenFailure
import asyncio
import shutil
Expand Down
18 changes: 14 additions & 4 deletions backend/scene-detector/src/service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from src.handler.http_server import start_health_server
from nats.js.api import KeyValueConfig
from .nats.subscriber import raw_videos
from .nats.connection import nats_connect
from .handler.subscriber import raw_videos
from .handler.connection import nats_connect
from .storage.check_health import check_storage_health
from .core.logging import logger
from .core.settings import settings
Expand All @@ -11,6 +12,7 @@
async def start_service() -> None:
"""Start the python scene-detection service"""
check_storage_health()
health_server = start_health_server(settings.HTTP_PORT)

nc, js = await nats_connect()

Expand All @@ -29,7 +31,7 @@ async def start_service() -> None:
)

try:
kv = await js.create_key_value(
msg_processed_kv = await js.create_key_value(
config=KeyValueConfig(
bucket="scene-split-processed",
description="key value bucket for scene detector to check if the job_id already processed for idempotency",
Expand All @@ -40,8 +42,16 @@ async def start_service() -> None:
raise RuntimeError(f"failed to create scene-split-processed KV bucket: {e}")

try:
await raw_videos(js, kv)
job_status_kv = await js.key_value("job-status")
except js_errors.NotFoundError:
raise RuntimeError(
"job-status KV bucket not found, check video-status is running"
)

try:
await raw_videos(js, msg_processed_kv, job_status_kv)
finally:
health_server.shutdown()
await nc.drain()


Expand Down
1 change: 1 addition & 0 deletions backend/scene-detector/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
"tests.fixtures.helpers",
"tests.fixtures.nats",
"tests.fixtures.storage",
"tests.fixtures.kv",
]
57 changes: 56 additions & 1 deletion backend/scene-detector/tests/fixtures/helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
from src.storage import queries
from typing import Any
from typing import AsyncGenerator
from pathlib import Path
from nats.js import JetStreamContext
from unittest.mock import patch
from src.handler.http_server import start_health_server
from src.storage import queries
import socket
import pytest
import pytest_asyncio


@pytest.fixture(autouse=True)
Expand All @@ -9,6 +16,20 @@ def patch_temp_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(queries, "TEMP_DIR", str(tmp_path))


@pytest_asyncio.fixture
async def patched_start_service(
js_context: tuple[Any, JetStreamContext],
) -> AsyncGenerator[tuple[Any, JetStreamContext], None]:
"""Yields (nc, js) with check_storage_health, start_health_server, and nats_connect patched"""
nc, js = js_context
with (
patch("src.service.check_storage_health"),
patch("src.service.start_health_server"),
patch("src.service.nats_connect", return_value=(nc, js)),
):
yield nc, js


@pytest.fixture
def chunk_files(tmp_path: Path) -> list[str]:
"""Creates a set of fake .mp4 chunk files in tmp_path"""
Expand All @@ -18,3 +39,37 @@ def chunk_files(tmp_path: Path) -> list[str]:
chunk.write_bytes(b"fake chunk content")
chunks.append(str(chunk))
return chunks


@pytest.fixture
def single_video_chunk(tmp_path: Path) -> str:
chunk = tmp_path / "chunk.mp4"
chunk.write_bytes(b"data")
return str(chunk)


def _free_port() -> int:
with socket.socket() as s:
s.bind(("", 0))
return s.getsockname()[1]


@pytest.fixture
def live_http_server() -> Any:
port = _free_port()
server = start_health_server(port)
yield f"http://localhost:{port}"
server.shutdown()


@pytest.fixture
def spy_drain(js_context: tuple[Any, JetStreamContext]) -> tuple[Any, list[bool]]:
"""Replaces nc.drain with a no-op spy (whatever that means)"""
nc, _ = js_context
called: list[bool] = []

async def _spy() -> None:
called.append(True)

nc.drain = _spy
return nc, called
11 changes: 11 additions & 0 deletions backend/scene-detector/tests/fixtures/kv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from nats.js.errors import KeyNotFoundError
from nats.js.kv import KeyValue
from unittest.mock import AsyncMock
import pytest


@pytest.fixture
def mock_kv() -> AsyncMock:
kv = AsyncMock(spec=KeyValue)
kv.get.side_effect = KeyNotFoundError()
return kv
17 changes: 16 additions & 1 deletion backend/scene-detector/tests/fixtures/nats.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from unittest.mock import AsyncMock
from unittest.mock import MagicMock
from typing import Any
from typing import Generator
from typing import AsyncGenerator
from nats.js import JetStreamContext
from nats.js.api import KeyValueConfig
from nats.aio.msg import Msg
from testcontainers.nats import NatsContainer
from src.core.settings import settings
Expand Down Expand Up @@ -32,6 +35,7 @@ async def js_context(
name="videos",
subjects=[settings.SCENE_SPLIT_SUBJECT, settings.VIDEO_CHUNKS_SUBJECT],
)
await js.create_key_value(config=KeyValueConfig(bucket="job-status"))
yield nc, js
await nc.close()

Expand All @@ -41,7 +45,7 @@ async def nats_video_chunks_subscriber(
js_context: tuple[Any, JetStreamContext], monkeypatch: Any
) -> AsyncGenerator[list[Any], None]:
monkeypatch.setattr(
"src.nats.publisher.settings.VIDEO_CHUNKS_SUBJECT",
"src.handler.publisher.settings.VIDEO_CHUNKS_SUBJECT",
settings.VIDEO_CHUNKS_SUBJECT,
)
nc, js = js_context
Expand All @@ -53,3 +57,14 @@ async def handler(msg: Msg) -> None:
sub = await nc.subscribe(settings.VIDEO_CHUNKS_SUBJECT, cb=handler)
yield received
await sub.unsubscribe()


@pytest.fixture
def mock_nats() -> tuple[MagicMock, MagicMock]:
mock_js = MagicMock()
mock_js.find_stream_name_by_subject = AsyncMock()
mock_js.create_key_value = AsyncMock()
mock_js.key_value = AsyncMock()
mock_nc = MagicMock()
mock_nc.drain = AsyncMock()
return mock_nc, mock_js
16 changes: 16 additions & 0 deletions backend/scene-detector/tests/integration/test_http_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import json
import urllib.request
import urllib.error
import pytest


def test_health_endpoint_returns_200(live_http_server: str) -> None:
with urllib.request.urlopen(f"{live_http_server}/health") as resp:
assert resp.status == 200
assert json.loads(resp.read()) == {"status": "Healthy"}


def test_unknown_path_returns_404(live_http_server: str) -> None:
with pytest.raises(urllib.error.HTTPError) as exc_info:
urllib.request.urlopen(f"{live_http_server}/not-found")
assert exc_info.value.code == 404
4 changes: 2 additions & 2 deletions backend/scene-detector/tests/integration/test_nats_connect.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from typing import Any
from nats.aio.client import Client as NATSClient
from nats.js.client import JetStreamContext
from src.nats.connection import nats_connect
from src.handler.connection import nats_connect
import pytest


@pytest.mark.asyncio
async def test_connect_returns_connected_clients(
nats_url: str, monkeypatch: Any
) -> None:
monkeypatch.setattr("src.nats.connection.settings.NATS_URL", nats_url)
monkeypatch.setattr("src.handler.connection.settings.NATS_URL", nats_url)

nc, js = await nats_connect()

Expand Down
4 changes: 2 additions & 2 deletions backend/scene-detector/tests/integration/test_publisher.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any
from nats.js.client import JetStreamContext
from src.nats.messages import VideoChunkMessage
from src.nats.publisher import scene_video_chunks
from src.handler.messages import VideoChunkMessage
from src.handler.publisher import scene_video_chunks
import pytest


Expand Down
Loading
Loading