Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions backend/pipeline-tests/idempotency_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,50 @@ func TestTranscoderWorker(t *testing.T) {
}

func TestVideoRecombiner(t *testing.T) {
t.Run("duplicate chunk message in a multi-chunk job is dropped", func(t *testing.T) {
baseURL, statusURL, natsURL := helpers.SetupPipeline(t, 1, sharedFilerURL)

nc, err := nats.Connect(natsURL)
require.NoError(t, err)
t.Cleanup(nc.Close)

js, err := jetstream.New(nc)
require.NoError(t, err)

// Capture ChunkCompleteMessages so we can replay one after the job completes.
capturedChunkMsgs := make(chan []byte, 10)
captureSub, err := nc.Subscribe("jobs.chunks.complete", func(m *nats.Msg) {
capturedChunkMsgs <- m.Data
})
require.NoError(t, err)

// GenerateTestVideo produces a red→blue cut so scene-detector emits 2+ chunks.
videoPath := filepath.Join(t.TempDir(), "test.mp4")
helpers.GenerateTestVideo(t, videoPath)

jobID := helpers.UploadVideo(t, baseURL, videoPath, "480p")
helpers.WaitForJobComplete(t, statusURL, jobID, 3*time.Minute)
require.NoError(t, captureSub.Unsubscribe())

secondComplete := make(chan struct{}, 1)
sub, err := nc.Subscribe("jobs.complete", func(_ *nats.Msg) {
secondComplete <- struct{}{}
})
require.NoError(t, err)
t.Cleanup(func() { _ = sub.Unsubscribe() })

// Replay one captured chunk — idempotency should ack and drop it.
payload := <-capturedChunkMsgs
_, err = js.Publish(context.Background(), "jobs.chunks.complete", payload)
require.NoError(t, err)

select {
case <-secondComplete:
t.Fatal("duplicate chunk message was not dropped: triggered a jobs.complete")
case <-time.After(5 * time.Second):
}
})

t.Run("redelivered ChunkCompleteMessage does not trigger a second stitch", func(t *testing.T) {
baseURL, statusURL, natsURL := helpers.SetupPipeline(t, 1, sharedFilerURL)

Expand Down
5 changes: 5 additions & 0 deletions backend/transcoder-worker/internal/handler/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ func ConsumeVideoChunk(
filePath, err := storage.GetUnprocessedVideoChunk(payload.StorageURL, payload.JobID)
if err != nil {
logger.Error("error fetching unprocessed video chunk", "job_id", payload.JobID, "err", err)
err := msg.Nak()
if err != nil {
logger.Error("error naking msg for get unprocessed video chunk", "err", err)
return
}
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,15 @@ func TestAckAndNacking(t *testing.T) {
assert.True(t, msg.nakCalled)
})

t.Run("fetch failure does not nak or ack", func(t *testing.T) {
t.Run("fetch failure naks", func(t *testing.T) {
msg := &mockMsg{data: validPayload(t, "job-1")}
consumer := &test.MockConsumerWithMsg{Msg: msg}
js := &test.MockJS{JStream: &test.MockStream{Cons: consumer}}

_, err := handler.ConsumeVideoChunk("http://storage", js, &test.MockKV{}, test.SilentLogger())

require.NoError(t, err)
assert.False(t, msg.nakCalled)
assert.False(t, msg.ackCalled)
assert.True(t, msg.nakCalled)
})
}

Expand Down
10 changes: 10 additions & 0 deletions backend/video-recombiner/cmd/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"testing"
"video-recombiner/internal/test"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -33,3 +34,12 @@ func writeEnvFile(t *testing.T, content string) {
require.NoError(t, os.WriteFile(path, []byte(content), 0600))
t.Cleanup(func() { _ = os.Remove(path) })
}

// okJS returns a mock JetStream that succeeds through the full consumer setup.
func okJS() *test.MockJS {
return &test.MockJS{JStream: &test.MockStream{Cons: &test.MockConsumer{}}}
}

func okKV() *test.MockKV {
return &test.MockKV{}
}
38 changes: 24 additions & 14 deletions backend/video-recombiner/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package main

import (
"context"
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"syscall"
"time"
"video-recombiner/internal/handler"
"video-recombiner/internal/observability"
"video-recombiner/internal/storage"

"github.com/joho/godotenv"
Expand All @@ -30,7 +33,7 @@ func main() {
log.Fatalf("failed to load config values: %v", err)
}

logger := newLogger(cfg)
logger := observability.StructuredLogger(cfg.ProdMode)

err = storage.CheckHealth(cfg.BaseStorageURL, logger)
if err != nil {
Expand All @@ -53,10 +56,20 @@ func main() {
return
}

kv, err := js.CreateOrUpdateKeyValue(context.Background(), jetstream.KeyValueConfig{
Bucket: "recombine-chunk-recieved",
Description: "tracks video chunk for the jobID is already recieved for idempotency",
TTL: 3 * time.Hour,
})
if err != nil {
logger.Error("failed to create recombine-chunk-recieved kv bucket", "err", err)
osExit(1)
}

quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

err = runCombiner(js, nc, logger, cfg.BaseStorageURL, quit)
err = runCombiner(js, nc, kv, logger, cfg.BaseStorageURL, quit)
if err != nil {
logger.Error("error flushing remaining msgs", "err", err)
}
Expand All @@ -66,10 +79,17 @@ type ncDrainer interface {
Drain() error
}

func runCombiner(js jetstream.JetStream, nc ncDrainer, logger *slog.Logger, baseStorageURL string, quit <-chan os.Signal) error {
func runCombiner(
js jetstream.JetStream,
nc ncDrainer,
kv jetstream.KeyValue,
logger *slog.Logger,
baseStorageURL string,
quit <-chan os.Signal,
) error {
logger.Debug("starting service...")

consCtx, err := handler.RecombineVideo(js, logger, baseStorageURL)
consCtx, err := handler.RecombineVideo(js, kv, logger, baseStorageURL)
if err != nil {
return fmt.Errorf("failed to start subscriber/publisher: %w", err)
}
Expand All @@ -80,16 +100,6 @@ func runCombiner(js jetstream.JetStream, nc ncDrainer, logger *slog.Logger, base
return nc.Drain()
}

func newLogger(cfg *Config) *slog.Logger {
level := slog.LevelDebug
if cfg.ProdMode {
level = slog.LevelInfo
}
h := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level})

return slog.New(h).With("service", "video-recombiner")
}

func loadConfig() (*Config, error) {
err := godotenv.Load("../.env")
if err != nil {
Expand Down
28 changes: 22 additions & 6 deletions backend/video-recombiner/cmd/main_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ func TestMain(m *testing.M) {
func TestRunCombinerI(t *testing.T) {
t.Run("quit signal exits cleanly", func(t *testing.T) {
js, nc := test.SetupNats(t)
kv := test.SetupKV(t, js)
quit := make(chan os.Signal, 1)
done := make(chan error, 1)

go func() {
done <- runCombiner(js, nc, test.SilentLogger(), sharedFilerURL, quit)
done <- runCombiner(js, nc, kv, test.SilentLogger(), sharedFilerURL, quit)
}()

time.Sleep(200 * time.Millisecond)
Expand Down Expand Up @@ -72,16 +73,20 @@ func TestRunCombinerI(t *testing.T) {
require.NoError(t, err)

quit := make(chan os.Signal, 1)
err = runCombiner(js, nc, test.SilentLogger(), sharedFilerURL, quit)
err = runCombiner(js, nc, nil, test.SilentLogger(), sharedFilerURL, quit)

assert.Error(t, err)
})

t.Run("full flow: receive chunks, combine, publish downstream", func(t *testing.T) {
if _, err := exec.LookPath("ffmpeg"); err != nil {
_, err := exec.LookPath("ffmpeg")
if err != nil {
t.Skip("ffmpeg not available")
}

js, nc := test.SetupNats(t)
kv := test.SetupKV(t, js)

jobID := "job-full-flow"
t.Cleanup(func() {
os.RemoveAll("/tmp/processed_chunk-" + jobID)
Expand All @@ -94,8 +99,6 @@ func TestRunCombinerI(t *testing.T) {
test.SeedProcessedVideo(t, sharedFilerURL, jobID, "chunk-0.mp4", videoData)
test.SeedProcessedVideo(t, sharedFilerURL, jobID, "chunk-1.mp4", videoData)

js, nc := test.SetupNats(t)

received := make(chan []byte, 1)
sub, err := nc.Subscribe("jobs.complete", func(msg *nats.Msg) {
received <- msg.Data
Expand All @@ -107,7 +110,7 @@ func TestRunCombinerI(t *testing.T) {
done := make(chan error, 1)

go func() {
done <- runCombiner(js, nc, test.SilentLogger(), sharedFilerURL, quit)
done <- runCombiner(js, nc, kv, test.SilentLogger(), sharedFilerURL, quit)
}()

time.Sleep(500 * time.Millisecond)
Expand Down Expand Up @@ -146,6 +149,19 @@ func TestRunCombinerI(t *testing.T) {
})
}

func TestKVSetup(t *testing.T) {
t.Run("CreateOrUpdateKeyValue fails when JetStream is not enabled", func(t *testing.T) {
nc := test.SetupNatsNoJetStream(t)

js, err := jetstream.New(nc)
require.NoError(t, err)

_, err = js.CreateOrUpdateKeyValue(context.Background(), jetstream.KeyValueConfig{Bucket: "recombine-chunk-recieved"})

assert.Error(t, err)
})
}

func TestMainI(t *testing.T) {
t.Run("storage unreachable exits with code 1", func(t *testing.T) {
code := patchExit(t)
Expand Down
32 changes: 5 additions & 27 deletions backend/video-recombiner/cmd/main_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
package main

import (
"context"
"log/slog"
"os"
"path/filepath"
"testing"
Expand All @@ -15,33 +13,13 @@ import (
"github.com/stretchr/testify/require"
)

// okJS returns a mock JetStream that succeeds through the full consumer setup.
func okJS() *test.MockJS {
return &test.MockJS{JStream: &test.MockStream{Cons: &test.MockConsumer{}}}
}

func TestStructuredLogger(t *testing.T) {
t.Run("dev mode should enable debug level", func(t *testing.T) {
logger := newLogger(&Config{ProdMode: false})

assert.True(t, logger.Enabled(context.Background(), slog.LevelDebug))
})

t.Run("prod mode should disable debug level", func(t *testing.T) {
logger := newLogger(&Config{ProdMode: true})

assert.False(t, logger.Enabled(context.Background(), slog.LevelDebug))
assert.True(t, logger.Enabled(context.Background(), slog.LevelInfo))
})
}

func TestRunCombiner(t *testing.T) {
t.Run("consume video chunk error should return error", func(t *testing.T) {
js := &test.MockJS{JStreamNameErr: assert.AnError}
nc := &test.MockDrainer{}
quit := make(chan os.Signal, 1)

err := runCombiner(js, nc, test.SilentLogger(), "http://storage", quit)
err := runCombiner(js, nc, okKV(), test.SilentLogger(), "http://storage", quit)

require.ErrorIs(t, err, assert.AnError)
assert.False(t, nc.DrainCalled, "Drain should not be called if consumer setup fails")
Expand All @@ -52,7 +30,7 @@ func TestRunCombiner(t *testing.T) {
done := make(chan error, 1)

go func() {
done <- runCombiner(okJS(), &test.MockDrainer{}, test.SilentLogger(), "http://storage", quit)
done <- runCombiner(okJS(), &test.MockDrainer{}, okKV(), test.SilentLogger(), "http://storage", quit)
}()

select {
Expand All @@ -77,7 +55,7 @@ func TestRunCombiner(t *testing.T) {
quit := make(chan os.Signal, 1)
quit <- os.Interrupt

require.NoError(t, runCombiner(js, &test.MockDrainer{}, test.SilentLogger(), "http://storage", quit))
require.NoError(t, runCombiner(js, &test.MockDrainer{}, okKV(), test.SilentLogger(), "http://storage", quit))

require.NotNil(t, consumer.Ctx)
assert.True(t, consumer.Ctx.Stopped)
Expand All @@ -88,7 +66,7 @@ func TestRunCombiner(t *testing.T) {
quit := make(chan os.Signal, 1)
quit <- os.Interrupt

require.NoError(t, runCombiner(okJS(), nc, test.SilentLogger(), "http://storage", quit))
require.NoError(t, runCombiner(okJS(), nc, okKV(), test.SilentLogger(), "http://storage", quit))

assert.True(t, nc.DrainCalled)
})
Expand All @@ -98,7 +76,7 @@ func TestRunCombiner(t *testing.T) {
quit := make(chan os.Signal, 1)
quit <- os.Interrupt

err := runCombiner(okJS(), nc, test.SilentLogger(), "http://storage", quit)
err := runCombiner(okJS(), nc, okKV(), test.SilentLogger(), "http://storage", quit)

assert.ErrorIs(t, err, assert.AnError)
})
Expand Down
5 changes: 2 additions & 3 deletions backend/video-recombiner/cmd/makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
test_all: integration unit

PKGS := . ../internal/handler/... ../internal/service/... ../internal/storage/...
PKGS := . ../internal/handler/... ../internal/observability/... ../internal/service/... ../internal/storage/...

format:
go fmt ${PKGS} .
Expand All @@ -17,6 +17,5 @@ integration:
go test -tags integration ${PKGS}

unit:
go test -tags unit .
go test -tags unit ../internal/handler/...
go test -tags unit ${PKGS}
go test -race -tags unit ../internal/service/...
Loading
Loading