From 7f687ef7c7505c405c7b71fd1a9215f2184b3ac6 Mon Sep 17 00:00:00 2001 From: Vchen7629 Date: Sun, 12 Apr 2026 11:29:19 -0700 Subject: [PATCH 1/8] chore(scene-detector): reduced KV TTL from 7 days to 3 hours --- backend/scene-detector/src/core/settings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/scene-detector/src/core/settings.py b/backend/scene-detector/src/core/settings.py index e857def..8e196aa 100644 --- a/backend/scene-detector/src/core/settings.py +++ b/backend/scene-detector/src/core/settings.py @@ -23,7 +23,7 @@ class Settings(BaseSettings): MAX_DELIVER_ATTEMPTS: int = 3 ACK_WAIT_S: int = 30 - KV_BUCKET_TTL_S: int = 7 * 24 * 60 * 60 # 7 days TTL + KV_BUCKET_TTL_S: int = 3 * 60 * 60 # 3 hour TTL BASE_STORAGE_URL: str = "http://localhost:8888" From 209622e524860e4c85c029ee468e3389de04a65f Mon Sep 17 00:00:00 2001 From: Vchen7629 Date: Sun, 12 Apr 2026 11:57:10 -0700 Subject: [PATCH 2/8] feat(transcoder-worker): created idempotency kv helper functions --- .../internal/service/chunk_kv.go | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 backend/transcoder-worker/internal/service/chunk_kv.go diff --git a/backend/transcoder-worker/internal/service/chunk_kv.go b/backend/transcoder-worker/internal/service/chunk_kv.go new file mode 100644 index 0000000..bdad497 --- /dev/null +++ b/backend/transcoder-worker/internal/service/chunk_kv.go @@ -0,0 +1,39 @@ +package service + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/nats-io/nats.go/jetstream" +) + +// check if a jobID chunk already is processed, returns a bool based on if it exists in the KV +func CheckChunkProcessed(kv jetstream.KeyValue, jobID string, chunkIndex int) (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + _, err := kv.Get(ctx, fmt.Sprintf("%s.%d", jobID, chunkIndex)) + if err != nil { + if errors.Is(err, jetstream.ErrKeyNotFound) { + return false, nil + } + return false, fmt.Errorf("failed to check chunk processed: %w", err) + } + + return true, nil +} + +// add a completed job chunk to the KV for idempotency +func AddChunkProcessed(kv jetstream.KeyValue, jobID string, chunkIndex int) error { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + _, err := kv.Put(ctx, fmt.Sprintf("%s.%d", jobID, chunkIndex), []byte("processed")) + if err != nil { + return fmt.Errorf("failed to mark job chunk as processed", "err", err) + } + + return nil +} From 35c869fc39ea68ecd754f738156371133f110ecc Mon Sep 17 00:00:00 2001 From: Vchen7629 Date: Sun, 12 Apr 2026 11:57:36 -0700 Subject: [PATCH 3/8] feat(transcoder-worker): updated subscriber.go with the idempotency KV check and add --- .../internal/handler/subscriber.go | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/backend/transcoder-worker/internal/handler/subscriber.go b/backend/transcoder-worker/internal/handler/subscriber.go index 61b58ad..dc5b46e 100644 --- a/backend/transcoder-worker/internal/handler/subscriber.go +++ b/backend/transcoder-worker/internal/handler/subscriber.go @@ -19,7 +19,9 @@ const subSubject = "jobs.video.chunks" var removeAll = os.RemoveAll // consume video chunk from nats jetstream and process it -func ConsumeVideoChunk(baseStorageURL string, js jetstream.JetStream, logger *slog.Logger) (jetstream.ConsumeContext, error) { +func ConsumeVideoChunk( + baseStorageURL string, js jetstream.JetStream, kv jetstream.KeyValue, logger *slog.Logger, +) (jetstream.ConsumeContext, error) { ctx := context.Background() streamName, err := js.StreamNameBySubject(ctx, subSubject) @@ -60,6 +62,22 @@ func ConsumeVideoChunk(baseStorageURL string, js jetstream.JetStream, logger *sl return } + exists, err := service.CheckChunkProcessed(kv, payload.JobID, payload.ChunkIndex) + if err != nil { + logger.Error("failed to check chunk processed", "err", err) + return + } + + if exists { + logger.Debug("message already processed, skipping") + err := msg.Ack() + if err != nil { + logger.Error("error acking msg", "err", err) + return + } + return + } + filePath, err := storage.GetUnprocessedVideoChunk(payload.StorageURL, payload.JobID) if err != nil { logger.Error("error fetching unprocessed video chunk", "job_id", payload.JobID, "err", err) @@ -115,6 +133,12 @@ func ConsumeVideoChunk(baseStorageURL string, js jetstream.JetStream, logger *sl return } + err = service.AddChunkProcessed(kv, payload.JobID, payload.ChunkIndex) + if err != nil { + logger.Error("err", err) + return + } + err = removeAll("/tmp/temp-unprocessed-" + payload.JobID) if err != nil { logger.Warn("error removing the temp unprocessed folder", "err", err) From e26f6253b87c703056f50d5631cdb132e26ec7c9 Mon Sep 17 00:00:00 2001 From: Vchen7629 Date: Sun, 12 Apr 2026 11:58:14 -0700 Subject: [PATCH 4/8] feat(transcoder-worker): updated main.go to create the idempotency kv --- backend/transcoder-worker/cmd/main.go | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/backend/transcoder-worker/cmd/main.go b/backend/transcoder-worker/cmd/main.go index 1a1152d..a817908 100644 --- a/backend/transcoder-worker/cmd/main.go +++ b/backend/transcoder-worker/cmd/main.go @@ -1,12 +1,14 @@ package main import ( + "context" "fmt" "log" "log/slog" "os" "os/signal" "syscall" + "time" "transcoder-worker/internal/handler" "transcoder-worker/internal/storage" @@ -55,10 +57,20 @@ func main() { return } + kv, err := js.CreateOrUpdateKeyValue(context.Background(), jetstream.KeyValueConfig{ + Bucket: "transcode-chunk-job-processed", + Description: "tracks already completed video chunk for the jobID is already processed for idempotency", + TTL: 3 * time.Hour, + }) + if err != nil { + logger.Error("failed to ccreate transcode-chunk-job-processed kv bucket", "err", err) + os.Exit(1) + } + quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt, syscall.SIGTERM) - err = runProcessing(cfg.BaseStorageURL, js, nc, logger, quit) + err = runProcessing(cfg.BaseStorageURL, js, nc, kv, logger, quit) if err != nil { logger.Error("error flushing remaining msgs", "err", err) } @@ -69,10 +81,17 @@ type ncDrainer interface { } // run the subscriber and publisher and blocks so main doesnt exit after consumevideochunk retunrs -func runProcessing(baseStorageURL string, js jetstream.JetStream, nc ncDrainer, logger *slog.Logger, quit <-chan os.Signal) error { +func runProcessing( + baseStorageURL string, + js jetstream.JetStream, + nc ncDrainer, + kv jetstream.KeyValue, + logger *slog.Logger, + quit <-chan os.Signal, +) error { logger.Debug("starting service") - consCtx, err := handler.ConsumeVideoChunk(baseStorageURL, js, logger) + consCtx, err := handler.ConsumeVideoChunk(baseStorageURL, js, kv, logger) if err != nil { return fmt.Errorf("failed to start consumer: %w", err) } From b7e3599af51af201e2229f8573e8a5f2dc457f2e Mon Sep 17 00:00:00 2001 From: Vchen7629 Date: Sun, 12 Apr 2026 11:59:01 -0700 Subject: [PATCH 5/8] fix(transcoder-worker): fixed fmt error formatting in chunk_kv.go --- backend/transcoder-worker/internal/service/chunk_kv.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/transcoder-worker/internal/service/chunk_kv.go b/backend/transcoder-worker/internal/service/chunk_kv.go index bdad497..657362a 100644 --- a/backend/transcoder-worker/internal/service/chunk_kv.go +++ b/backend/transcoder-worker/internal/service/chunk_kv.go @@ -32,7 +32,7 @@ func AddChunkProcessed(kv jetstream.KeyValue, jobID string, chunkIndex int) erro _, err := kv.Put(ctx, fmt.Sprintf("%s.%d", jobID, chunkIndex), []byte("processed")) if err != nil { - return fmt.Errorf("failed to mark job chunk as processed", "err", err) + return fmt.Errorf("failed to mark job chunk as processed: %w", err) } return nil From f244b2075d13445511f5b41a76a30b0f0dd1d615 Mon Sep 17 00:00:00 2001 From: Vchen7629 Date: Sun, 12 Apr 2026 12:11:21 -0700 Subject: [PATCH 6/8] chore(transcoder-worker): updated error msgs --- backend/transcoder-worker/internal/handler/subscriber.go | 2 +- backend/transcoder-worker/internal/service/chunk_kv.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/transcoder-worker/internal/handler/subscriber.go b/backend/transcoder-worker/internal/handler/subscriber.go index dc5b46e..1055bb6 100644 --- a/backend/transcoder-worker/internal/handler/subscriber.go +++ b/backend/transcoder-worker/internal/handler/subscriber.go @@ -135,7 +135,7 @@ func ConsumeVideoChunk( err = service.AddChunkProcessed(kv, payload.JobID, payload.ChunkIndex) if err != nil { - logger.Error("err", err) + logger.Error("failed to mark job chunk as processed", "err", err) return } diff --git a/backend/transcoder-worker/internal/service/chunk_kv.go b/backend/transcoder-worker/internal/service/chunk_kv.go index 657362a..dc5e4ed 100644 --- a/backend/transcoder-worker/internal/service/chunk_kv.go +++ b/backend/transcoder-worker/internal/service/chunk_kv.go @@ -19,7 +19,7 @@ func CheckChunkProcessed(kv jetstream.KeyValue, jobID string, chunkIndex int) (b if errors.Is(err, jetstream.ErrKeyNotFound) { return false, nil } - return false, fmt.Errorf("failed to check chunk processed: %w", err) + return false, fmt.Errorf("failed: %w", err) } return true, nil @@ -32,7 +32,7 @@ func AddChunkProcessed(kv jetstream.KeyValue, jobID string, chunkIndex int) erro _, err := kv.Put(ctx, fmt.Sprintf("%s.%d", jobID, chunkIndex), []byte("processed")) if err != nil { - return fmt.Errorf("failed to mark job chunk as processed: %w", err) + return fmt.Errorf("failed: %w", err) } return nil From 9787aa1577fb06bc15e591574bd4a3af913d9a8b Mon Sep 17 00:00:00 2001 From: Vchen7629 Date: Sun, 12 Apr 2026 12:15:58 -0700 Subject: [PATCH 7/8] tests(transcoder-worker): created unit/integration tests for chunk_kv.go and updated subscriber and main.go --- backend/transcoder-worker/cmd/main.go | 2 +- .../cmd/main_integration_test.go | 21 +++- .../transcoder-worker/cmd/main_unit_test.go | 14 ++- .../handler/subscriber_integration_test.go | 113 ++++++++++++++++-- .../internal/handler/subscriber_unit_test.go | 93 ++++++++++++-- .../internal/service/chunk_kv_unit_test.go | 91 ++++++++++++++ .../internal/test/jetstream_mocks.go | 31 +++++ .../internal/test/nats_fixtures.go | 39 ++++++ 8 files changed, 376 insertions(+), 28 deletions(-) create mode 100644 backend/transcoder-worker/internal/service/chunk_kv_unit_test.go diff --git a/backend/transcoder-worker/cmd/main.go b/backend/transcoder-worker/cmd/main.go index a817908..cba128e 100644 --- a/backend/transcoder-worker/cmd/main.go +++ b/backend/transcoder-worker/cmd/main.go @@ -64,7 +64,7 @@ func main() { }) if err != nil { logger.Error("failed to ccreate transcode-chunk-job-processed kv bucket", "err", err) - os.Exit(1) + osExit(1) } quit := make(chan os.Signal, 1) diff --git a/backend/transcoder-worker/cmd/main_integration_test.go b/backend/transcoder-worker/cmd/main_integration_test.go index b29b8b2..f4d568f 100644 --- a/backend/transcoder-worker/cmd/main_integration_test.go +++ b/backend/transcoder-worker/cmd/main_integration_test.go @@ -36,11 +36,12 @@ func TestMain(m *testing.M) { func TestRunProcessingI(t *testing.T) { t.Run("quit signal exits cleanly", func(t *testing.T) { js, nc := test.SetupNats(t) + kv := test.SetupKV(t, js) quit := make(chan os.Signal, 1) done := make(chan error, 1) go func() { - done <- runProcessing(sharedFilerURL, js, nc, test.SilentLogger(), quit) + done <- runProcessing(sharedFilerURL, js, nc, kv, test.SilentLogger(), quit) }() time.Sleep(200 * time.Millisecond) @@ -60,6 +61,7 @@ func TestRunProcessingI(t *testing.T) { } js, nc := test.SetupNats(t) + kv := test.SetupKV(t, js) jobID := "job-full-flow" t.Cleanup(func() { @@ -82,7 +84,7 @@ func TestRunProcessingI(t *testing.T) { done := make(chan error, 1) go func() { - done <- runProcessing(sharedFilerURL, js, nc, test.SilentLogger(), quit) + done <- runProcessing(sharedFilerURL, js, nc, kv, test.SilentLogger(), quit) }() time.Sleep(500 * time.Millisecond) @@ -138,7 +140,20 @@ func TestRunProcessingI(t *testing.T) { require.NoError(t, err) quit := make(chan os.Signal, 1) - err = runProcessing(sharedFilerURL, js, nc, test.SilentLogger(), quit) + err = runProcessing(sharedFilerURL, js, nc, &test.MockKV{}, test.SilentLogger(), quit) + + assert.Error(t, err) + }) +} + +func TestKVSetup(t *testing.T) { + t.Run("CreateOrUpdateKeyValue fails when JetStream is not enabled", func(t *testing.T) { + nc := test.SetupNatsNoJetStream(t) + + js, err := jetstream.New(nc) + require.NoError(t, err) + + _, err = js.CreateOrUpdateKeyValue(context.Background(), jetstream.KeyValueConfig{Bucket: "transcode-chunk-job-processed"}) assert.Error(t, err) }) diff --git a/backend/transcoder-worker/cmd/main_unit_test.go b/backend/transcoder-worker/cmd/main_unit_test.go index 805f79a..23cad2d 100644 --- a/backend/transcoder-worker/cmd/main_unit_test.go +++ b/backend/transcoder-worker/cmd/main_unit_test.go @@ -20,6 +20,10 @@ func okJS() *test.MockJS { return &test.MockJS{JStream: &test.MockStream{Cons: &test.MockConsumer{}}} } +func okKV() *test.MockKV { + return &test.MockKV{} +} + func TestNewLogger(t *testing.T) { t.Run("dev mode enables debug level", func(t *testing.T) { logger := newLogger(&Config{ProdMode: false}) @@ -41,7 +45,7 @@ func TestRunProcessing(t *testing.T) { nc := &test.MockDrainer{} quit := make(chan os.Signal, 1) - err := runProcessing("http://storage", js, nc, test.SilentLogger(), quit) + err := runProcessing("http://storage", js, nc, okKV(), test.SilentLogger(), quit) require.ErrorIs(t, err, assert.AnError) assert.False(t, nc.DrainCalled, "Drain should not be called if consumer setup fails") @@ -52,7 +56,7 @@ func TestRunProcessing(t *testing.T) { done := make(chan error, 1) go func() { - done <- runProcessing("http://storage", okJS(), &test.MockDrainer{}, test.SilentLogger(), quit) + done <- runProcessing("http://storage", okJS(), &test.MockDrainer{}, okKV(), test.SilentLogger(), quit) }() select { @@ -77,7 +81,7 @@ func TestRunProcessing(t *testing.T) { quit := make(chan os.Signal, 1) quit <- os.Interrupt - require.NoError(t, runProcessing("http://storage", js, &test.MockDrainer{}, test.SilentLogger(), quit)) + require.NoError(t, runProcessing("http://storage", js, &test.MockDrainer{}, okKV(), test.SilentLogger(), quit)) require.NotNil(t, consumer.Ctx) assert.True(t, consumer.Ctx.Stopped) @@ -88,7 +92,7 @@ func TestRunProcessing(t *testing.T) { quit := make(chan os.Signal, 1) quit <- os.Interrupt - require.NoError(t, runProcessing("http://storage", okJS(), nc, test.SilentLogger(), quit)) + require.NoError(t, runProcessing("http://storage", okJS(), nc, okKV(), test.SilentLogger(), quit)) assert.True(t, nc.DrainCalled) }) @@ -98,7 +102,7 @@ func TestRunProcessing(t *testing.T) { quit := make(chan os.Signal, 1) quit <- os.Interrupt - err := runProcessing("http://storage", okJS(), nc, test.SilentLogger(), quit) + err := runProcessing("http://storage", okJS(), nc, okKV(), test.SilentLogger(), quit) assert.ErrorIs(t, err, assert.AnError) }) diff --git a/backend/transcoder-worker/internal/handler/subscriber_integration_test.go b/backend/transcoder-worker/internal/handler/subscriber_integration_test.go index edeec0c..950a470 100644 --- a/backend/transcoder-worker/internal/handler/subscriber_integration_test.go +++ b/backend/transcoder-worker/internal/handler/subscriber_integration_test.go @@ -48,16 +48,18 @@ func TestConsumeVideoChunk(t *testing.T) { js, err := jetstream.New(nc) require.NoError(t, err) + kv := test.SetupKV(t, js) - _, err = ConsumeVideoChunk(sharedFilerURL, js, test.SilentLogger()) + _, err = ConsumeVideoChunk(sharedFilerURL, js, kv, test.SilentLogger()) assert.Error(t, err) }) t.Run("returns non-nil consume context", func(t *testing.T) { js, _ := test.SetupNats(t) + kv := test.SetupKV(t, js) - consCtx, err := ConsumeVideoChunk(sharedFilerURL, js, test.SilentLogger()) + consCtx, err := ConsumeVideoChunk(sharedFilerURL, js, kv, test.SilentLogger()) require.NoError(t, err) assert.NotNil(t, consCtx) @@ -66,8 +68,9 @@ func TestConsumeVideoChunk(t *testing.T) { t.Run("consumer is created with correct config", func(t *testing.T) { ctx := context.Background() js, _ := test.SetupNats(t) + kv := test.SetupKV(t, js) - _, err := ConsumeVideoChunk(sharedFilerURL, js, test.SilentLogger()) + _, err := ConsumeVideoChunk(sharedFilerURL, js, kv, test.SilentLogger()) require.NoError(t, err) stream, err := js.Stream(ctx, "jobs") @@ -87,8 +90,9 @@ func TestConsumeVideoChunk(t *testing.T) { t.Run("invalid JSON does not publish downstream", func(t *testing.T) { js, nc := test.SetupNats(t) + kv := test.SetupKV(t, js) - _, err := ConsumeVideoChunk(sharedFilerURL, js, test.SilentLogger()) + _, err := ConsumeVideoChunk(sharedFilerURL, js, kv, test.SilentLogger()) require.NoError(t, err) received := make(chan struct{}, 1) @@ -110,6 +114,7 @@ func TestConsumeVideoChunk(t *testing.T) { t.Run("valid message publishes chunk complete and acks", func(t *testing.T) { js, nc := test.SetupNats(t) + kv := test.SetupKV(t, js) jobID := "job-full-flow" t.Cleanup(func() { @@ -126,7 +131,7 @@ func TestConsumeVideoChunk(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { _ = sub.Unsubscribe() }) - _, err = ConsumeVideoChunk(sharedFilerURL, js, test.SilentLogger()) + _, err = ConsumeVideoChunk(sharedFilerURL, js, kv, test.SilentLogger()) require.NoError(t, err) test.PublishVideoChunk(t, js, service.VideoChunkMessage{ @@ -176,6 +181,7 @@ func TestConsumeVideoChunkNaksOnError(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { js, _ := test.SetupNats(t) + kv := test.SetupKV(t, js) jobID := "job-nak-" + tc.fileName t.Cleanup(func() { os.RemoveAll("/tmp/temp-unprocessed-" + jobID) @@ -184,7 +190,7 @@ func TestConsumeVideoChunkNaksOnError(t *testing.T) { storageURL := test.SeedUnprocessedVideo(t, sharedFilerURL, jobID, tc.fileName, tc.videoContent(t)) - _, err := ConsumeVideoChunk(tc.baseStorageURL, js, test.SilentLogger()) + _, err := ConsumeVideoChunk(tc.baseStorageURL, js, kv, test.SilentLogger()) require.NoError(t, err) test.PublishVideoChunk(t, js, service.VideoChunkMessage{ @@ -221,6 +227,8 @@ func TestConsumeVideoChunkPublishFails(t *testing.T) { }) require.NoError(t, err) + kv := test.SetupKV(t, js) + jobID := "job-publish-fail" t.Cleanup(func() { os.RemoveAll("/tmp/temp-unprocessed-" + jobID) @@ -231,7 +239,7 @@ func TestConsumeVideoChunkPublishFails(t *testing.T) { require.NoError(t, err) storageURL := test.SeedUnprocessedVideo(t, sharedFilerURL, jobID, "test_video.mp4", videoContent) - _, err = ConsumeVideoChunk(sharedFilerURL, js, test.SilentLogger()) + _, err = ConsumeVideoChunk(sharedFilerURL, js, kv, test.SilentLogger()) require.NoError(t, err) test.PublishVideoChunk(t, js, service.VideoChunkMessage{ @@ -247,12 +255,13 @@ func TestConsumeVideoChunkCleanup(t *testing.T) { seedAndConsume := func(t *testing.T, jobID string) (jetstream.JetStream, <-chan struct{}) { t.Helper() js, nc := test.SetupNats(t) + kv := test.SetupKV(t, js) videoContent, err := os.ReadFile("../test/test_video.mp4") require.NoError(t, err) storageURL := test.SeedUnprocessedVideo(t, sharedFilerURL, jobID, "test_video.mp4", videoContent) - _, err = ConsumeVideoChunk(sharedFilerURL, js, test.SilentLogger()) + _, err = ConsumeVideoChunk(sharedFilerURL, js, kv, test.SilentLogger()) require.NoError(t, err) received := make(chan struct{}, 1) @@ -306,3 +315,91 @@ func TestConsumeVideoChunkCleanup(t *testing.T) { }) } } + +func TestConsumeVideoChunkIdempotency(t *testing.T) { + t.Run("already processed chunk is acked and skipped", func(t *testing.T) { + js, nc := test.SetupNats(t) + kv := test.SetupKV(t, js) + + jobID := "job-idempotency-skip" + + // Pre-seed the KV as if this chunk was already processed. + _, err := kv.Put(context.Background(), fmt.Sprintf("%s.%d", jobID, 0), []byte("processed")) + require.NoError(t, err) + + received := make(chan struct{}, 1) + sub, err := nc.Subscribe("jobs.chunks.complete", func(_ *nats.Msg) { received <- struct{}{} }) + require.NoError(t, err) + t.Cleanup(func() { _ = sub.Unsubscribe() }) + + _, err = ConsumeVideoChunk(sharedFilerURL, js, kv, test.SilentLogger()) + require.NoError(t, err) + + test.PublishVideoChunk(t, js, service.VideoChunkMessage{ + JobID: jobID, ChunkIndex: 0, TotalChunks: 1, + StorageURL: "http://storage/fake", TargetResolution: "480p", + }) + + select { + case <-received: + t.Fatal("already processed chunk triggered a downstream publish") + case <-time.After(2 * time.Second): + } + }) + + t.Run("kv entry is written after successful processing", func(t *testing.T) { + js, _ := test.SetupNats(t) + kv := test.SetupKV(t, js) + + jobID := "job-idempotency-write" + t.Cleanup(func() { + os.RemoveAll("/tmp/temp-unprocessed-" + jobID) + os.RemoveAll("/tmp/temp-processed-" + jobID) + }) + + videoContent, err := os.ReadFile("../test/test_video.mp4") + require.NoError(t, err) + storageURL := test.SeedUnprocessedVideo(t, sharedFilerURL, jobID, "test_video.mp4", videoContent) + + _, err = ConsumeVideoChunk(sharedFilerURL, js, kv, test.SilentLogger()) + require.NoError(t, err) + + test.PublishVideoChunk(t, js, service.VideoChunkMessage{ + JobID: jobID, ChunkIndex: 0, TotalChunks: 1, + StorageURL: storageURL, TargetResolution: "480p", + }) + + // Wait for processing to complete then verify KV entry exists. + require.Eventually(t, func() bool { + _, err := kv.Get(context.Background(), fmt.Sprintf("%s.%d", jobID, 0)) + return err == nil + }, 30*time.Second, 200*time.Millisecond, "kv entry for processed chunk was never written") + }) + + t.Run("kv entry is not written when processing fails", func(t *testing.T) { + js, _ := test.SetupNats(t) + kv := test.SetupKV(t, js) + + jobID := "job-idempotency-no-write-on-fail" + t.Cleanup(func() { + os.RemoveAll("/tmp/temp-unprocessed-" + jobID) + os.RemoveAll("/tmp/temp-processed-" + jobID) + }) + + // Seed invalid video so transcoding fails. + storageURL := test.SeedUnprocessedVideo(t, sharedFilerURL, jobID, "bad.mp4", []byte("not a video")) + + _, err := ConsumeVideoChunk(sharedFilerURL, js, kv, test.SilentLogger()) + require.NoError(t, err) + + test.PublishVideoChunk(t, js, service.VideoChunkMessage{ + JobID: jobID, ChunkIndex: 0, TotalChunks: 1, + StorageURL: storageURL, TargetResolution: "480p", + }) + + time.Sleep(2 * time.Second) + + _, err = kv.Get(context.Background(), fmt.Sprintf("%s.%d", jobID, 0)) + assert.ErrorIs(t, err, jetstream.ErrKeyNotFound, "kv entry should not exist after failed processing") + }) +} diff --git a/backend/transcoder-worker/internal/handler/subscriber_unit_test.go b/backend/transcoder-worker/internal/handler/subscriber_unit_test.go index 00a4431..05461fc 100644 --- a/backend/transcoder-worker/internal/handler/subscriber_unit_test.go +++ b/backend/transcoder-worker/internal/handler/subscriber_unit_test.go @@ -5,6 +5,7 @@ package handler_test import ( "encoding/json" "errors" + "fmt" "testing" "transcoder-worker/internal/handler" "transcoder-worker/internal/service" @@ -30,6 +31,18 @@ func (m *mockMsg) Data() []byte { return m.data } func (m *mockMsg) Nak() error { m.nakCalled = true; return m.nakErr } func (m *mockMsg) Ack() error { m.ackCalled = true; return nil } +func validPayload(t *testing.T, jobID string) []byte { + t.Helper() + data, err := json.Marshal(service.VideoChunkMessage{ + JobID: jobID, + ChunkIndex: 0, + StorageURL: "http://localhost:1/job-1/chunk.mp4", + TargetResolution: "720p", + }) + require.NoError(t, err) + return data +} + func TestReturnError(t *testing.T) { streamNameErr := errors.New("no stream") streamErr := errors.New("stream error") @@ -65,7 +78,7 @@ func TestReturnError(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - _, err := handler.ConsumeVideoChunk("http://storage", tc.js, test.SilentLogger()) + _, err := handler.ConsumeVideoChunk("http://storage", tc.js, &test.MockKV{}, test.SilentLogger()) require.Error(t, err) assert.ErrorIs(t, err, tc.wantErr) @@ -79,7 +92,7 @@ func TestAckAndNacking(t *testing.T) { consumer := &test.MockConsumerWithMsg{Msg: msg} js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} - consCtx, err := handler.ConsumeVideoChunk("http://storage", js, test.SilentLogger()) + consCtx, err := handler.ConsumeVideoChunk("http://storage", js, &test.MockKV{}, test.SilentLogger()) require.NoError(t, err) assert.NotNil(t, consCtx) @@ -93,7 +106,7 @@ func TestAckAndNacking(t *testing.T) { consumer := &test.MockConsumerWithMsg{Msg: msg} js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} - consCtx, err := handler.ConsumeVideoChunk("http://storage", js, test.SilentLogger()) + consCtx, err := handler.ConsumeVideoChunk("http://storage", js, &test.MockKV{}, test.SilentLogger()) require.NoError(t, err) assert.NotNil(t, consCtx) @@ -101,22 +114,80 @@ func TestAckAndNacking(t *testing.T) { }) t.Run("fetch failure does not nak or ack", func(t *testing.T) { - payload, err := json.Marshal(service.VideoChunkMessage{ - JobID: "job-1", - ChunkIndex: 0, - StorageURL: "http://localhost:1/job-1/chunk.mp4", - TargetResolution: "720p", - }) + msg := &mockMsg{data: validPayload(t, "job-1")} + consumer := &test.MockConsumerWithMsg{Msg: msg} + js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} + + _, err := handler.ConsumeVideoChunk("http://storage", js, &test.MockKV{}, test.SilentLogger()) + require.NoError(t, err) + assert.False(t, msg.nakCalled) + assert.False(t, msg.ackCalled) + }) +} - msg := &mockMsg{data: payload} +func TestIdempotency(t *testing.T) { + t.Run("already processed chunk acks and skips processing", func(t *testing.T) { + msg := &mockMsg{data: validPayload(t, "job-1")} consumer := &test.MockConsumerWithMsg{Msg: msg} js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} + kv := &test.MockKV{GetFound: true} - _, err = handler.ConsumeVideoChunk("http://storage", js, test.SilentLogger()) + _, err := handler.ConsumeVideoChunk("http://storage", js, kv, test.SilentLogger()) require.NoError(t, err) + assert.True(t, msg.ackCalled) assert.False(t, msg.nakCalled) + }) + + t.Run("already processed chunk does not write to kv again", func(t *testing.T) { + msg := &mockMsg{data: validPayload(t, "job-1")} + consumer := &test.MockConsumerWithMsg{Msg: msg} + js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} + kv := &test.MockKV{GetFound: true} + + _, err := handler.ConsumeVideoChunk("http://storage", js, kv, test.SilentLogger()) + + require.NoError(t, err) + assert.Empty(t, kv.PutKey) + }) + + t.Run("kv check error does not ack or nak", func(t *testing.T) { + msg := &mockMsg{data: validPayload(t, "job-1")} + consumer := &test.MockConsumerWithMsg{Msg: msg} + js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} + kv := &test.MockKV{GetErr: errors.New("kv unavailable")} + + _, err := handler.ConsumeVideoChunk("http://storage", js, kv, test.SilentLogger()) + + require.NoError(t, err) assert.False(t, msg.ackCalled) + assert.False(t, msg.nakCalled) + }) + + t.Run("writes kv with correct key on success", func(t *testing.T) { + payload, err := json.Marshal(service.VideoChunkMessage{ + JobID: "job-abc", + ChunkIndex: 2, + StorageURL: "http://localhost:1/job-abc/chunk.mp4", + TargetResolution: "480p", + }) + require.NoError(t, err) + + msg := &mockMsg{data: payload} + consumer := &test.MockConsumerWithMsg{Msg: msg} + js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}} + kv := &test.MockKV{} + + _, _ = handler.ConsumeVideoChunk("http://localhost:1", js, kv, test.SilentLogger()) + + assert.Empty(t, kv.PutKey, "kv.Put should not be called when processing fails") + }) + + t.Run("kv key format is job_id.chunk_index", func(t *testing.T) { + jobID := "abc-123" + chunkIndex := 3 + expected := fmt.Sprintf("%s.%d", jobID, chunkIndex) + assert.Equal(t, "abc-123.3", expected) }) } diff --git a/backend/transcoder-worker/internal/service/chunk_kv_unit_test.go b/backend/transcoder-worker/internal/service/chunk_kv_unit_test.go new file mode 100644 index 0000000..90d4c76 --- /dev/null +++ b/backend/transcoder-worker/internal/service/chunk_kv_unit_test.go @@ -0,0 +1,91 @@ +//go:build unit + +package service_test + +import ( + "errors" + "testing" + "transcoder-worker/internal/service" + "transcoder-worker/internal/test" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCheckChunkProcessed(t *testing.T) { + t.Run("returns false when key not found", func(t *testing.T) { + kv := &test.MockKV{GetFound: false} + + processed, err := service.CheckChunkProcessed(kv, "job-1", 0) + + require.NoError(t, err) + assert.False(t, processed) + }) + + t.Run("returns true when key exists", func(t *testing.T) { + kv := &test.MockKV{GetFound: true} + + processed, err := service.CheckChunkProcessed(kv, "job-1", 0) + + require.NoError(t, err) + assert.True(t, processed) + }) + + t.Run("returns error on unexpected kv failure", func(t *testing.T) { + kv := &test.MockKV{GetErr: errors.New("kv unavailable")} + + _, err := service.CheckChunkProcessed(kv, "job-1", 0) + + require.Error(t, err) + assert.ErrorContains(t, err, "failed") + }) + + t.Run("does not return error for ErrKeyNotFound", func(t *testing.T) { + kv := &test.MockKV{GetErr: jetstream.ErrKeyNotFound} + + processed, err := service.CheckChunkProcessed(kv, "job-1", 0) + + require.NoError(t, err) + assert.False(t, processed) + }) + + t.Run("uses correct key format job_id.chunk_index", func(t *testing.T) { + // Key lookup for job "abc" chunk 3 must use "abc.3". + // We verify by having GetFound=true and confirming no error path is hit. + kv := &test.MockKV{GetFound: true} + + processed, err := service.CheckChunkProcessed(kv, "abc", 3) + + require.NoError(t, err) + assert.True(t, processed) + }) +} + +func TestAddChunkProcessed(t *testing.T) { + t.Run("returns nil on success", func(t *testing.T) { + kv := &test.MockKV{} + + err := service.AddChunkProcessed(kv, "job-1", 0) + + require.NoError(t, err) + }) + + t.Run("writes correct key job_id.chunk_index", func(t *testing.T) { + kv := &test.MockKV{} + + err := service.AddChunkProcessed(kv, "job-abc", 2) + + require.NoError(t, err) + assert.Equal(t, "job-abc.2", kv.PutKey) + }) + + t.Run("returns error on kv failure", func(t *testing.T) { + kv := &test.MockKV{PutErr: errors.New("put failed")} + + err := service.AddChunkProcessed(kv, "job-1", 0) + + require.Error(t, err) + assert.ErrorContains(t, err, "failed") + }) +} diff --git a/backend/transcoder-worker/internal/test/jetstream_mocks.go b/backend/transcoder-worker/internal/test/jetstream_mocks.go index f3b5cf0..8ec56df 100644 --- a/backend/transcoder-worker/internal/test/jetstream_mocks.go +++ b/backend/transcoder-worker/internal/test/jetstream_mocks.go @@ -6,6 +6,37 @@ import ( "github.com/nats-io/nats.go/jetstream" ) +// MockKV stubs jetstream.KeyValue for unit tests. +type MockKV struct { + jetstream.KeyValue + GetErr error + GetFound bool // if true, Get returns a non-nil entry; if false, returns ErrKeyNotFound + PutErr error + PutKey string +} + +func (m *MockKV) Get(_ context.Context, key string) (jetstream.KeyValueEntry, error) { + if m.GetErr != nil { + return nil, m.GetErr + } + if !m.GetFound { + return nil, jetstream.ErrKeyNotFound + } + return &mockKVEntry{key: key}, nil +} + +func (m *MockKV) Put(_ context.Context, key string, _ []byte) (uint64, error) { + m.PutKey = key + return 0, m.PutErr +} + +type mockKVEntry struct { + jetstream.KeyValueEntry + key string +} + +func (e *mockKVEntry) Key() string { return e.key } + // MockJS stubs jetstream.JetStream. Set StreamNameErr to simulate a lookup failure. type MockJS struct { jetstream.JetStream diff --git a/backend/transcoder-worker/internal/test/nats_fixtures.go b/backend/transcoder-worker/internal/test/nats_fixtures.go index 64ac97a..1c55380 100644 --- a/backend/transcoder-worker/internal/test/nats_fixtures.go +++ b/backend/transcoder-worker/internal/test/nats_fixtures.go @@ -10,7 +10,9 @@ import ( "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" "github.com/stretchr/testify/require" + tc "github.com/testcontainers/testcontainers-go" natstc "github.com/testcontainers/testcontainers-go/modules/nats" + "github.com/testcontainers/testcontainers-go/wait" ) // fixture for setting up nats container for testing @@ -44,3 +46,40 @@ func SetupNats(t *testing.T) (jetstream.JetStream, *nats.Conn) { return js, nc } + +// starts a plain NATS container without JetStream enabled and returns the connection. +func SetupNatsNoJetStream(t *testing.T) *nats.Conn { + t.Helper() + ctx := context.Background() + + container, err := tc.GenericContainer(ctx, tc.GenericContainerRequest{ + ContainerRequest: tc.ContainerRequest{ + Image: "nats:2.10-alpine", + ExposedPorts: []string{"4222/tcp"}, + WaitingFor: wait.ForLog("Server is ready"), + }, + Started: true, + }) + require.NoError(t, err) + t.Cleanup(func() { _ = container.Terminate(ctx) }) + + host, err := container.Host(ctx) + require.NoError(t, err) + port, err := container.MappedPort(ctx, "4222") + require.NoError(t, err) + + nc, err := nats.Connect("nats://" + host + ":" + port.Port()) + require.NoError(t, err) + t.Cleanup(nc.Close) + + return nc +} + +func SetupKV(t *testing.T, js jetstream.JetStream) jetstream.KeyValue { + t.Helper() + kv, err := js.CreateOrUpdateKeyValue(context.Background(), jetstream.KeyValueConfig{ + Bucket: "chunk-processed", + }) + require.NoError(t, err) + return kv +} From 3468a8a5a3d82bdadc8f891ec27dae2acfb94c01 Mon Sep 17 00:00:00 2001 From: Vchen7629 Date: Sun, 12 Apr 2026 12:53:42 -0700 Subject: [PATCH 8/8] tests(backend): added idempotency pipeline tests for all 3 services --- .../pipeline-tests/e2e_integration_test.go | 104 +-------- .../idempotency_integration_test.go | 209 ++++++++++++++++++ 2 files changed, 211 insertions(+), 102 deletions(-) create mode 100644 backend/pipeline-tests/idempotency_integration_test.go diff --git a/backend/pipeline-tests/e2e_integration_test.go b/backend/pipeline-tests/e2e_integration_test.go index e576f5c..b311fc3 100644 --- a/backend/pipeline-tests/e2e_integration_test.go +++ b/backend/pipeline-tests/e2e_integration_test.go @@ -3,9 +3,6 @@ package e2e import ( - "context" - "encoding/json" - "fmt" "io" "net/http" "os" @@ -14,9 +11,6 @@ import ( "pipeline-tests/helpers" "testing" "time" - - "github.com/nats-io/nats.go" - "github.com/nats-io/nats.go/jetstream" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -35,7 +29,7 @@ func TestMain(m *testing.M) { func TestPipelineHappyPath(t *testing.T) { baseURL, statusURL, _ := helpers.SetupPipeline(t, 1, sharedFilerURL) - + t.Run("multi-chunk video is transcoded to target resolution", func(t *testing.T) { videoPath := filepath.Join(t.TempDir(), "test.mp4") helpers.GenerateTestVideo(t, videoPath) @@ -90,98 +84,4 @@ func TestPipelineHappyPath(t *testing.T) { helpers.WaitForJobComplete(t, statusURL, jobID, 3*time.Minute) assert.Equal(t, "COMPLETE", helpers.PollJobStatus(t, statusURL, jobID)) }) -} - -func TestFaultTolerance(t *testing.T) { - t.Run("duplicate ChunkCompleteMessage does not trigger a second stitch", func(t *testing.T) { - baseURL, statusURL, natsURL := helpers.SetupPipeline(t, 1, sharedFilerURL) - - videoPath := filepath.Join(t.TempDir(), "test.mp4") - helpers.GenerateTestVideo(t, videoPath) - - jobID := helpers.UploadVideo(t, baseURL, videoPath, "480p") - helpers.WaitForJobComplete(t, statusURL, jobID, 3*time.Minute) - - nc, err := nats.Connect(natsURL) - require.NoError(t, err) - t.Cleanup(nc.Close) - - js, err := jetstream.New(nc) - require.NoError(t, err) - - secondComplete := make(chan struct{}, 1) - sub, err := nc.Subscribe("jobs.complete", func(_ *nats.Msg) { - secondComplete <- struct{}{} - }) - require.NoError(t, err) - t.Cleanup(func() { _ = sub.Unsubscribe() }) - - // Re-publish a ChunkCompleteMessage for chunk 0 using its SeaweedFS storage URL. - chunkStorageURL := fmt.Sprintf("%s/%s/chunk_000/processed", sharedFilerURL, jobID) - payload, err := json.Marshal(struct { - JobID string `json:"job_id"` - ChunkIndex int `json:"chunk_index"` - TotalChunks int `json:"total_chunks"` - StorageURL string `json:"storage_url"` - }{ - JobID: jobID, - ChunkIndex: 0, - TotalChunks: 1, - StorageURL: chunkStorageURL, - }) - require.NoError(t, err) - _, err = js.Publish(context.Background(), "jobs.chunks.complete", payload) - require.NoError(t, err) - - select { - case <-secondComplete: - t.Fatal("duplicate ChunkCompleteMessage triggered a second stitch") - case <-time.After(5 * time.Second): - } - }) - - t.Run("redelivered SceneSplitMessage does not publish duplicate chunks", func(t *testing.T) { - baseURL, statusURL, natsURL := helpers.SetupPipeline(t, 1, sharedFilerURL) - - videoPath := filepath.Join(t.TempDir(), "test.mp4") - helpers.GenerateTestVideo(t, videoPath) - - jobID := helpers.UploadVideo(t, baseURL, videoPath, "480p") - helpers.WaitForJobComplete(t, statusURL, jobID, 3*time.Minute) - - nc, err := nats.Connect(natsURL) - require.NoError(t, err) - t.Cleanup(nc.Close) - - js, err := jetstream.New(nc) - require.NoError(t, err) - - secondComplete := make(chan struct{}, 1) - sub, err := nc.Subscribe("jobs.complete", func(_ *nats.Msg) { - secondComplete <- struct{}{} - }) - require.NoError(t, err) - t.Cleanup(func() { _ = sub.Unsubscribe() }) - - // Re-publish the original SceneSplitMessage using its SeaweedFS storage URL. - videoStorageURL := fmt.Sprintf("%s/%s/test.mp4", sharedFilerURL, jobID) - payload, err := json.Marshal(struct { - JobID string `json:"job_id"` - TargetResolution string `json:"target_resolution"` - StorageURL string `json:"storage_url"` - }{ - JobID: jobID, - TargetResolution: "480p", - StorageURL: videoStorageURL, - }) - require.NoError(t, err) - _, err = js.Publish(context.Background(), "jobs.video.scene-split", payload) - require.NoError(t, err) - - select { - case <-secondComplete: - t.Fatal("redelivered SceneSplitMessage caused a second pipeline run") - case <-time.After(15 * time.Second): - } - }) -} +} \ No newline at end of file diff --git a/backend/pipeline-tests/idempotency_integration_test.go b/backend/pipeline-tests/idempotency_integration_test.go new file mode 100644 index 0000000..192f8a9 --- /dev/null +++ b/backend/pipeline-tests/idempotency_integration_test.go @@ -0,0 +1,209 @@ +//go:build integration + +package e2e + +import ( + "context" + "encoding/json" + "fmt" + "path/filepath" + "pipeline-tests/helpers" + "testing" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/require" +) + +func TestTranscoderWorker(t *testing.T) { + t.Run("redelivered VideoChunkMessage does not publish duplicate chunk complete", func(t *testing.T) { + baseURL, statusURL, natsURL := helpers.SetupPipeline(t, 1, sharedFilerURL) + + nc, err := nats.Connect(natsURL) + require.NoError(t, err) + t.Cleanup(nc.Close) + + js, err := jetstream.New(nc) + require.NoError(t, err) + + // Capture real VideoChunkMessages in flight before uploading so we have + // valid storage URLs to replay after the job completes. + capturedChunkMsgs := make(chan []byte, 10) + captureSub, err := nc.Subscribe("jobs.video.chunks", func(m *nats.Msg) { + capturedChunkMsgs <- m.Data + }) + require.NoError(t, err) + + videoPath := filepath.Join(t.TempDir(), "test.mp4") + helpers.GenerateTestVideo(t, videoPath) + + jobID := helpers.UploadVideo(t, baseURL, videoPath, "480p") + helpers.WaitForJobComplete(t, statusURL, jobID, 3*time.Minute) + require.NoError(t, captureSub.Unsubscribe()) + + // Now watch for any new jobs.chunks.complete after replaying the message. + duplicateComplete := make(chan struct{}, 1) + sub, err := nc.Subscribe("jobs.chunks.complete", func(_ *nats.Msg) { + duplicateComplete <- struct{}{} + }) + require.NoError(t, err) + t.Cleanup(func() { _ = sub.Unsubscribe() }) + + // Replay the first captured VideoChunkMessage — storage URLs are real and + // the file still exists in SeaweedFS, so the worker CAN process it. + // Idempotency is the only thing that should prevent a second publish. + payload := <-capturedChunkMsgs + _, err = js.Publish(context.Background(), "jobs.video.chunks", payload) + require.NoError(t, err) + + select { + case <-duplicateComplete: + t.Fatal("redelivered VideoChunkMessage caused a duplicate jobs.chunks.complete publish") + case <-time.After(5 * time.Second): + } + }) +} + +func TestVideoRecombiner(t *testing.T) { + t.Run("redelivered ChunkCompleteMessage does not trigger a second stitch", func(t *testing.T) { + baseURL, statusURL, natsURL := helpers.SetupPipeline(t, 1, sharedFilerURL) + + nc, err := nats.Connect(natsURL) + require.NoError(t, err) + t.Cleanup(nc.Close) + + js, err := jetstream.New(nc) + require.NoError(t, err) + + // Capture real ChunkCompleteMessages in flight before uploading so we have + // valid storage URLs to replay after the job completes. + capturedChunkCompleteMsgs := make(chan []byte, 10) + captureSub, err := nc.Subscribe("jobs.chunks.complete", func(m *nats.Msg) { + capturedChunkCompleteMsgs <- m.Data + }) + require.NoError(t, err) + + videoPath := filepath.Join(t.TempDir(), "test.mp4") + helpers.GenerateTestVideo(t, videoPath) + + jobID := helpers.UploadVideo(t, baseURL, videoPath, "480p") + helpers.WaitForJobComplete(t, statusURL, jobID, 3*time.Minute) + require.NoError(t, captureSub.Unsubscribe()) + + // Now watch for any new jobs.complete after replaying the message. + secondComplete := make(chan struct{}, 1) + sub, err := nc.Subscribe("jobs.complete", func(_ *nats.Msg) { + secondComplete <- struct{}{} + }) + require.NoError(t, err) + t.Cleanup(func() { _ = sub.Unsubscribe() }) + + // Replay the first captured ChunkCompleteMessage — storage URLs are real and + // the file still exists in SeaweedFS, so the recombiner CAN stitch it. + // Idempotency is the only thing that should prevent a second jobs.complete publish. + payload := <-capturedChunkCompleteMsgs + _, err = js.Publish(context.Background(), "jobs.chunks.complete", payload) + require.NoError(t, err) + + select { + case <-secondComplete: + t.Fatal("redelivered ChunkCompleteMessage triggered a second stitch") + case <-time.After(5 * time.Second): + } + }) +} + +func TestSceneDetectorFault(t *testing.T) { + t.Run("duplicate ChunkCompleteMessage does not trigger a second stitch", func(t *testing.T) { + baseURL, statusURL, natsURL := helpers.SetupPipeline(t, 1, sharedFilerURL) + + videoPath := filepath.Join(t.TempDir(), "test.mp4") + helpers.GenerateTestVideo(t, videoPath) + + jobID := helpers.UploadVideo(t, baseURL, videoPath, "480p") + helpers.WaitForJobComplete(t, statusURL, jobID, 3*time.Minute) + + nc, err := nats.Connect(natsURL) + require.NoError(t, err) + t.Cleanup(nc.Close) + + js, err := jetstream.New(nc) + require.NoError(t, err) + + secondComplete := make(chan struct{}, 1) + sub, err := nc.Subscribe("jobs.complete", func(_ *nats.Msg) { + secondComplete <- struct{}{} + }) + require.NoError(t, err) + t.Cleanup(func() { _ = sub.Unsubscribe() }) + + // Re-publish a ChunkCompleteMessage for chunk 0 using its SeaweedFS storage URL. + chunkStorageURL := fmt.Sprintf("%s/%s/chunk_000/processed", sharedFilerURL, jobID) + payload, err := json.Marshal(struct { + JobID string `json:"job_id"` + ChunkIndex int `json:"chunk_index"` + TotalChunks int `json:"total_chunks"` + StorageURL string `json:"storage_url"` + }{ + JobID: jobID, + ChunkIndex: 0, + TotalChunks: 1, + StorageURL: chunkStorageURL, + }) + require.NoError(t, err) + _, err = js.Publish(context.Background(), "jobs.chunks.complete", payload) + require.NoError(t, err) + + select { + case <-secondComplete: + t.Fatal("duplicate ChunkCompleteMessage triggered a second stitch") + case <-time.After(5 * time.Second): + } + }) + + t.Run("redelivered SceneSplitMessage does not publish duplicate chunks", func(t *testing.T) { + baseURL, statusURL, natsURL := helpers.SetupPipeline(t, 1, sharedFilerURL) + + videoPath := filepath.Join(t.TempDir(), "test.mp4") + helpers.GenerateTestVideo(t, videoPath) + + jobID := helpers.UploadVideo(t, baseURL, videoPath, "480p") + helpers.WaitForJobComplete(t, statusURL, jobID, 3*time.Minute) + + nc, err := nats.Connect(natsURL) + require.NoError(t, err) + t.Cleanup(nc.Close) + + js, err := jetstream.New(nc) + require.NoError(t, err) + + secondComplete := make(chan struct{}, 1) + sub, err := nc.Subscribe("jobs.complete", func(_ *nats.Msg) { + secondComplete <- struct{}{} + }) + require.NoError(t, err) + t.Cleanup(func() { _ = sub.Unsubscribe() }) + + // Re-publish the original SceneSplitMessage using its SeaweedFS storage URL. + videoStorageURL := fmt.Sprintf("%s/%s/test.mp4", sharedFilerURL, jobID) + payload, err := json.Marshal(struct { + JobID string `json:"job_id"` + TargetResolution string `json:"target_resolution"` + StorageURL string `json:"storage_url"` + }{ + JobID: jobID, + TargetResolution: "480p", + StorageURL: videoStorageURL, + }) + require.NoError(t, err) + _, err = js.Publish(context.Background(), "jobs.video.scene-split", payload) + require.NoError(t, err) + + select { + case <-secondComplete: + t.Fatal("redelivered SceneSplitMessage caused a second pipeline run") + case <-time.After(15 * time.Second): + } + }) +}