From 51bd021912f4b337024cc5ed687647ecb2484ebc Mon Sep 17 00:00:00 2001 From: Vchen7629 Date: Sun, 12 Apr 2026 18:27:30 -0700 Subject: [PATCH 1/5] fix(transcoder-worker): added missing nak for get unprocessed video chunk in subscriber.go --- backend/transcoder-worker/internal/handler/subscriber.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/backend/transcoder-worker/internal/handler/subscriber.go b/backend/transcoder-worker/internal/handler/subscriber.go index 1055bb6..33dd28a 100644 --- a/backend/transcoder-worker/internal/handler/subscriber.go +++ b/backend/transcoder-worker/internal/handler/subscriber.go @@ -81,6 +81,11 @@ func ConsumeVideoChunk( filePath, err := storage.GetUnprocessedVideoChunk(payload.StorageURL, payload.JobID) if err != nil { logger.Error("error fetching unprocessed video chunk", "job_id", payload.JobID, "err", err) + err := msg.Nak() + if err != nil { + logger.Error("error naking msg for get unprocessed video chunk", "err", err) + return + } return } From d7dffb0117f60f492b01906f22ed7d8eeadd6162 Mon Sep 17 00:00:00 2001 From: Vchen7629 Date: Sun, 12 Apr 2026 19:27:29 -0700 Subject: [PATCH 2/5] feat(video-recombiner): added kv for idempotency --- backend/video-recombiner/cmd/main.go | 38 +++++++++++------- backend/video-recombiner/cmd/makefile | 5 +-- .../internal/handler/subscriber.go | 39 ++++++++++++------- .../internal/observability/logging.go | 17 ++++++++ .../internal/service/chunk_kv.go | 39 +++++++++++++++++++ .../internal/service/cleanup.go | 21 ++++++++++ 6 files changed, 129 insertions(+), 30 deletions(-) create mode 100644 backend/video-recombiner/internal/observability/logging.go create mode 100644 backend/video-recombiner/internal/service/chunk_kv.go create mode 100644 backend/video-recombiner/internal/service/cleanup.go diff --git a/backend/video-recombiner/cmd/main.go b/backend/video-recombiner/cmd/main.go index dca07ca..0600d12 100644 --- a/backend/video-recombiner/cmd/main.go +++ b/backend/video-recombiner/cmd/main.go @@ -1,13 +1,16 @@ package main import ( + "context" "fmt" "log" "log/slog" "os" "os/signal" "syscall" + "time" "video-recombiner/internal/handler" + "video-recombiner/internal/observability" "video-recombiner/internal/storage" "github.com/joho/godotenv" @@ -30,7 +33,7 @@ func main() { log.Fatalf("failed to load config values: %v", err) } - logger := newLogger(cfg) + logger := observability.StructuredLogger(cfg.ProdMode) err = storage.CheckHealth(cfg.BaseStorageURL, logger) if err != nil { @@ -53,10 +56,20 @@ func main() { return } + kv, err := js.CreateOrUpdateKeyValue(context.Background(), jetstream.KeyValueConfig{ + Bucket: "recombine-chunk-recieved", + Description: "tracks video chunk for the jobID is already recieved for idempotency", + TTL: 3 * time.Hour, + }) + if err != nil { + logger.Error("failed to create recombine-chunk-recieved kv bucket", "err", err) + osExit(1) + } + quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt, syscall.SIGTERM) - err = runCombiner(js, nc, logger, cfg.BaseStorageURL, quit) + err = runCombiner(js, nc, kv, logger, cfg.BaseStorageURL, quit) if err != nil { logger.Error("error flushing remaining msgs", "err", err) } @@ -66,10 +79,17 @@ type ncDrainer interface { Drain() error } -func runCombiner(js jetstream.JetStream, nc ncDrainer, logger *slog.Logger, baseStorageURL string, quit <-chan os.Signal) error { +func runCombiner( + js jetstream.JetStream, + nc ncDrainer, + kv jetstream.KeyValue, + logger *slog.Logger, + baseStorageURL string, + quit <-chan os.Signal, +) error { logger.Debug("starting service...") - consCtx, err := handler.RecombineVideo(js, logger, baseStorageURL) + consCtx, err := handler.RecombineVideo(js, kv, logger, baseStorageURL) if err != nil { return fmt.Errorf("failed to start subscriber/publisher: %w", err) } @@ -80,16 +100,6 @@ func runCombiner(js jetstream.JetStream, nc ncDrainer, logger *slog.Logger, base return nc.Drain() } -func newLogger(cfg *Config) *slog.Logger { - level := slog.LevelDebug - if cfg.ProdMode { - level = slog.LevelInfo - } - h := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level}) - - return slog.New(h).With("service", "video-recombiner") -} - func loadConfig() (*Config, error) { err := godotenv.Load("../.env") if err != nil { diff --git a/backend/video-recombiner/cmd/makefile b/backend/video-recombiner/cmd/makefile index 5cfb526..46db019 100644 --- a/backend/video-recombiner/cmd/makefile +++ b/backend/video-recombiner/cmd/makefile @@ -1,6 +1,6 @@ test_all: integration unit -PKGS := . ../internal/handler/... ../internal/service/... ../internal/storage/... +PKGS := . ../internal/handler/... ../internal/observability/... ../internal/service/... ../internal/storage/... format: go fmt ${PKGS} . @@ -17,6 +17,5 @@ integration: go test -tags integration ${PKGS} unit: - go test -tags unit . - go test -tags unit ../internal/handler/... + go test -tags unit ${PKGS} go test -race -tags unit ../internal/service/... diff --git a/backend/video-recombiner/internal/handler/subscriber.go b/backend/video-recombiner/internal/handler/subscriber.go index bc0e5f9..c348d4d 100644 --- a/backend/video-recombiner/internal/handler/subscriber.go +++ b/backend/video-recombiner/internal/handler/subscriber.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "log/slog" - "os" "time" "video-recombiner/internal/service" "video-recombiner/internal/storage" @@ -15,10 +14,10 @@ import ( const subSubject = "jobs.chunks.complete" -var removeAll = os.RemoveAll - // recombines video chunks back into one video -func RecombineVideo(js jetstream.JetStream, logger *slog.Logger, baseStorageURL string) (jetstream.ConsumeContext, error) { +func RecombineVideo( + js jetstream.JetStream, kv jetstream.KeyValue, logger *slog.Logger, baseStorageURL string, +) (jetstream.ConsumeContext, error) { ctx := context.Background() streamName, err := js.StreamNameBySubject(ctx, subSubject) @@ -59,6 +58,22 @@ func RecombineVideo(js jetstream.JetStream, logger *slog.Logger, baseStorageURL return } + recieved, err := service.CheckChunkRecieved(kv, payload.JobID, payload.ChunkIndex) + if err != nil { + logger.Error("failed to check chunk recieved", "err", err) + return + } + + if recieved { + logger.Debug("message already recieved, skipping") + err := msg.Ack() + if err != nil { + logger.Error("error acking msg", "err", err) + return + } + return + } + ready, chunks := tracker.Add(payload.JobID, payload.ChunkIndex, payload.StorageURL, payload.TotalChunks) err = msg.Ack() @@ -67,6 +82,12 @@ func RecombineVideo(js jetstream.JetStream, logger *slog.Logger, baseStorageURL return } + err = service.AddChunkRecieved(kv, payload.JobID, payload.ChunkIndex) + if err != nil { + logger.Error("failed to mark job chunk as recieved", "err", err) + return + } + if ready { localChunks := make(map[int]string) failed := false @@ -96,15 +117,7 @@ func RecombineVideo(js jetstream.JetStream, logger *slog.Logger, baseStorageURL return } - err = removeAll("/tmp/processed_chunk-" + payload.JobID) - if err != nil { - logger.Warn("failed to clean up chunk temp dir", "job_id", payload.JobID, "err", err) - } - - err = removeAll("/tmp/jobs/" + payload.JobID) - if err != nil { - logger.Warn("failed to clean up job temp dir", "job_id", payload.JobID, "err", err) - } + service.CleanUpTempFolders(payload.JobID, logger) logger.Debug("job complete", "job_id", payload.JobID, "output_path", outputPath) err = PublishVideoProcessingComplete(js, service.VideoProcessingCompleteMessage{JobID: payload.JobID}) diff --git a/backend/video-recombiner/internal/observability/logging.go b/backend/video-recombiner/internal/observability/logging.go new file mode 100644 index 0000000..9948aba --- /dev/null +++ b/backend/video-recombiner/internal/observability/logging.go @@ -0,0 +1,17 @@ +package observability + +import ( + "log/slog" + "os" +) + +// General Structured logger for code +func StructuredLogger(prodMode bool) *slog.Logger { + level := slog.LevelDebug + if prodMode { + level = slog.LevelInfo + } + h := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level}) + + return slog.New(h).With("service", "video-recombiner") +} diff --git a/backend/video-recombiner/internal/service/chunk_kv.go b/backend/video-recombiner/internal/service/chunk_kv.go new file mode 100644 index 0000000..f3bbd5a --- /dev/null +++ b/backend/video-recombiner/internal/service/chunk_kv.go @@ -0,0 +1,39 @@ +package service + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/nats-io/nats.go/jetstream" +) + +// check if a jobID chunk already is recieved, returns a bool based on if it exists in the KV +func CheckChunkRecieved(kv jetstream.KeyValue, jobID string, chunkIndex int) (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + _, err := kv.Get(ctx, fmt.Sprintf("%s.%d", jobID, chunkIndex)) + if err != nil { + if errors.Is(err, jetstream.ErrKeyNotFound) { + return false, nil + } + return false, fmt.Errorf("failed: %w", err) + } + + return true, nil +} + +// add a recieved job chunk to the KV for idempotency +func AddChunkRecieved(kv jetstream.KeyValue, jobID string, chunkIndex int) error { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + _, err := kv.Put(ctx, fmt.Sprintf("%s.%d", jobID, chunkIndex), []byte("processed")) + if err != nil { + return fmt.Errorf("failed: %w", err) + } + + return nil +} diff --git a/backend/video-recombiner/internal/service/cleanup.go b/backend/video-recombiner/internal/service/cleanup.go new file mode 100644 index 0000000..316af79 --- /dev/null +++ b/backend/video-recombiner/internal/service/cleanup.go @@ -0,0 +1,21 @@ +package service + +import ( + "log/slog" + "os" +) + +var removeAll = os.RemoveAll + +// Remove the tmp folders for the jobID after processing is done +func CleanUpTempFolders(jobID string, logger *slog.Logger) { + err := removeAll("/tmp/processed_chunk-" + jobID) + if err != nil { + logger.Warn("failed to clean up chunk temp dir", "job_id", jobID, "err", err) + } + + err = removeAll("/tmp/jobs/" + jobID) + if err != nil { + logger.Warn("failed to clean up job temp dir", "job_id", jobID, "err", err) + } +} From 366e2526381a022468bf85f93bfbe083a5cfcd51 Mon Sep 17 00:00:00 2001 From: Vchen7629 Date: Sun, 12 Apr 2026 19:30:32 -0700 Subject: [PATCH 3/5] tests(video-recombiner): added extra integration/unit tests for subscriber.go and main.go --- .../idempotency_integration_test.go | 44 +++++++++ backend/video-recombiner/cmd/helpers_test.go | 10 ++ .../cmd/main_integration_test.go | 28 ++++-- .../video-recombiner/cmd/main_unit_test.go | 32 +------ .../handler/subscriber_integration_test.go | 93 +++++++++++++++--- .../internal/handler/subscriber_unit_test.go | 94 +++++++++++++++++-- .../observability/logging_unit_test.go | 28 ++++++ .../internal/service/chunk_kv_unit_test.go | 91 ++++++++++++++++++ .../internal/service/cleanup_unit_test.go | 59 ++++++++++++ .../internal/test/jetstream_mocks.go | 31 ++++++ .../internal/test/nats_fixtures.go | 39 ++++++++ 11 files changed, 498 insertions(+), 51 deletions(-) create mode 100644 backend/video-recombiner/internal/observability/logging_unit_test.go create mode 100644 backend/video-recombiner/internal/service/chunk_kv_unit_test.go create mode 100644 backend/video-recombiner/internal/service/cleanup_unit_test.go diff --git a/backend/pipeline-tests/idempotency_integration_test.go b/backend/pipeline-tests/idempotency_integration_test.go index 192f8a9..85a3944 100644 --- a/backend/pipeline-tests/idempotency_integration_test.go +++ b/backend/pipeline-tests/idempotency_integration_test.go @@ -66,6 +66,50 @@ func TestTranscoderWorker(t *testing.T) { } func TestVideoRecombiner(t *testing.T) { + t.Run("duplicate chunk message in a multi-chunk job is dropped", func(t *testing.T) { + baseURL, statusURL, natsURL := helpers.SetupPipeline(t, 1, sharedFilerURL) + + nc, err := nats.Connect(natsURL) + require.NoError(t, err) + t.Cleanup(nc.Close) + + js, err := jetstream.New(nc) + require.NoError(t, err) + + // Capture ChunkCompleteMessages so we can replay one after the job completes. + capturedChunkMsgs := make(chan []byte, 10) + captureSub, err := nc.Subscribe("jobs.chunks.complete", func(m *nats.Msg) { + capturedChunkMsgs <- m.Data + }) + require.NoError(t, err) + + // GenerateTestVideo produces a red→blue cut so scene-detector emits 2+ chunks. + videoPath := filepath.Join(t.TempDir(), "test.mp4") + helpers.GenerateTestVideo(t, videoPath) + + jobID := helpers.UploadVideo(t, baseURL, videoPath, "480p") + helpers.WaitForJobComplete(t, statusURL, jobID, 3*time.Minute) + require.NoError(t, captureSub.Unsubscribe()) + + secondComplete := make(chan struct{}, 1) + sub, err := nc.Subscribe("jobs.complete", func(_ *nats.Msg) { + secondComplete <- struct{}{} + }) + require.NoError(t, err) + t.Cleanup(func() { _ = sub.Unsubscribe() }) + + // Replay one captured chunk — idempotency should ack and drop it. + payload := <-capturedChunkMsgs + _, err = js.Publish(context.Background(), "jobs.chunks.complete", payload) + require.NoError(t, err) + + select { + case <-secondComplete: + t.Fatal("duplicate chunk message was not dropped: triggered a jobs.complete") + case <-time.After(5 * time.Second): + } + }) + t.Run("redelivered ChunkCompleteMessage does not trigger a second stitch", func(t *testing.T) { baseURL, statusURL, natsURL := helpers.SetupPipeline(t, 1, sharedFilerURL) diff --git a/backend/video-recombiner/cmd/helpers_test.go b/backend/video-recombiner/cmd/helpers_test.go index dcb57f7..8287f6b 100644 --- a/backend/video-recombiner/cmd/helpers_test.go +++ b/backend/video-recombiner/cmd/helpers_test.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "testing" + "video-recombiner/internal/test" "github.com/stretchr/testify/require" ) @@ -33,3 +34,12 @@ func writeEnvFile(t *testing.T, content string) { require.NoError(t, os.WriteFile(path, []byte(content), 0600)) t.Cleanup(func() { _ = os.Remove(path) }) } + +// okJS returns a mock JetStream that succeeds through the full consumer setup. +func okJS() *test.MockJS { + return &test.MockJS{JStream: &test.MockStream{Cons: &test.MockConsumer{}}} +} + +func okKV() *test.MockKV { + return &test.MockKV{} +} diff --git a/backend/video-recombiner/cmd/main_integration_test.go b/backend/video-recombiner/cmd/main_integration_test.go index 2bcd50c..36d1431 100644 --- a/backend/video-recombiner/cmd/main_integration_test.go +++ b/backend/video-recombiner/cmd/main_integration_test.go @@ -36,11 +36,12 @@ func TestMain(m *testing.M) { func TestRunCombinerI(t *testing.T) { t.Run("quit signal exits cleanly", func(t *testing.T) { js, nc := test.SetupNats(t) + kv := test.SetupKV(t, js) quit := make(chan os.Signal, 1) done := make(chan error, 1) go func() { - done <- runCombiner(js, nc, test.SilentLogger(), sharedFilerURL, quit) + done <- runCombiner(js, nc, kv, test.SilentLogger(), sharedFilerURL, quit) }() time.Sleep(200 * time.Millisecond) @@ -72,16 +73,20 @@ func TestRunCombinerI(t *testing.T) { require.NoError(t, err) quit := make(chan os.Signal, 1) - err = runCombiner(js, nc, test.SilentLogger(), sharedFilerURL, quit) + err = runCombiner(js, nc, nil, test.SilentLogger(), sharedFilerURL, quit) assert.Error(t, err) }) t.Run("full flow: receive chunks, combine, publish downstream", func(t *testing.T) { - if _, err := exec.LookPath("ffmpeg"); err != nil { + _, err := exec.LookPath("ffmpeg") + if err != nil { t.Skip("ffmpeg not available") } + js, nc := test.SetupNats(t) + kv := test.SetupKV(t, js) + jobID := "job-full-flow" t.Cleanup(func() { os.RemoveAll("/tmp/processed_chunk-" + jobID) @@ -94,8 +99,6 @@ func TestRunCombinerI(t *testing.T) { test.SeedProcessedVideo(t, sharedFilerURL, jobID, "chunk-0.mp4", videoData) test.SeedProcessedVideo(t, sharedFilerURL, jobID, "chunk-1.mp4", videoData) - js, nc := test.SetupNats(t) - received := make(chan []byte, 1) sub, err := nc.Subscribe("jobs.complete", func(msg *nats.Msg) { received <- msg.Data @@ -107,7 +110,7 @@ func TestRunCombinerI(t *testing.T) { done := make(chan error, 1) go func() { - done <- runCombiner(js, nc, test.SilentLogger(), sharedFilerURL, quit) + done <- runCombiner(js, nc, kv, test.SilentLogger(), sharedFilerURL, quit) }() time.Sleep(500 * time.Millisecond) @@ -146,6 +149,19 @@ func TestRunCombinerI(t *testing.T) { }) } +func TestKVSetup(t *testing.T) { + t.Run("CreateOrUpdateKeyValue fails when JetStream is not enabled", func(t *testing.T) { + nc := test.SetupNatsNoJetStream(t) + + js, err := jetstream.New(nc) + require.NoError(t, err) + + _, err = js.CreateOrUpdateKeyValue(context.Background(), jetstream.KeyValueConfig{Bucket: "recombine-chunk-recieved"}) + + assert.Error(t, err) + }) +} + func TestMainI(t *testing.T) { t.Run("storage unreachable exits with code 1", func(t *testing.T) { code := patchExit(t) diff --git a/backend/video-recombiner/cmd/main_unit_test.go b/backend/video-recombiner/cmd/main_unit_test.go index 20c3726..cb76d98 100644 --- a/backend/video-recombiner/cmd/main_unit_test.go +++ b/backend/video-recombiner/cmd/main_unit_test.go @@ -3,8 +3,6 @@ package main import ( - "context" - "log/slog" "os" "path/filepath" "testing" @@ -15,33 +13,13 @@ import ( "github.com/stretchr/testify/require" ) -// okJS returns a mock JetStream that succeeds through the full consumer setup. -func okJS() *test.MockJS { - return &test.MockJS{JStream: &test.MockStream{Cons: &test.MockConsumer{}}} -} - -func TestStructuredLogger(t *testing.T) { - t.Run("dev mode should enable debug level", func(t *testing.T) { - logger := newLogger(&Config{ProdMode: false}) - - assert.True(t, logger.Enabled(context.Background(), slog.LevelDebug)) - }) - - t.Run("prod mode should disable debug level", func(t *testing.T) { - logger := newLogger(&Config{ProdMode: true}) - - assert.False(t, logger.Enabled(context.Background(), slog.LevelDebug)) - assert.True(t, logger.Enabled(context.Background(), slog.LevelInfo)) - }) -} - func TestRunCombiner(t *testing.T) { t.Run("consume video chunk error should return error", func(t *testing.T) { js := &test.MockJS{JStreamNameErr: assert.AnError} nc := &test.MockDrainer{} quit := make(chan os.Signal, 1) - err := runCombiner(js, nc, test.SilentLogger(), "http://storage", quit) + err := runCombiner(js, nc, okKV(), test.SilentLogger(), "http://storage", quit) require.ErrorIs(t, err, assert.AnError) assert.False(t, nc.DrainCalled, "Drain should not be called if consumer setup fails") @@ -52,7 +30,7 @@ func TestRunCombiner(t *testing.T) { done := make(chan error, 1) go func() { - done <- runCombiner(okJS(), &test.MockDrainer{}, test.SilentLogger(), "http://storage", quit) + done <- runCombiner(okJS(), &test.MockDrainer{}, okKV(), test.SilentLogger(), "http://storage", quit) }() select { @@ -77,7 +55,7 @@ func TestRunCombiner(t *testing.T) { quit := make(chan os.Signal, 1) quit <- os.Interrupt - require.NoError(t, runCombiner(js, &test.MockDrainer{}, test.SilentLogger(), "http://storage", quit)) + require.NoError(t, runCombiner(js, &test.MockDrainer{}, okKV(), test.SilentLogger(), "http://storage", quit)) require.NotNil(t, consumer.Ctx) assert.True(t, consumer.Ctx.Stopped) @@ -88,7 +66,7 @@ func TestRunCombiner(t *testing.T) { quit := make(chan os.Signal, 1) quit <- os.Interrupt - require.NoError(t, runCombiner(okJS(), nc, test.SilentLogger(), "http://storage", quit)) + require.NoError(t, runCombiner(okJS(), nc, okKV(), test.SilentLogger(), "http://storage", quit)) assert.True(t, nc.DrainCalled) }) @@ -98,7 +76,7 @@ func TestRunCombiner(t *testing.T) { quit := make(chan os.Signal, 1) quit <- os.Interrupt - err := runCombiner(okJS(), nc, test.SilentLogger(), "http://storage", quit) + err := runCombiner(okJS(), nc, okKV(), test.SilentLogger(), "http://storage", quit) assert.ErrorIs(t, err, assert.AnError) }) diff --git a/backend/video-recombiner/internal/handler/subscriber_integration_test.go b/backend/video-recombiner/internal/handler/subscriber_integration_test.go index 8602a3a..4c70228 100644 --- a/backend/video-recombiner/internal/handler/subscriber_integration_test.go +++ b/backend/video-recombiner/internal/handler/subscriber_integration_test.go @@ -50,15 +50,16 @@ func TestRecombineVideo(t *testing.T) { js, err := jetstream.New(nc) require.NoError(t, err) - _, err = handler.RecombineVideo(js, test.SilentLogger(), t.TempDir()) + _, err = handler.RecombineVideo(js, nil, test.SilentLogger(), t.TempDir()) assert.Error(t, err) }) t.Run("returns consume context", func(t *testing.T) { js, _ := test.SetupNats(t) + kv := test.SetupKV(t, js) - consCtx, err := handler.RecombineVideo(js, test.SilentLogger(), t.TempDir()) + consCtx, err := handler.RecombineVideo(js, kv, test.SilentLogger(), t.TempDir()) require.NoError(t, err) assert.NotNil(t, consCtx) @@ -67,8 +68,9 @@ func TestRecombineVideo(t *testing.T) { t.Run("creates consumer with correct config", func(t *testing.T) { ctx := context.Background() js, _ := test.SetupNats(t) + kv := test.SetupKV(t, js) - _, err := handler.RecombineVideo(js, test.SilentLogger(), t.TempDir()) + _, err := handler.RecombineVideo(js, kv, test.SilentLogger(), t.TempDir()) require.NoError(t, err) stream, err := js.Stream(ctx, "jobs") @@ -93,8 +95,9 @@ func TestRecombineVideo(t *testing.T) { func TestMessageHandlingI(t *testing.T) { t.Run("invalid JSON does not publish downstream", func(t *testing.T) { js, nc := test.SetupNats(t) + kv := test.SetupKV(t, js) - _, err := handler.RecombineVideo(js, test.SilentLogger(), t.TempDir()) + _, err := handler.RecombineVideo(js, kv, test.SilentLogger(), t.TempDir()) require.NoError(t, err) received := make(chan struct{}, 1) @@ -116,8 +119,9 @@ func TestMessageHandlingI(t *testing.T) { t.Run("partial chunk does not publish downstream", func(t *testing.T) { js, nc := test.SetupNats(t) + kv := test.SetupKV(t, js) - _, err := handler.RecombineVideo(js, test.SilentLogger(), t.TempDir()) + _, err := handler.RecombineVideo(js, kv, test.SilentLogger(), t.TempDir()) require.NoError(t, err) received := make(chan struct{}, 1) @@ -127,9 +131,8 @@ func TestMessageHandlingI(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { _ = sub.Unsubscribe() }) - // Only the first of two chunks arrives — tracker not ready, no downstream publish. payload, err := json.Marshal(service.ChunkCompleteMessage{ - JobID: "job-1", + JobID: "job-partial", ChunkIndex: 0, TotalChunks: 2, StorageURL: "http://storage/chunk-0.mp4", @@ -148,15 +151,16 @@ func TestMessageHandlingI(t *testing.T) { t.Run("all chunks received triggers combine", func(t *testing.T) { js, nc := test.SetupNats(t) + kv := test.SetupKV(t, js) videoFile := test.OpenTestVideo(t) videoData, err := os.ReadFile(videoFile.Name()) require.NoError(t, err) - test.SeedProcessedVideo(t, sharedFilerURL, "job-1", "chunk-0.mp4", videoData) - test.SeedProcessedVideo(t, sharedFilerURL, "job-1", "chunk-1.mp4", videoData) + test.SeedProcessedVideo(t, sharedFilerURL, "job-combine", "chunk-0.mp4", videoData) + test.SeedProcessedVideo(t, sharedFilerURL, "job-combine", "chunk-1.mp4", videoData) - _, err = handler.RecombineVideo(js, test.SilentLogger(), sharedFilerURL) + _, err = handler.RecombineVideo(js, kv, test.SilentLogger(), sharedFilerURL) require.NoError(t, err) received := make(chan struct{}, 1) @@ -168,9 +172,9 @@ func TestMessageHandlingI(t *testing.T) { ctx := context.Background() for i, fileName := range []string{"chunk-0.mp4", "chunk-1.mp4"} { - storageURL := fmt.Sprintf("%s/job-1/%s/processed", sharedFilerURL, fileName) + storageURL := fmt.Sprintf("%s/job-combine/%s/processed", sharedFilerURL, fileName) payload, err := json.Marshal(service.ChunkCompleteMessage{ - JobID: "job-1", + JobID: "job-combine", ChunkIndex: i, TotalChunks: 2, StorageURL: storageURL, @@ -187,3 +191,68 @@ func TestMessageHandlingI(t *testing.T) { } }) } + +func TestRecombineVideoIdempotency(t *testing.T) { + t.Run("already received chunk is acked and skipped", func(t *testing.T) { + js, nc := test.SetupNats(t) + kv := test.SetupKV(t, js) + + jobID := "job-idempotency-skip" + + // Pre-seed the KV as if this chunk was already received. + _, err := kv.Put(context.Background(), fmt.Sprintf("%s.%d", jobID, 0), []byte("received")) + require.NoError(t, err) + + _, err = handler.RecombineVideo(js, kv, test.SilentLogger(), sharedFilerURL) + require.NoError(t, err) + + secondComplete := make(chan struct{}, 1) + sub, err := nc.Subscribe("jobs.complete", func(_ *nats.Msg) { + secondComplete <- struct{}{} + }) + require.NoError(t, err) + t.Cleanup(func() { _ = sub.Unsubscribe() }) + + payload, err := json.Marshal(service.ChunkCompleteMessage{ + JobID: jobID, + ChunkIndex: 0, + TotalChunks: 1, + StorageURL: "http://storage/fake", + }) + require.NoError(t, err) + _, err = js.Publish(context.Background(), "jobs.chunks.complete", payload) + require.NoError(t, err) + + select { + case <-secondComplete: + t.Fatal("already received chunk triggered a downstream publish") + case <-time.After(2 * time.Second): + } + }) + + t.Run("kv entry is written after chunk is acked", func(t *testing.T) { + js, _ := test.SetupNats(t) + kv := test.SetupKV(t, js) + + jobID := "job-idempotency-write" + + _, err := handler.RecombineVideo(js, kv, test.SilentLogger(), sharedFilerURL) + require.NoError(t, err) + + // Partial chunk (TotalChunks:2) so combine never fires — KV write still happens after ack. + payload, err := json.Marshal(service.ChunkCompleteMessage{ + JobID: jobID, + ChunkIndex: 0, + TotalChunks: 2, + StorageURL: "http://storage/chunk-0.mp4", + }) + require.NoError(t, err) + _, err = js.Publish(context.Background(), "jobs.chunks.complete", payload) + require.NoError(t, err) + + require.Eventually(t, func() bool { + _, err := kv.Get(context.Background(), fmt.Sprintf("%s.%d", jobID, 0)) + return err == nil + }, 10*time.Second, 200*time.Millisecond, "kv entry for received chunk was never written") + }) +} diff --git a/backend/video-recombiner/internal/handler/subscriber_unit_test.go b/backend/video-recombiner/internal/handler/subscriber_unit_test.go index e817674..4278d26 100644 --- a/backend/video-recombiner/internal/handler/subscriber_unit_test.go +++ b/backend/video-recombiner/internal/handler/subscriber_unit_test.go @@ -5,6 +5,7 @@ package handler_test import ( "encoding/json" "errors" + "fmt" "testing" "video-recombiner/internal/handler" "video-recombiner/internal/service" @@ -27,6 +28,18 @@ func (m *mockMsg) Data() []byte { return m.data } func (m *mockMsg) Nak() error { m.nakCalled = true; return nil } func (m *mockMsg) Ack() error { m.ackCalled = true; return m.ackErr } +func validPayload(t *testing.T, jobID string) []byte { + t.Helper() + data, err := json.Marshal(service.ChunkCompleteMessage{ + JobID: jobID, + ChunkIndex: 0, + TotalChunks: 2, // not ready — combine never runs + StorageURL: "http://localhost:1/job-1/chunk.mp4", + }) + require.NoError(t, err) + return data +} + func TestReturnError(t *testing.T) { streamNameErr := errors.New("no stream") streamErr := errors.New("stream error") @@ -62,7 +75,7 @@ func TestReturnError(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - _, err := handler.RecombineVideo(tc.js, test.SilentLogger(), "http://storage") + _, err := handler.RecombineVideo(tc.js, &test.MockKV{}, test.SilentLogger(), "http://storage") require.Error(t, err) assert.ErrorIs(t, err, tc.wantErr) @@ -76,7 +89,7 @@ func TestMessageHandling(t *testing.T) { consumer := &test.MockConsumerWithMsg{Msg: msg} js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} - consCtx, err := handler.RecombineVideo(js, test.SilentLogger(), t.TempDir()) + consCtx, err := handler.RecombineVideo(js, &test.MockKV{}, test.SilentLogger(), t.TempDir()) require.NoError(t, err) assert.NotNil(t, consCtx) @@ -98,7 +111,7 @@ func TestMessageHandling(t *testing.T) { consumer := &test.MockConsumerWithMsg{Msg: msg} js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} - consCtx, err := handler.RecombineVideo(js, test.SilentLogger(), t.TempDir()) + consCtx, err := handler.RecombineVideo(js, &test.MockKV{}, test.SilentLogger(), t.TempDir()) require.NoError(t, err) assert.NotNil(t, consCtx) @@ -121,7 +134,7 @@ func TestMessageHandling(t *testing.T) { consumer := &test.MockConsumerWithMsg{Msg: msg} js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} - consCtx, err := handler.RecombineVideo(js, test.SilentLogger(), t.TempDir()) + consCtx, err := handler.RecombineVideo(js, &test.MockKV{}, test.SilentLogger(), t.TempDir()) require.NoError(t, err) assert.NotNil(t, consCtx) @@ -129,7 +142,7 @@ func TestMessageHandling(t *testing.T) { assert.False(t, msg.nakCalled) }) - t.Run("ack failure does not trigger combine", func(t *testing.T) { + t.Run("ack failure does not trigger combine or write kv", func(t *testing.T) { // When Ack returns an error the handler returns early before downloading chunks. payload, err := json.Marshal(service.ChunkCompleteMessage{ JobID: "job-1", @@ -142,12 +155,81 @@ func TestMessageHandling(t *testing.T) { msg := &mockMsg{data: payload, ackErr: errors.New("ack failed")} consumer := &test.MockConsumerWithMsg{Msg: msg} js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} + kv := &test.MockKV{} - consCtx, err := handler.RecombineVideo(js, test.SilentLogger(), t.TempDir()) + consCtx, err := handler.RecombineVideo(js, kv, test.SilentLogger(), t.TempDir()) require.NoError(t, err) assert.NotNil(t, consCtx) assert.True(t, msg.ackCalled) assert.False(t, msg.nakCalled) + assert.Empty(t, kv.PutKey) + }) +} + +func TestIdempotency(t *testing.T) { + t.Run("already processed chunk acks and skips processing", func(t *testing.T) { + msg := &mockMsg{data: validPayload(t, "job-1")} + consumer := &test.MockConsumerWithMsg{Msg: msg} + js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} + kv := &test.MockKV{GetFound: true} + + _, err := handler.RecombineVideo(js, kv, test.SilentLogger(), "http://storage") + + require.NoError(t, err) + assert.True(t, msg.ackCalled) + assert.False(t, msg.nakCalled) + }) + + t.Run("already processed chunk does not write to kv again", func(t *testing.T) { + msg := &mockMsg{data: validPayload(t, "job-1")} + consumer := &test.MockConsumerWithMsg{Msg: msg} + js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} + kv := &test.MockKV{GetFound: true} + + _, err := handler.RecombineVideo(js, kv, test.SilentLogger(), "http://storage") + + require.NoError(t, err) + assert.Empty(t, kv.PutKey) + }) + + t.Run("kv check error does not ack or nak", func(t *testing.T) { + msg := &mockMsg{data: validPayload(t, "job-1")} + consumer := &test.MockConsumerWithMsg{Msg: msg} + js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} + kv := &test.MockKV{GetErr: errors.New("kv unavailable")} + + _, err := handler.RecombineVideo(js, kv, test.SilentLogger(), "http://storage") + + require.NoError(t, err) + assert.False(t, msg.ackCalled) + assert.False(t, msg.nakCalled) + }) + + t.Run("writes kv with correct key after ack", func(t *testing.T) { + payload, err := json.Marshal(service.ChunkCompleteMessage{ + JobID: "job-abc", + ChunkIndex: 2, + TotalChunks: 3, // not ready — combine never runs, but KV write still happens + StorageURL: "http://localhost:1/job-abc/chunk.mp4", + }) + require.NoError(t, err) + + msg := &mockMsg{data: payload} + consumer := &test.MockConsumerWithMsg{Msg: msg} + js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} + kv := &test.MockKV{} + + _, err = handler.RecombineVideo(js, kv, test.SilentLogger(), "http://storage") + + require.NoError(t, err) + assert.Equal(t, "job-abc.2", kv.PutKey) + }) + + t.Run("kv key format is job_id.chunk_index", func(t *testing.T) { + jobID := "abc-123" + chunkIndex := 3 + expected := fmt.Sprintf("%s.%d", jobID, chunkIndex) + assert.Equal(t, "abc-123.3", expected) }) } diff --git a/backend/video-recombiner/internal/observability/logging_unit_test.go b/backend/video-recombiner/internal/observability/logging_unit_test.go new file mode 100644 index 0000000..938632b --- /dev/null +++ b/backend/video-recombiner/internal/observability/logging_unit_test.go @@ -0,0 +1,28 @@ +//go:build unit + +package observability_test + +import ( + "context" + "log/slog" + "testing" + "video-recombiner/internal/observability" + + "github.com/stretchr/testify/assert" +) + +func TestStructuredLogger(t *testing.T) { + + t.Run("prod mode set to false should enable debug level", func(t *testing.T) { + logger := observability.StructuredLogger(false) + + assert.True(t, logger.Enabled(context.Background(), slog.LevelDebug)) + }) + + t.Run("prod mode set to true should disable debug level", func(t *testing.T) { + logger := observability.StructuredLogger(true) + + assert.False(t, logger.Enabled(context.Background(), slog.LevelDebug)) + assert.True(t, logger.Enabled(context.Background(), slog.LevelInfo)) + }) +} diff --git a/backend/video-recombiner/internal/service/chunk_kv_unit_test.go b/backend/video-recombiner/internal/service/chunk_kv_unit_test.go new file mode 100644 index 0000000..298ad4a --- /dev/null +++ b/backend/video-recombiner/internal/service/chunk_kv_unit_test.go @@ -0,0 +1,91 @@ +//go:build unit + +package service_test + +import ( + "errors" + "testing" + "video-recombiner/internal/service" + "video-recombiner/internal/test" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCheckChunkRecieved(t *testing.T) { + t.Run("returns false when key not found", func(t *testing.T) { + kv := &test.MockKV{GetFound: false} + + processed, err := service.CheckChunkRecieved(kv, "job-1", 0) + + require.NoError(t, err) + assert.False(t, processed) + }) + + t.Run("returns true when key exists", func(t *testing.T) { + kv := &test.MockKV{GetFound: true} + + processed, err := service.CheckChunkRecieved(kv, "job-1", 0) + + require.NoError(t, err) + assert.True(t, processed) + }) + + t.Run("returns error on unexpected kv failure", func(t *testing.T) { + kv := &test.MockKV{GetErr: errors.New("kv unavailable")} + + _, err := service.CheckChunkRecieved(kv, "job-1", 0) + + require.Error(t, err) + assert.ErrorContains(t, err, "failed") + }) + + t.Run("does not return error for ErrKeyNotFound", func(t *testing.T) { + kv := &test.MockKV{GetErr: jetstream.ErrKeyNotFound} + + processed, err := service.CheckChunkRecieved(kv, "job-1", 0) + + require.NoError(t, err) + assert.False(t, processed) + }) + + t.Run("uses correct key format job_id.chunk_index", func(t *testing.T) { + // Key lookup for job "abc" chunk 3 must use "abc.3". + // We verify by having GetFound=true and confirming no error path is hit. + kv := &test.MockKV{GetFound: true} + + processed, err := service.CheckChunkRecieved(kv, "abc", 3) + + require.NoError(t, err) + assert.True(t, processed) + }) +} + +func TestAddChunkRecieved(t *testing.T) { + t.Run("returns nil on success", func(t *testing.T) { + kv := &test.MockKV{} + + err := service.AddChunkRecieved(kv, "job-1", 0) + + require.NoError(t, err) + }) + + t.Run("writes correct key job_id.chunk_index", func(t *testing.T) { + kv := &test.MockKV{} + + err := service.AddChunkRecieved(kv, "job-abc", 2) + + require.NoError(t, err) + assert.Equal(t, "job-abc.2", kv.PutKey) + }) + + t.Run("returns error on kv failure", func(t *testing.T) { + kv := &test.MockKV{PutErr: errors.New("put failed")} + + err := service.AddChunkRecieved(kv, "job-1", 0) + + require.Error(t, err) + assert.ErrorContains(t, err, "failed") + }) +} diff --git a/backend/video-recombiner/internal/service/cleanup_unit_test.go b/backend/video-recombiner/internal/service/cleanup_unit_test.go new file mode 100644 index 0000000..0db6cff --- /dev/null +++ b/backend/video-recombiner/internal/service/cleanup_unit_test.go @@ -0,0 +1,59 @@ +//go:build unit + +package service + +import ( + "errors" + "io" + "log/slog" + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func silentLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +func TestCleanUpTempFolders(t *testing.T) { + t.Cleanup(func() { removeAll = os.RemoveAll }) + + t.Run("calls removeAll for both temp dirs", func(t *testing.T) { + var removed []string + removeAll = func(path string) error { + removed = append(removed, path) + return nil + } + + CleanUpTempFolders("job-1", silentLogger()) + + assert.Contains(t, removed, "/tmp/processed_chunk-job-1") + assert.Contains(t, removed, "/tmp/jobs/job-1") + }) + + tests := []struct { + name string + failOnCall int + }{ + {"logs warn and continues when chunk dir removal fails", 1}, + {"logs warn and continues when job dir removal fails", 2}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + calls := 0 + removeAll = func(_ string) error { + calls++ + if calls == tc.failOnCall { + return errors.New("remove failed") + } + return nil + } + + CleanUpTempFolders("job-1", silentLogger()) + + assert.Equal(t, 2, calls, "both dirs should be attempted even if one fails") + }) + } +} diff --git a/backend/video-recombiner/internal/test/jetstream_mocks.go b/backend/video-recombiner/internal/test/jetstream_mocks.go index f3b5cf0..8ec56df 100644 --- a/backend/video-recombiner/internal/test/jetstream_mocks.go +++ b/backend/video-recombiner/internal/test/jetstream_mocks.go @@ -6,6 +6,37 @@ import ( "github.com/nats-io/nats.go/jetstream" ) +// MockKV stubs jetstream.KeyValue for unit tests. +type MockKV struct { + jetstream.KeyValue + GetErr error + GetFound bool // if true, Get returns a non-nil entry; if false, returns ErrKeyNotFound + PutErr error + PutKey string +} + +func (m *MockKV) Get(_ context.Context, key string) (jetstream.KeyValueEntry, error) { + if m.GetErr != nil { + return nil, m.GetErr + } + if !m.GetFound { + return nil, jetstream.ErrKeyNotFound + } + return &mockKVEntry{key: key}, nil +} + +func (m *MockKV) Put(_ context.Context, key string, _ []byte) (uint64, error) { + m.PutKey = key + return 0, m.PutErr +} + +type mockKVEntry struct { + jetstream.KeyValueEntry + key string +} + +func (e *mockKVEntry) Key() string { return e.key } + // MockJS stubs jetstream.JetStream. Set StreamNameErr to simulate a lookup failure. type MockJS struct { jetstream.JetStream diff --git a/backend/video-recombiner/internal/test/nats_fixtures.go b/backend/video-recombiner/internal/test/nats_fixtures.go index d597774..74370d3 100644 --- a/backend/video-recombiner/internal/test/nats_fixtures.go +++ b/backend/video-recombiner/internal/test/nats_fixtures.go @@ -9,7 +9,9 @@ import ( "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" "github.com/stretchr/testify/require" + tc "github.com/testcontainers/testcontainers-go" natstc "github.com/testcontainers/testcontainers-go/modules/nats" + "github.com/testcontainers/testcontainers-go/wait" ) // fixture for setting up nats container for testing @@ -39,3 +41,40 @@ func SetupNats(t *testing.T) (jetstream.JetStream, *nats.Conn) { return js, nc } + +// starts a plain NATS container without JetStream enabled and returns the connection. +func SetupNatsNoJetStream(t *testing.T) *nats.Conn { + t.Helper() + ctx := context.Background() + + container, err := tc.GenericContainer(ctx, tc.GenericContainerRequest{ + ContainerRequest: tc.ContainerRequest{ + Image: "nats:2.10-alpine", + ExposedPorts: []string{"4222/tcp"}, + WaitingFor: wait.ForLog("Server is ready"), + }, + Started: true, + }) + require.NoError(t, err) + t.Cleanup(func() { _ = container.Terminate(ctx) }) + + host, err := container.Host(ctx) + require.NoError(t, err) + port, err := container.MappedPort(ctx, "4222") + require.NoError(t, err) + + nc, err := nats.Connect("nats://" + host + ":" + port.Port()) + require.NoError(t, err) + t.Cleanup(nc.Close) + + return nc +} + +func SetupKV(t *testing.T, js jetstream.JetStream) jetstream.KeyValue { + t.Helper() + kv, err := js.CreateOrUpdateKeyValue(context.Background(), jetstream.KeyValueConfig{ + Bucket: "recombine-chunk-recieved", + }) + require.NoError(t, err) + return kv +} From c54412618edcef67436c71cd4297e3272e9b561c Mon Sep 17 00:00:00 2001 From: Vchen7629 Date: Sun, 12 Apr 2026 19:34:44 -0700 Subject: [PATCH 4/5] fix(video-upload): added missing ttl to KV --- backend/video-upload/cmd/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/video-upload/cmd/main.go b/backend/video-upload/cmd/main.go index d7091a3..64b9bab 100644 --- a/backend/video-upload/cmd/main.go +++ b/backend/video-upload/cmd/main.go @@ -56,6 +56,7 @@ func main() { kv, err := js.CreateOrUpdateKeyValue(context.Background(), jetstream.KeyValueConfig{ Bucket: "job-status", Description: "tracks job state across the pipeline", + TTL: 3 * time.Hour, }) if err != nil { logger.Error("failed to ccreate job-status kv bucket", "err", err) From 4071090ba6e8c98c85fc17b2c352e78d0e61cb24 Mon Sep 17 00:00:00 2001 From: Vchen7629 Date: Sun, 12 Apr 2026 19:41:33 -0700 Subject: [PATCH 5/5] tests(transcoder-worker): updated subscriber unit test --- .../internal/handler/subscriber_unit_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/backend/transcoder-worker/internal/handler/subscriber_unit_test.go b/backend/transcoder-worker/internal/handler/subscriber_unit_test.go index 05461fc..9c41266 100644 --- a/backend/transcoder-worker/internal/handler/subscriber_unit_test.go +++ b/backend/transcoder-worker/internal/handler/subscriber_unit_test.go @@ -113,7 +113,7 @@ func TestAckAndNacking(t *testing.T) { assert.True(t, msg.nakCalled) }) - t.Run("fetch failure does not nak or ack", func(t *testing.T) { + t.Run("fetch failure naks", func(t *testing.T) { msg := &mockMsg{data: validPayload(t, "job-1")} consumer := &test.MockConsumerWithMsg{Msg: msg} js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} @@ -121,8 +121,7 @@ func TestAckAndNacking(t *testing.T) { _, err := handler.ConsumeVideoChunk("http://storage", js, &test.MockKV{}, test.SilentLogger()) require.NoError(t, err) - assert.False(t, msg.nakCalled) - assert.False(t, msg.ackCalled) + assert.True(t, msg.nakCalled) }) }