diff --git a/js/hang/src/catalog/audio.ts b/js/hang/src/catalog/audio.ts index e57710bf2..dee81c3eb 100644 --- a/js/hang/src/catalog/audio.ts +++ b/js/hang/src/catalog/audio.ts @@ -15,7 +15,7 @@ export const AudioConfigSchema = z.object({ codec: z.string(), // Container format for timestamp encoding - // Defaults to "legacy" when not specified in catalog (backward compatibility) + // Defaults to "native" when not specified in catalog (backward compatibility) container: ContainerSchema.default(DEFAULT_CONTAINER), // The description is used for some codecs. @@ -32,6 +32,12 @@ export const AudioConfigSchema = z.object({ // The bitrate of the audio in bits per second // TODO: Support up to Number.MAX_SAFE_INTEGER bitrate: u53Schema.optional(), + + // Init segment (ftyp+moov) for CMAF/fMP4 containers. + // This is the initialization segment needed for MSE playback. + // Stored as base64-encoded bytes. If not provided, init segments + // will be sent over the data track (legacy behavior). + initSegment: z.string().optional(), // base64-encoded }); export const AudioSchema = z diff --git a/js/hang/src/catalog/container.ts b/js/hang/src/catalog/container.ts index 05b0e81db..6ec563e9f 100644 --- a/js/hang/src/catalog/container.ts +++ b/js/hang/src/catalog/container.ts @@ -3,16 +3,16 @@ import { z } from "zod"; /** * Container format for frame timestamp encoding. * - * - "legacy": Uses QUIC VarInt encoding (1-8 bytes, variable length) + * - "native": Uses QUIC VarInt encoding (1-8 bytes, variable length) * - "raw": Uses fixed u64 encoding (8 bytes, big-endian) - * - "fmp4": Fragmented MP4 container (future) + * - "cmaf": Fragmented MP4 container (future) */ -export const ContainerSchema = z.enum(["legacy", "raw", "fmp4"]); +export const ContainerSchema = z.enum(["native", "raw", "cmaf"]); export type Container = z.infer; /** * Default container format when not specified. - * Set to legacy for backward compatibility. + * Set to native for backward compatibility. */ -export const DEFAULT_CONTAINER: Container = "legacy"; +export const DEFAULT_CONTAINER: Container = "native"; diff --git a/js/hang/src/catalog/video.ts b/js/hang/src/catalog/video.ts index b8b77883f..fbb460de5 100644 --- a/js/hang/src/catalog/video.ts +++ b/js/hang/src/catalog/video.ts @@ -14,7 +14,7 @@ export const VideoConfigSchema = z.object({ codec: z.string(), // Container format for timestamp encoding - // Defaults to "legacy" when not specified in catalog (backward compatibility) + // Defaults to "native" when not specified in catalog (backward compatibility) container: ContainerSchema.default(DEFAULT_CONTAINER), // The description is used for some codecs. @@ -43,6 +43,12 @@ export const VideoConfigSchema = z.object({ // If true, the decoder will optimize for latency. // Default: true optimizeForLatency: z.boolean().optional(), + + // Init segment (ftyp+moov) for CMAF/fMP4 containers. + // This is the initialization segment needed for MSE playback. + // Stored as base64-encoded bytes. If not provided, init segments + // will be sent over the data track (legacy behavior). 
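+	// Consumers must base64-decode this value before using it (in MSE it becomes the first SourceBuffer append).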
+ initSegment: z.string().optional(), // base64-encoded }); // Mirrors VideoDecoderConfig diff --git a/js/hang/src/container/codec.ts b/js/hang/src/container/codec.ts index 57a1119cc..d6196a73f 100644 --- a/js/hang/src/container/codec.ts +++ b/js/hang/src/container/codec.ts @@ -11,12 +11,14 @@ import type * as Time from "../time"; */ export function encodeTimestamp(timestamp: Time.Micro, container: Catalog.Container = DEFAULT_CONTAINER): Uint8Array { switch (container) { - case "legacy": + case "native": return encodeVarInt(timestamp); case "raw": return encodeU64(timestamp); - case "fmp4": - throw new Error("fmp4 container not yet implemented"); + case "cmaf": { + // CMAF fragments contain timestamps in moof atoms, no header needed + return new Uint8Array(0); + } } } @@ -32,7 +34,7 @@ export function decodeTimestamp( container: Catalog.Container = DEFAULT_CONTAINER, ): [Time.Micro, Uint8Array] { switch (container) { - case "legacy": { + case "native": { const [value, remaining] = decodeVarInt(buffer); return [value as Time.Micro, remaining]; } @@ -40,8 +42,9 @@ export function decodeTimestamp( const [value, remaining] = decodeU64(buffer); return [value as Time.Micro, remaining]; } - case "fmp4": - throw new Error("fmp4 container not yet implemented"); + case "cmaf": { + return [0 as Time.Micro, buffer]; + } } } @@ -54,12 +57,12 @@ export function decodeTimestamp( */ export function getTimestampSize(container: Catalog.Container = DEFAULT_CONTAINER): number { switch (container) { - case "legacy": + case "native": return 8; // VarInt maximum size case "raw": return 8; // u64 fixed size - case "fmp4": - throw new Error("fmp4 container not yet implemented"); + case "cmaf": + return 8; // VarInt maximum size (same as native) } } diff --git a/js/hang/src/frame.ts b/js/hang/src/frame.ts index a8209fb37..fa7d2b4da 100644 --- a/js/hang/src/frame.ts +++ b/js/hang/src/frame.ts @@ -20,6 +20,16 @@ export function encode(source: Uint8Array | Source, timestamp: Time.Micro, conta // Encode timestamp using the specified container format const timestampBytes = Container.encodeTimestamp(timestamp, container); + // For CMAF, timestampBytes will be empty, so we just return the source + if (container === "cmaf") { + if (source instanceof Uint8Array) { + return source; + } + const data = new Uint8Array(source.byteLength); + source.copyTo(data); + return data; + } + // Allocate buffer for timestamp + payload const payloadSize = source instanceof Uint8Array ? source.byteLength : source.byteLength; const data = new Uint8Array(timestampBytes.byteLength + payloadSize); @@ -112,19 +122,18 @@ export class Consumer { async #run() { // Start fetching groups in the background + for (;;) { const consumer = await this.#track.nextGroup(); - if (!consumer) break; + if (!consumer) { + break; + } - // To improve TTV, we always start with the first group. - // For higher latencies we might need to figure something else out, as its racey. if (this.#active === undefined) { this.#active = consumer.sequence; } if (consumer.sequence < this.#active) { - console.warn(`skipping old group: ${consumer.sequence} < ${this.#active}`); - // Skip old groups. 
consumer.close(); continue; } @@ -150,7 +159,9 @@ export class Consumer { for (;;) { const next = await group.consumer.readFrame(); - if (!next) break; + if (!next) { + break; + } const { data, timestamp } = decode(next, this.#container); const frame = { @@ -223,8 +234,6 @@ export class Consumer { if (this.#active !== undefined && first.consumer.sequence <= this.#active) { this.#groups.shift(); - console.warn(`skipping slow group: ${first.consumer.sequence} < ${this.#groups[0]?.consumer.sequence}`); - first.consumer.close(); first.frames.length = 0; } @@ -246,7 +255,9 @@ export class Consumer { this.#groups[0].consumer.sequence <= this.#active ) { const frame = this.#groups[0].frames.shift(); - if (frame) return frame; + if (frame) { + return frame; + } // Check if the group is done and then remove it. if (this.#active > this.#groups[0].consumer.sequence) { @@ -261,7 +272,9 @@ export class Consumer { const wait = new Promise((resolve) => { this.#notify = resolve; - }).then(() => true); + }).then(() => { + return true; + }); if (!(await Promise.race([wait, this.#signals.closed]))) { this.#notify = undefined; diff --git a/js/hang/src/util/mime.ts b/js/hang/src/util/mime.ts new file mode 100644 index 000000000..bd7b32907 --- /dev/null +++ b/js/hang/src/util/mime.ts @@ -0,0 +1,52 @@ +import type * as Catalog from "../catalog"; + +/** + * Builds a MIME type string for MediaSource from a codec string. + * + * @param codec - The codec string from the catalog (e.g., "avc1.42E01E", "mp4a.40.2") + * @param type - "video" or "audio" + * @returns MIME type string (e.g., "video/mp4; codecs=\"avc1.42E01E\"") + */ +export function buildMimeType(codec: string, type: "video" | "audio"): string { + // For MP4 containers, we use the standard MIME type format + // Most codecs are already in the correct format for MSE + return `${type}/mp4; codecs="${codec}"`; +} + +/** + * Checks if a MIME type is supported by MediaSource. + * + * @param mimeType - The MIME type to check + * @returns true if supported, false otherwise + */ +export function isMimeTypeSupported(mimeType: string): boolean { + return MediaSource.isTypeSupported(mimeType); +} + +/** + * Builds and validates a MIME type for video from catalog config. + * + * @param config - Video configuration from catalog + * @returns MIME type string or undefined if not supported + */ +export function buildVideoMimeType(config: Catalog.VideoConfig): string | undefined { + const mimeType = buildMimeType(config.codec, "video"); + if (isMimeTypeSupported(mimeType)) { + return mimeType; + } + return undefined; +} + +/** + * Builds and validates a MIME type for audio from catalog config. 
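+ * Returns undefined when MediaSource.isTypeSupported rejects the MIME type, so callers can fall back to WebCodecs.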
+ * + * @param config - Audio configuration from catalog + * @returns MIME type string or undefined if not supported + */ +export function buildAudioMimeType(config: Catalog.AudioConfig): string | undefined { + const mimeType = buildMimeType(config.codec, "audio"); + if (isMimeTypeSupported(mimeType)) { + return mimeType; + } + return undefined; +} diff --git a/js/hang/src/watch/audio/emitter.ts b/js/hang/src/watch/audio/emitter.ts index 79be52da9..2bdbb9870 100644 --- a/js/hang/src/watch/audio/emitter.ts +++ b/js/hang/src/watch/audio/emitter.ts @@ -46,7 +46,8 @@ export class Emitter { }); this.#signals.effect((effect) => { - const enabled = !effect.get(this.paused) && !effect.get(this.muted); + const paused = effect.get(this.paused); + const enabled = !paused; this.source.enabled.set(enabled); }); @@ -56,7 +57,44 @@ export class Emitter { this.muted.set(volume === 0); }); + // Handle MSE path (HTMLAudioElement) vs WebCodecs path (AudioWorklet) this.#signals.effect((effect) => { + const mseAudio = effect.get(this.source.mseAudioElement); + if (mseAudio) { + // MSE path: control HTMLAudioElement directly + effect.effect(() => { + const volume = effect.get(this.volume); + const muted = effect.get(this.muted); + const paused = effect.get(this.paused); + mseAudio.volume = volume; + mseAudio.muted = muted; + + // Control play/pause state + if (paused && !mseAudio.paused) { + mseAudio.pause(); + } else if (!paused && mseAudio.paused) { + // Resume if paused - try to play even if readyState is low + const tryPlay = () => { + if (!paused && mseAudio.paused) { + mseAudio + .play() + .catch((err) => console.error("[Audio Emitter] Failed to resume audio:", err)); + } + }; + + // Try to play if we have metadata (HAVE_METADATA = 1), browser will start when ready + if (mseAudio.readyState >= HTMLMediaElement.HAVE_METADATA) { + tryPlay(); + } else { + // Wait for loadedmetadata event if not ready yet + mseAudio.addEventListener("loadedmetadata", tryPlay, { once: true }); + } + } + }); + return; + } + + // WebCodecs path: use AudioWorklet with GainNode const root = effect.get(this.source.root); if (!root) return; @@ -76,9 +114,10 @@ export class Emitter { }); }); + // Only apply gain transitions for WebCodecs path (when gain node exists) this.#signals.effect((effect) => { const gain = effect.get(this.#gain); - if (!gain) return; + if (!gain) return; // MSE path doesn't use gain node // Cancel any scheduled transitions on change. effect.cleanup(() => gain.gain.cancelScheduledValues(gain.context.currentTime)); diff --git a/js/hang/src/watch/audio/source.ts b/js/hang/src/watch/audio/source.ts index ca542b780..f482e675c 100644 --- a/js/hang/src/watch/audio/source.ts +++ b/js/hang/src/watch/audio/source.ts @@ -5,6 +5,8 @@ import * as Frame from "../../frame"; import type * as Time from "../../time"; import * as Hex from "../../util/hex"; import * as libav from "../../util/libav"; +import type { SourceMSE } from "../source-mse"; +import type * as Video from "../video"; import type * as Render from "./render"; // We want some extra overhead to avoid starving the render worklet. 
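// Aside (not part of this diff): the util/mime.ts helpers above only wrap MediaSource.isTypeSupported,
// so callers still choose the playback pipeline themselves. A minimal, illustrative sketch of that gating —
// the selectsMse helper and its config shape are assumptions for illustration, not code from this change:
import * as Mime from "../util/mime";

function selectsMse(config: { codec: string; container: string }): boolean {
	// Only the "cmaf" container is routed to MSE; "native"/"raw" frames stay on the WebCodecs path.
	if (config.container !== "cmaf") return false;
	const mimeType = Mime.buildMimeType(config.codec, "video"); // e.g. 'video/mp4; codecs="avc1.42E01E"'
	return Mime.isMimeTypeSupported(mimeType);
}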
@@ -41,6 +43,10 @@ export class Source { // Downcast to AudioNode so it matches Publish.Audio readonly root = this.#worklet as Getter; + // For MSE path, expose the HTMLAudioElement for direct control + #mseAudioElement = new Signal(undefined); + readonly mseAudioElement = this.#mseAudioElement as Getter; + #sampleRate = new Signal(undefined); readonly sampleRate: Getter = this.#sampleRate; @@ -58,6 +64,9 @@ export class Source { #signals = new Effect(); + // Reference to video source for coordination + video?: Video.Source; + constructor( broadcast: Getter, catalog: Getter, @@ -95,6 +104,12 @@ export class Source { const config = effect.get(this.config); if (!config) return; + // Don't create worklet for MSE (cmaf) - browser handles playback directly + // The worklet is only needed for WebCodecs path + if (config.container === "cmaf") { + return; + } + const sampleRate = config.sampleRate; const channelCount = config.numberOfChannels; @@ -149,26 +164,279 @@ export class Source { #runDecoder(effect: Effect): void { const enabled = effect.get(this.enabled); - if (!enabled) return; + const config = effect.get(this.config); + + // For CMAF, we need to add the SourceBuffer even if audio is disabled + // This ensures the MediaSource has both SourceBuffers before video starts appending + // We'll just not append audio data if disabled + if (config?.container === "cmaf") { + // Always initialize MSE for CMAF, even if disabled + // The SourceBuffer needs to be added before video starts appending + } else if (!enabled) { + // For non-CMAF, if disabled, don't initialize + return; + } + + if (!enabled && config?.container !== "cmaf") { + return; + } const catalog = effect.get(this.catalog); - if (!catalog) return; + if (!catalog) { + return; + } const broadcast = effect.get(this.broadcast); - if (!broadcast) return; + if (!broadcast) { + return; + } - const config = effect.get(this.config); - if (!config) return; + if (!config) { + return; + } const active = effect.get(this.active); - if (!active) return; + if (!active) { + return; + } - const sub = broadcast.subscribe(active, catalog.priority); + // Route to MSE for CMAF, WebCodecs for native/raw + // For CMAF, ALWAYS initialize MSE (even if disabled) to add SourceBuffer + // This ensures MediaSource has both SourceBuffers before video starts appending + // The SourceBuffer will be added, but fragments won't be appended if disabled + console.log(`[Audio Source] Routing audio: container=${config.container}, enabled=${enabled}`); + if (config.container === "cmaf") { + // Always initialize for CMAF - SourceBuffer must be added before video starts + console.log("[Audio Source] Using MSE path for CMAF"); + this.#runMSEPath(effect, broadcast, active, config, catalog); + } else { + // For non-CMAF, only run if enabled + console.log(`[Audio Source] Using WebCodecs path (container=${config.container})`); + if (enabled) { + this.#runWebCodecsPath(effect, broadcast, active, config, catalog); + } + } + } + + #runMSEPath( + effect: Effect, + broadcast: Moq.Broadcast, + name: string, + config: Catalog.AudioConfig, + catalog: Catalog.Audio, + ): void { + // Use the unified SourceMSE from video - it manages both video and audio SourceBuffers + // Use a reactive effect to always get the latest SourceMSE instance + effect.spawn(async () => { + // Wait for video's MSE source to be available + // Video creates it asynchronously, and may recreate it when restarting + // So we need to get it reactively each time + let videoMseSource: SourceMSE | undefined; + if 
(this.video?.mseSource) { + // Wait up to 2 seconds for video MSE source to be available + const maxWait = 2000; + const startTime = Date.now(); + while (!videoMseSource && Date.now() - startTime < maxWait) { + videoMseSource = effect.get(this.video.mseSource); + if (!videoMseSource) { + await new Promise((resolve) => setTimeout(resolve, 50)); // Check more frequently + } + } + } + + if (!videoMseSource) { + console.error("[Audio Source] Video MSE source not available, falling back to WebCodecs"); + this.#runWebCodecsPath(effect, broadcast, name, config, catalog); + return; + } + + // For MSE path, audio plays through the video element + // Expose video element as "audioElement" for compatibility with emitter + // Use reactive effect to always get the latest video element + this.#signals.effect((eff) => { + // Get latest SourceMSE instance in case video restarted + const latestMseSource = this.video?.mseSource ? eff.get(this.video.mseSource) : undefined; + const mseSource = latestMseSource || videoMseSource; + const videoElement = mseSource?.videoElement ? eff.get(mseSource.videoElement) : undefined; + // Expose as audioElement for emitter compatibility (HTMLVideoElement works the same as HTMLAudioElement for volume/mute) + eff.set(this.#mseAudioElement, videoElement as HTMLAudioElement | undefined); + }); + + // Forward stats (audio stats are not currently tracked in unified SourceMSE, but we can add them later) + // For now, just set empty stats + this.#signals.effect((eff) => { + eff.set(this.#stats, { bytesReceived: 0 }); + }); + + // Check if audio is enabled + const isEnabled = effect.get(this.enabled); + + // Only subscribe to track and initialize SourceBuffer if enabled + // When disabled, we don't need to do anything - video can play without audio + if (!isEnabled) { + console.log( + `[Audio Source] Audio disabled, skipping SourceBuffer initialization and track subscription - video will play without audio`, + ); + return; + } + + // Audio is enabled - subscribe to track and initialize SourceBuffer + // Wait a bit for video to stabilize if it's restarting + // Get the latest SourceMSE instance and verify it's stable + let latestMseSource: SourceMSE | undefined; + let retryCount = 0; + const maxRetries = 3; + + while (retryCount < maxRetries) { + // Get the latest SourceMSE instance (in case video restarted) + latestMseSource = this.video?.mseSource ? effect.get(this.video.mseSource) : videoMseSource; + if (!latestMseSource) { + // Wait a bit for video to create SourceMSE + await new Promise((resolve) => setTimeout(resolve, 100)); + retryCount++; + continue; + } + + // Check if MediaSource is ready (not closed) + const mediaSource = latestMseSource.mediaSource ? 
effect.get(latestMseSource.mediaSource) : undefined; + if ( + mediaSource && + typeof mediaSource === "object" && + "readyState" in mediaSource && + (mediaSource as MediaSource).readyState === "closed" + ) { + // MediaSource is closed, video might be restarting - wait and retry + console.log("[Audio Source] MediaSource is closed, waiting for video to stabilize"); + await new Promise((resolve) => setTimeout(resolve, 200)); + retryCount++; + continue; + } + + // SourceMSE instance looks good, proceed + break; + } + + if (!latestMseSource) { + console.warn("[Audio Source] SourceMSE instance not available after retries, skipping audio"); + return; + } + + console.log("[Audio Stream] Subscribing to track", { + name, + codec: config.codec, + container: config.container, + sampleRate: config.sampleRate, + channels: config.numberOfChannels, + }); + + // Retry a few times for transient MSE states / QuotaExceeded + for (let attempt = 0; attempt < 5; attempt++) { + try { + // Resolve freshest SourceMSE and wait for MediaSource to be open (up to ~5s). + const resolveOpenMediaSource = async (): Promise => { + const start = Date.now(); + let current = latestMseSource; + for (;;) { + // Follow any video restart by re-reading the signal + const candidate = this.video?.mseSource ? effect.get(this.video.mseSource) : current; + if (candidate && candidate !== current) { + console.log("[Audio Source] Video restarted, using new SourceMSE instance"); + current = candidate; + } + + if (!current) { + if (Date.now() - start > 5000) { + throw new Error("SourceMSE not available"); + } + await new Promise((resolve) => setTimeout(resolve, 50)); + continue; + } + + const ms = current.mediaSource ? effect.get(current.mediaSource) : undefined; + if ( + ms && + typeof ms === "object" && + "readyState" in ms && + (ms as MediaSource).readyState === "open" + ) { + return current; + } + + if (Date.now() - start > 5000) { + throw new Error("MediaSource not ready for audio SourceBuffer"); + } + await new Promise((resolve) => setTimeout(resolve, 50)); + } + }; + + const readyMseSource = await resolveOpenMediaSource(); + latestMseSource = readyMseSource; + + console.log( + `[Audio Source] Initializing audio SourceBuffer on unified SourceMSE (attempt ${attempt + 1})`, + ); + await latestMseSource.initializeAudio(config); + + // Verify we're still using the current instance after initialization + const verifyMseSource = this.video?.mseSource ? effect.get(this.video.mseSource) : latestMseSource; + if (verifyMseSource && verifyMseSource !== latestMseSource) { + // Video restarted during initialization, get new instance and retry + console.log("[Audio Source] Video restarted during initialization, retrying with new instance"); + await verifyMseSource.initializeAudio(config); + latestMseSource = verifyMseSource; + } + + console.log(`[Audio Source] Audio SourceBuffer initialization completed`); + + // Get latest instance again before running track (video might have restarted) + const finalMseSource = this.video?.mseSource ? 
effect.get(this.video.mseSource) : latestMseSource; + if (!finalMseSource) { + throw new Error("SourceMSE instance not available"); + } + + // Run audio track - use the latest instance + console.log(`[Audio Source] Starting MSE track on unified SourceMSE`); + await finalMseSource.runAudioTrack(effect, broadcast, name, config, catalog, this.enabled); + console.log("[Audio Source] MSE track completed successfully"); + return; // success + } catch (error) { + const retriable = error instanceof DOMException && error.name === "QuotaExceededError"; + if (!retriable || attempt === 4) { + console.warn( + "[Audio Source] Failed to initialize audio SourceBuffer, video will continue without audio:", + error, + ); + return; + } + const delay = 150 + attempt * 150; + console.warn( + `[Audio Source] Audio init attempt ${attempt + 1} failed (${(error as Error).message}); retrying in ${delay}ms`, + ); + await new Promise((resolve) => setTimeout(resolve, delay)); + } + } + }); + } + + #runWebCodecsPath( + effect: Effect, + broadcast: Moq.Broadcast, + name: string, + config: Catalog.AudioConfig, + catalog: Catalog.Audio, + ): void { + console.log("[Audio Stream] Subscribing to track", { + name, + codec: config.codec, + container: config.container, + sampleRate: config.sampleRate, + channels: config.numberOfChannels, + }); + const sub = broadcast.subscribe(name, catalog.priority); effect.cleanup(() => sub.close()); // Create consumer with slightly less latency than the render worklet to avoid underflowing. - // Container defaults to "legacy" via Zod schema for backward compatibility - console.log(`[Audio Subscriber] Using container format: ${config.container}`); + // Container defaults to "native" via Zod schema for backward compatibility const consumer = new Frame.Consumer(sub, { latency: Math.max(this.latency.peek() - JITTER_UNDERHEAD, 0) as Time.Milli, container: config.container, diff --git a/js/hang/src/watch/broadcast.ts b/js/hang/src/watch/broadcast.ts index c8fe30a1a..5d28a94b6 100644 --- a/js/hang/src/watch/broadcast.ts +++ b/js/hang/src/watch/broadcast.ts @@ -62,8 +62,14 @@ export class Broadcast { this.path = Signal.from(props?.path); this.enabled = Signal.from(props?.enabled ?? false); this.reload = Signal.from(props?.reload ?? true); - this.audio = new Audio.Source(this.#broadcast, this.#catalog, props?.audio); + + // Create video first so audio can use its MediaSource this.video = new Video.Source(this.#broadcast, this.#catalog, props?.video); + + // Create audio and pass video reference for coordination + this.audio = new Audio.Source(this.#broadcast, this.#catalog, props?.audio); + this.audio.video = this.video; // Pass video reference for coordination + this.location = new Location.Root(this.#broadcast, this.#catalog, props?.location); this.chat = new Chat(this.#broadcast, this.#catalog, props?.chat); this.preview = new Preview(this.#broadcast, this.#catalog, props?.preview); diff --git a/js/hang/src/watch/source-mse.ts b/js/hang/src/watch/source-mse.ts new file mode 100644 index 000000000..0e19b2b82 --- /dev/null +++ b/js/hang/src/watch/source-mse.ts @@ -0,0 +1,1546 @@ +import type * as Moq from "@moq/lite"; +import { Effect, type Getter, Signal } from "@moq/signals"; +import type * as Catalog from "../catalog"; +import * as Frame from "../frame"; +import { PRIORITY } from "../publish/priority"; +import type * as Time from "../time"; +import * as Mime from "../util/mime"; + +// The types in VideoDecoderConfig that cause a hard reload. 
+type RequiredDecoderConfig = Omit & + Partial>; + +type BufferStatus = { state: "empty" | "filled" }; + +type SyncStatus = { + state: "ready" | "wait"; + bufferDuration?: number; +}; + +export interface VideoStats { + frameCount: number; + timestamp: number; + bytesReceived: number; +} + +/** + * MSE-based video source for CMAF/fMP4 fragments. + * Uses Media Source Extensions to handle complete moof+mdat fragments. + */ +export class SourceMSE { + #video?: HTMLVideoElement; + #mediaSource?: MediaSource; + #videoSourceBuffer?: SourceBuffer; + #audioSourceBuffer?: SourceBuffer; + #audioSourceBufferSetup = false; // Track if audio SourceBuffer has been set up + + readonly mediaSource = new Signal(undefined); + + // Expose video element for audio control (audio plays through video element) + readonly videoElement = new Signal(undefined); + + // Queue of fragments waiting to be added for video + #videoAppendQueue: Uint8Array[] = []; + // Queue of fragments waiting to be added for audio + #audioAppendQueue: Uint8Array[] = []; + static readonly MAX_QUEUE_SIZE = 10; // Maximum fragments in queue + + // Expose the current frame to render as a signal + frame = new Signal(undefined); + + // The target latency in milliseconds. + latency: Signal; + + // The display size of the video in pixels. + display = new Signal<{ width: number; height: number } | undefined>(undefined); + + // Whether to flip the video horizontally. + flip = new Signal(undefined); + + bufferStatus = new Signal({ state: "empty" }); + syncStatus = new Signal({ state: "ready" }); + + #stats = new Signal(undefined); + + #signals = new Effect(); + #frameCallbackId?: number; + + constructor(latency: Signal) { + this.latency = latency; + } + + /** + * Check if any SourceBuffer is updating + */ + #isBufferUpdating(): boolean { + if (!this.#mediaSource) return false; + const buffers = this.#mediaSource.sourceBuffers; + for (let i = 0; i < buffers.length; i++) { + if (buffers[i].updating) { + return true; + } + } + return false; + } + + async initializeVideo(config: RequiredDecoderConfig): Promise { + const mimeType = Mime.buildVideoMimeType(config); + if (!mimeType) { + throw new Error(`Unsupported codec for MSE: ${config.codec}`); + } + + console.log("[MSE] Initializing video, MIME type:", mimeType); + + // Create video element + this.#video = document.createElement("video"); + this.#video.style.display = "none"; + this.#video.playsInline = true; + this.#video.muted = false; // Don't mute - audio plays through video element + document.body.appendChild(this.#video); + + // Expose video element + this.videoElement.set(this.#video); + + // Create MediaSource + this.#mediaSource = new MediaSource(); + this.mediaSource.set(this.#mediaSource); + console.log("[MSE] Video initialization: MediaSource signal set, state:", this.#mediaSource.readyState); + + // Attach MediaSource to video element + const url = URL.createObjectURL(this.#mediaSource); + this.#video.src = url; + console.log("[MSE] MediaSource created and attached to video element"); + + // Wait for sourceopen event + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error("MediaSource sourceopen timeout")); + }, 5000); + + this.#mediaSource?.addEventListener( + "sourceopen", + () => { + clearTimeout(timeout); + console.log("[MSE] MediaSource sourceopen event fired"); + // Update signal to ensure audio sees the open MediaSource + if (this.#mediaSource) { + this.mediaSource.set(this.#mediaSource); + } + try { + this.#videoSourceBuffer = 
this.#mediaSource?.addSourceBuffer(mimeType); + if (!this.#videoSourceBuffer) { + reject(new Error("Failed to create video SourceBuffer")); + return; + } + console.log("[MSE] Video SourceBuffer created successfully"); + this.#setupVideoSourceBuffer(); + resolve(); + } catch (error) { + console.error("[MSE] Error creating video SourceBuffer:", error); + reject(error); + } + }, + { once: true }, + ); + + this.#mediaSource?.addEventListener( + "error", + (e) => { + clearTimeout(timeout); + console.error("[MSE] MediaSource error event:", e); + reject(new Error(`MediaSource error: ${e}`)); + }, + { once: true }, + ); + }); + + console.log("[MSE] Video initialization complete, starting frame capture"); + this.#startFrameCapture(); + } + + async initializeAudio(config: Catalog.AudioConfig): Promise { + // Early return if already initialized + if (this.#audioSourceBuffer && this.#audioSourceBufferSetup) { + console.log("[MSE] Audio SourceBuffer already initialized, skipping"); + return; + } + + const mimeType = Mime.buildAudioMimeType(config); + if (!mimeType) { + throw new Error(`Unsupported codec for MSE: ${config.codec}`); + } + + console.log("[MSE] Initializing audio, MIME type:", mimeType); + + // Get MediaSource from signal (most up-to-date) + // Use a small delay to ensure signal updates have propagated + await new Promise((resolve) => setTimeout(resolve, 10)); + let mediaSource = this.mediaSource.peek(); + console.log( + "[MSE] Audio initialization: MediaSource from signal:", + mediaSource ? `readyState=${mediaSource.readyState}` : "not set", + ); + + // Also check private field as fallback + if (!mediaSource && this.#mediaSource) { + console.log( + "[MSE] Audio initialization: Using private MediaSource field, state:", + this.#mediaSource.readyState, + ); + mediaSource = this.#mediaSource; + } + + // Quick check: if MediaSource is ready, proceed immediately + if (mediaSource && mediaSource.readyState === "open") { + console.log("[MSE] Audio initialization: MediaSource is already open, proceeding"); + this.#mediaSource = mediaSource; + } else { + console.log("[MSE] Audio initialization: MediaSource not ready, waiting..."); + // Wait for MediaSource to be created and open (video initialization is async) + // Use a longer timeout to allow video to restart properly + await new Promise((resolve, reject) => { + const maxWait = 5000; // 5 seconds max wait + const startTime = Date.now(); + const checkInterval = 50; // Check every 50ms for responsiveness + + const timeout = setTimeout(() => { + const waited = ((Date.now() - startTime) / 1000).toFixed(1); + reject( + new Error( + `MediaSource not ready after ${waited}s (current state: ${mediaSource?.readyState || "not created"})`, + ), + ); + }, maxWait); + + const checkReady = () => { + // Get latest MediaSource from signal (always get fresh value) + const signalValue = this.mediaSource.peek(); + mediaSource = signalValue; + + // Also check private field if signal is not set + if (!mediaSource && this.#mediaSource) { + mediaSource = this.#mediaSource; + } + + // Check if MediaSource exists and is open + if (mediaSource && mediaSource.readyState === "open") { + clearTimeout(timeout); + this.#mediaSource = mediaSource; + const elapsed = ((Date.now() - startTime) / 1000).toFixed(2); + console.log(`[MSE] Audio initialization: MediaSource is ready (waited ${elapsed}s)`); + resolve(); + return; + } + + // Log progress for debugging (every 0.5 seconds) + const elapsed = Date.now() - startTime; + if (elapsed % 500 < checkInterval) { + const signalState = 
this.mediaSource.peek()?.readyState || "not set"; + const privateState = this.#mediaSource?.readyState || "not set"; + console.log( + `[MSE] Audio initialization: Waiting for MediaSource (${(elapsed / 1000).toFixed(1)}s, signal: ${signalState}, private: ${privateState})`, + ); + } + + // If MediaSource exists but is closed, it's from an old instance - wait for new one + if (mediaSource && mediaSource.readyState === "closed") { + // Reset private field + if (this.#mediaSource === mediaSource) { + this.#mediaSource = undefined; + } + } + + // Continue checking if we haven't exceeded max wait time + if (elapsed < maxWait) { + setTimeout(checkReady, checkInterval); + } else { + clearTimeout(timeout); + const waited = (elapsed / 1000).toFixed(1); + const finalSignalState = this.mediaSource.peek()?.readyState || "not set"; + const finalPrivateState = this.#mediaSource?.readyState || "not set"; + reject( + new Error( + `MediaSource not ready after ${waited}s (signal: ${finalSignalState}, private: ${finalPrivateState})`, + ), + ); + } + }; + + checkReady(); + }); + } + + // Final check - ensure we have a MediaSource + mediaSource = this.mediaSource.peek() || this.#mediaSource; + if (!mediaSource || mediaSource.readyState !== "open") { + throw new Error(`MediaSource not ready (state: ${mediaSource?.readyState || "not created"})`); + } + + // Update private field + this.#mediaSource = mediaSource; + + // Check if MediaSource already has an audio SourceBuffer + // (could be added by a previous call to initializeAudio) + if (this.#mediaSource.sourceBuffers.length >= 2) { + const sourceBuffers = Array.from(this.#mediaSource.sourceBuffers); + + // If we already have an audio SourceBuffer set, use it + if (this.#audioSourceBuffer && sourceBuffers.includes(this.#audioSourceBuffer)) { + return; // Already have it + } + + // If we have exactly 2 SourceBuffers and one is video, the other must be audio + if (sourceBuffers.length === 2 && this.#videoSourceBuffer) { + const otherBuffer = sourceBuffers.find((sb) => sb !== this.#videoSourceBuffer); + if (otherBuffer) { + // This must be the audio SourceBuffer + this.#audioSourceBuffer = otherBuffer; + if (!this.#audioSourceBufferSetup) { + this.#setupAudioSourceBuffer(); + } + return; + } + } + + // Fallback: If we have 2 SourceBuffers but don't know which is video + // Assume the second one is audio (video is usually added first) + if (sourceBuffers.length === 2 && !this.#videoSourceBuffer) { + console.log( + "[MSE] Video SourceBuffer not set yet, using fallback: assuming second SourceBuffer is audio", + ); + this.#audioSourceBuffer = sourceBuffers[1]; + if (!this.#audioSourceBufferSetup) { + this.#setupAudioSourceBuffer(); + } + return; + } + + // MediaSource has 2 SourceBuffers but we can't identify which is audio + // This shouldn't happen, but handle gracefully + throw new Error("MediaSource already has maximum SourceBuffers and cannot identify audio SourceBuffer"); + } + + // Double-check audio SourceBuffer wasn't set while we were waiting + if (this.#audioSourceBuffer) { + return; + } + + // Wait for video SourceBuffer to finish if updating + if (this.#videoSourceBuffer?.updating) { + console.log("[MSE] Waiting for video SourceBuffer to finish updating before adding audio"); + await new Promise((resolve) => { + if (!this.#videoSourceBuffer) { + resolve(); + return; + } + this.#videoSourceBuffer.addEventListener( + "updateend", + () => { + console.log("[MSE] Video SourceBuffer finished updating"); + resolve(); + }, + { once: true }, + ); + }); + } + + // 
Final check before adding + if (this.#audioSourceBuffer) { + return; + } + + // Check again if MediaSource now has 2 SourceBuffers (race condition) + if (this.#mediaSource.sourceBuffers.length >= 2) { + const sourceBuffers = Array.from(this.#mediaSource.sourceBuffers); + + // If we already have audio SourceBuffer set, use it + if (this.#audioSourceBuffer && sourceBuffers.includes(this.#audioSourceBuffer)) { + return; + } + + // If we have exactly 2 and one is video, use the other + if (sourceBuffers.length === 2 && this.#videoSourceBuffer) { + const otherBuffer = sourceBuffers.find((sb) => sb !== this.#videoSourceBuffer); + if (otherBuffer) { + this.#audioSourceBuffer = otherBuffer; + if (!this.#audioSourceBufferSetup) { + this.#setupAudioSourceBuffer(); + } + return; + } + } + + // Fallback: If we have 2 SourceBuffers but don't know which is video + if (sourceBuffers.length === 2 && !this.#videoSourceBuffer) { + console.log("[MSE] Race condition: Video SourceBuffer not set yet, using fallback"); + this.#audioSourceBuffer = sourceBuffers[1]; // Assume second is audio + if (!this.#audioSourceBufferSetup) { + this.#setupAudioSourceBuffer(); + } + return; + } + + throw new Error("MediaSource already has maximum SourceBuffers and cannot identify audio SourceBuffer"); + } + + // Final check before adding - verify MediaSource is still open + if (this.#mediaSource.readyState !== "open") { + throw new Error( + `MediaSource readyState changed to "${this.#mediaSource.readyState}" before adding audio SourceBuffer`, + ); + } + + // Ensure we're using the MediaSource from signal (most up-to-date) + mediaSource = this.mediaSource.peek() || this.#mediaSource; + if (!mediaSource) { + throw new Error("MediaSource is not available"); + } + + // Update private field to match signal + this.#mediaSource = mediaSource; + + // Wait for video SourceBuffer to finish updating before adding audio SourceBuffer + // Only wait if it's actually updating (should be rare) + if (this.#videoSourceBuffer?.updating) { + console.log("[MSE] Video SourceBuffer is updating, waiting briefly"); + await new Promise((resolve) => { + const timeout = setTimeout(() => { + // Don't wait too long - proceed anyway + console.log("[MSE] Video SourceBuffer update timeout, proceeding"); + resolve(); + }, 500); // Only wait 500ms max + + if (!this.#videoSourceBuffer) { + clearTimeout(timeout); + resolve(); + return; + } + this.#videoSourceBuffer.addEventListener( + "updateend", + () => { + clearTimeout(timeout); + resolve(); + }, + { once: true }, + ); + }); + } + + // Log state before adding + const sourceBuffers = Array.from(mediaSource.sourceBuffers); + console.log("[MSE] About to add audio SourceBuffer", { + audioMimeType: mimeType, + sourceBufferCount: sourceBuffers.length, + videoSourceBufferUpdating: this.#videoSourceBuffer?.updating, + readyState: mediaSource.readyState, + isAudioMimeTypeSupported: MediaSource.isTypeSupported(mimeType), + }); + + // Double-check MIME type is supported + if (!MediaSource.isTypeSupported(mimeType)) { + throw new Error(`Audio MIME type not supported: ${mimeType}`); + } + + // Some browsers have quirks - try to add SourceBuffer and handle errors gracefully + try { + // Check if we can actually add another SourceBuffer + // Some browsers might report 1 SourceBuffer but actually be at limit + if (sourceBuffers.length >= 2) { + console.warn("[MSE] MediaSource already has 2 SourceBuffers, cannot add audio"); + throw new Error("MediaSource already has maximum SourceBuffers"); + } + + this.#audioSourceBuffer = 
mediaSource.addSourceBuffer(mimeType); + if (!this.#audioSourceBuffer) { + throw new Error("Failed to create audio SourceBuffer"); + } + console.log("[MSE] Audio SourceBuffer created successfully"); + this.#setupAudioSourceBuffer(); + } catch (error) { + // If QuotaExceededError, check if another call added the audio SourceBuffer + if (error instanceof DOMException && error.name === "QuotaExceededError") { + const sourceBuffers = Array.from(mediaSource.sourceBuffers); + const readyState = mediaSource.readyState; + console.log("[MSE] QuotaExceededError - MediaSource has", sourceBuffers.length, "SourceBuffers", { + readyState, + videoSourceBufferSet: !!this.#videoSourceBuffer, + audioSourceBufferSet: !!this.#audioSourceBuffer, + }); + + // If MediaSource is not open, that's the problem + if (readyState !== "open") { + throw new Error(`MediaSource readyState is "${readyState}", cannot add SourceBuffers`); + } + + // If we already have audio SourceBuffer set, use it + if (this.#audioSourceBuffer && sourceBuffers.includes(this.#audioSourceBuffer)) { + console.log("[MSE] Found existing audio SourceBuffer reference"); + return; // Success - silently return + } + + // If we have exactly 2 SourceBuffers and one is video, the other must be audio + if (sourceBuffers.length === 2 && this.#videoSourceBuffer) { + const otherBuffer = sourceBuffers.find((sb) => sb !== this.#videoSourceBuffer); + if (otherBuffer) { + console.log("[MSE] Found audio SourceBuffer by exclusion (other than video)"); + this.#audioSourceBuffer = otherBuffer; + if (!this.#audioSourceBufferSetup) { + this.#setupAudioSourceBuffer(); + } + return; // Success - silently return + } + } + + // If we have 2 SourceBuffers but don't know which is video, try to identify by checking if one is already set + // This handles the case where video SourceBuffer isn't set yet + if (sourceBuffers.length === 2) { + // If we don't have video SourceBuffer set, we can't reliably identify which is audio + // But if one of them was added by a previous call to initializeAudio, we should use it + // For now, if we have 2 SourceBuffers and can't identify, assume the first non-video one is audio + // This is a fallback - ideally video should initialize first + const nonVideoBuffer = this.#videoSourceBuffer + ? 
sourceBuffers.find((sb) => sb !== this.#videoSourceBuffer) + : sourceBuffers[1]; // If video not set, assume second one is audio (video is usually first) + + if (nonVideoBuffer) { + console.log("[MSE] Using fallback: assuming non-video SourceBuffer is audio"); + this.#audioSourceBuffer = nonVideoBuffer; + if (!this.#audioSourceBufferSetup) { + this.#setupAudioSourceBuffer(); + } + return; // Success - silently return + } + } + + // If we have only 1 SourceBuffer and get QuotaExceededError, this is unusual + // It might mean the video SourceBuffer is updating or MediaSource is in a transitional state + // Wait briefly and retry once + if (sourceBuffers.length === 1 && this.#videoSourceBuffer) { + console.log("[MSE] QuotaExceededError with only 1 SourceBuffer - retrying once", { + readyState: mediaSource.readyState, + videoSourceBufferUpdating: this.#videoSourceBuffer.updating, + }); + + // Wait for video SourceBuffer to finish if it's updating (with timeout) + if (this.#videoSourceBuffer.updating) { + await new Promise((resolve) => { + const timeout = setTimeout(() => resolve(), 200); // Max 200ms wait + if (!this.#videoSourceBuffer) { + clearTimeout(timeout); + resolve(); + return; + } + this.#videoSourceBuffer.addEventListener( + "updateend", + () => { + clearTimeout(timeout); + resolve(); + }, + { once: true }, + ); + }); + } else { + // Brief wait for MediaSource to stabilize + await new Promise((resolve) => setTimeout(resolve, 10)); + } + + // Quick retry - check if another call added it first + const currentSourceBuffers = Array.from(mediaSource.sourceBuffers); + if (currentSourceBuffers.length >= 2) { + const otherBuffer = currentSourceBuffers.find((sb) => sb !== this.#videoSourceBuffer); + if (otherBuffer) { + console.log("[MSE] Found audio SourceBuffer after retry"); + this.#audioSourceBuffer = otherBuffer; + if (!this.#audioSourceBufferSetup) { + this.#setupAudioSourceBuffer(); + } + return; + } + } + + // Try adding again + try { + if (mediaSource.readyState !== "open") { + throw new Error(`MediaSource readyState is "${mediaSource.readyState}"`); + } + this.#audioSourceBuffer = mediaSource.addSourceBuffer(mimeType); + if (!this.#audioSourceBuffer) { + throw new Error("Failed to create audio SourceBuffer"); + } + console.log("[MSE] Audio SourceBuffer created successfully after retry"); + this.#setupAudioSourceBuffer(); + return; // Success + } catch (retryError) { + // If retry also fails, allow video-only playback (don't delay further) + console.warn("[MSE] Retry failed, allowing video-only playback", retryError); + return; + } + } + + // If we still can't find it, log details and rethrow + console.warn("[MSE] QuotaExceededError but couldn't find audio SourceBuffer in MediaSource", { + sourceBufferCount: sourceBuffers.length, + readyState: mediaSource.readyState, + hasVideoSourceBuffer: !!this.#videoSourceBuffer, + hasAudioSourceBuffer: !!this.#audioSourceBuffer, + }); + } + console.error("[MSE] Error adding audio SourceBuffer:", error); + throw error; + } + } + + #setupVideoSourceBuffer(): void { + if (!this.#videoSourceBuffer) return; + + const SEEK_HYSTERESIS = 0.1; // seconds to avoid re-seek loops on tiny drift + this.#videoSourceBuffer.addEventListener("updateend", () => { + // Check if we have buffered data and try to play + const video = this.#video; + const sourceBuffer = this.#videoSourceBuffer; + if (video && sourceBuffer && sourceBuffer.buffered.length > 0) { + const buffered = sourceBuffer.buffered; + const start = buffered.start(0); + const end = buffered.end(0); + 
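+				// Snap currentTime back to the start of the buffered range when it drifts outside it (live CMAF
+				// buffers rarely start at t=0), then retry play() in case the element paused itself waiting for data.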
+ // Seek to start of buffered range if needed + if ( + video.currentTime + SEEK_HYSTERESIS < start || + video.currentTime >= end - SEEK_HYSTERESIS || + Number.isNaN(video.currentTime) + ) { + console.log(`[MSE] Seeking video to buffered range start: ${start.toFixed(2)}`); + video.currentTime = start; + } + + // Try to play if paused + if (video.paused && video.readyState >= HTMLMediaElement.HAVE_METADATA) { + console.log("[MSE] Attempting to play video after SourceBuffer updateend"); + video.play().catch((err) => { + console.warn("[MSE] Autoplay blocked:", err); + }); + } + } + + this.#processVideoQueue(); + }); + + this.#videoSourceBuffer.addEventListener("error", (e) => { + console.error("[MSE] Video SourceBuffer error:", e); + }); + } + + #setupAudioSourceBuffer(): void { + if (!this.#audioSourceBuffer || this.#audioSourceBufferSetup) return; + + this.#audioSourceBuffer.addEventListener("updateend", () => { + this.#processAudioQueue(); + }); + + this.#audioSourceBuffer.addEventListener("error", (e) => { + console.error("[MSE] Audio SourceBuffer error:", e); + }); + + this.#audioSourceBufferSetup = true; + } + + #startFrameCapture(): void { + if (!this.#video) return; + + let captureCount = 0; + const captureFrame = () => { + if (!this.#video) return; + + try { + const frame = new VideoFrame(this.#video, { + timestamp: this.#video.currentTime * 1_000_000, // Convert to microseconds + }); + + captureCount++; + if (captureCount === 1 || captureCount % 30 === 0) { + console.log( + `[MSE] Captured frame ${captureCount}, currentTime: ${this.#video.currentTime.toFixed(2)}, readyState: ${this.#video.readyState}, paused: ${this.#video.paused}, buffered: ${this.#video.buffered.length > 0 ? `${this.#video.buffered.start(0).toFixed(2)}-${this.#video.buffered.end(0).toFixed(2)}` : "none"}`, + ); + } + + this.#stats.update((current) => ({ + frameCount: (current?.frameCount ?? 0) + 1, + timestamp: frame.timestamp, + bytesReceived: current?.bytesReceived ?? 
0, + })); + + this.frame.update((prev) => { + prev?.close(); + return frame; + }); + + if (this.#video.videoWidth && this.#video.videoHeight) { + this.display.set({ + width: this.#video.videoWidth, + height: this.#video.videoHeight, + }); + } + + if (this.#video.readyState >= HTMLMediaElement.HAVE_CURRENT_DATA) { + this.bufferStatus.set({ state: "filled" }); + // Try to play if paused and we have data + if (this.#video.paused && this.#video.readyState >= HTMLMediaElement.HAVE_CURRENT_DATA) { + this.#video.play().catch((err) => { + if (captureCount <= 5) { + console.log("[MSE] Attempted to play video, result:", err); + } + }); + } + } + } catch (error) { + console.error("Error capturing frame:", error); + } + + if (this.#video.requestVideoFrameCallback) { + this.#frameCallbackId = this.#video.requestVideoFrameCallback(captureFrame); + } else { + this.#frameCallbackId = requestAnimationFrame(captureFrame) as unknown as number; + } + }; + + if (this.#video.requestVideoFrameCallback) { + this.#frameCallbackId = this.#video.requestVideoFrameCallback(captureFrame); + } else { + this.#frameCallbackId = requestAnimationFrame(captureFrame) as unknown as number; + } + } + + async appendVideoFragment(fragment: Uint8Array): Promise { + if (!this.#videoSourceBuffer || !this.#mediaSource) { + throw new Error("Video SourceBuffer not initialized"); + } + + if (this.#videoAppendQueue.length >= SourceMSE.MAX_QUEUE_SIZE) { + const discarded = this.#videoAppendQueue.shift(); + console.warn( + `[MSE] Video queue full (${SourceMSE.MAX_QUEUE_SIZE}), discarding oldest fragment (${discarded?.byteLength ?? 0} bytes)`, + ); + } + + const copy = new Uint8Array(fragment); + this.#videoAppendQueue.push(copy); + this.#processVideoQueue(); + } + + async appendAudioFragment(fragment: Uint8Array): Promise { + // If audio SourceBuffer doesn't exist, silently return (video-only playback) + if (!this.#audioSourceBuffer || !this.#mediaSource) { + return; + } + + if (this.#mediaSource.readyState === "closed") { + return; + } + + if (this.#audioAppendQueue.length >= SourceMSE.MAX_QUEUE_SIZE) { + const discarded = this.#audioAppendQueue.shift(); + console.warn( + `[MSE] Audio queue full (${SourceMSE.MAX_QUEUE_SIZE}), discarding oldest fragment (${discarded?.byteLength ?? 0} bytes)`, + ); + } + + const copy = new Uint8Array(fragment); + this.#audioAppendQueue.push(copy); + this.#processAudioQueue(); + } + + /** + * Extracts a track-specific init segment from a full init segment. + * MSE requires track-specific init segments for each SourceBuffer. 
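+	 * Expected input layout: [ftyp][moov [mvhd][trak ...][mvex [trex ...]]]. Single-track inputs are passed through
+	 * unchanged; otherwise the rebuilt segment keeps ftyp, mvhd, the trak whose hdlr matches the requested type
+	 * ("vide"/"soun"), and that track's trex entry.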
+ */ + #extractTrackInitSegment(fullInitSegment: Uint8Array, trackType: "video" | "audio"): Uint8Array { + let offset = 0; + let ftypAtom: Uint8Array | null = null; + let moovOffset = 0; + let moovSize = 0; + + // Find ftyp and moov atoms + while (offset + 8 <= fullInitSegment.length) { + const size = + (fullInitSegment[offset] << 24) | + (fullInitSegment[offset + 1] << 16) | + (fullInitSegment[offset + 2] << 8) | + fullInitSegment[offset + 3]; + const type = String.fromCharCode( + fullInitSegment[offset + 4], + fullInitSegment[offset + 5], + fullInitSegment[offset + 6], + fullInitSegment[offset + 7], + ); + + if (type === "ftyp") { + ftypAtom = fullInitSegment.slice(offset, offset + size); + offset += size; + } else if (type === "moov") { + moovOffset = offset; + moovSize = size; + break; + } else { + if (size < 8 || size === 0) break; + offset += size; + } + } + + if (moovSize === 0) { + throw new Error("moov atom not found in init segment"); + } + + // Parse moov atom to find the relevant track + const moovAtom = fullInitSegment.slice(moovOffset, moovOffset + moovSize); + const targetHandler = trackType === "video" ? "vide" : "soun"; + + // Count tracks in moov + let moov_track_count = 0; + let moov_offset_temp = 8; + while (moov_offset_temp + 8 <= moovAtom.length) { + const size = + (moovAtom[moov_offset_temp] << 24) | + (moovAtom[moov_offset_temp + 1] << 16) | + (moovAtom[moov_offset_temp + 2] << 8) | + moovAtom[moov_offset_temp + 3]; + const type = String.fromCharCode( + moovAtom[moov_offset_temp + 4], + moovAtom[moov_offset_temp + 5], + moovAtom[moov_offset_temp + 6], + moovAtom[moov_offset_temp + 7], + ); + if (type === "trak") { + moov_track_count++; + } + if (size < 8 || size === 0) break; + moov_offset_temp += size; + } + + // If only one track, use directly + if (moov_track_count === 1) { + return fullInitSegment; + } + + // Multiple tracks - need to extract + const trakAtom = this.#findTrackInMoov(moovAtom, targetHandler); + if (!trakAtom) { + // Try alternative handler types + const alternatives = trackType === "video" ? ["vid ", "vide", "avc1"] : ["soun", "mp4a"]; + for (const alt of alternatives) { + const altTrak = this.#findTrackInMoov(moovAtom, alt); + if (altTrak) { + return this.#extractTrackInitSegmentWithHandler(fullInitSegment, ftypAtom, moovAtom, alt); + } + } + + const foundTracks = this.#getAllTracksInMoov(moovAtom); + const foundHandlers = foundTracks.map((t) => t.handler || "unknown").join(", "); + throw new Error( + `${trackType} track not found in moov atom. ` + + `Looking for handler: "${targetHandler}", but found: [${foundHandlers}]. 
` + + `The init segment should contain all tracks.`, + ); + } + + // Reconstruct moov atom with only the target track + const newMoov = this.#rebuildMoovWithSingleTrack(moovAtom, trakAtom, targetHandler); + + // Combine ftyp (if present) + new moov + const result: Uint8Array[] = []; + if (ftypAtom) { + result.push(ftypAtom); + } + result.push(newMoov); + + const totalSize = result.reduce((sum, arr) => sum + arr.length, 0); + const combined = new Uint8Array(totalSize); + let writeOffset = 0; + for (const arr of result) { + combined.set(arr, writeOffset); + writeOffset += arr.length; + } + + return combined; + } + + #extractTrackInitSegmentWithHandler( + _fullInitSegment: Uint8Array, + ftypAtom: Uint8Array | null, + moovAtom: Uint8Array, + handlerType: string, + ): Uint8Array { + const trakAtom = this.#findTrackInMoov(moovAtom, handlerType); + if (!trakAtom) { + throw new Error(`Track with handler "${handlerType}" not found`); + } + + const newMoov = this.#rebuildMoovWithSingleTrack(moovAtom, trakAtom, handlerType); + + const result: Uint8Array[] = []; + if (ftypAtom) { + result.push(ftypAtom); + } + result.push(newMoov); + + const totalSize = result.reduce((sum, arr) => sum + arr.length, 0); + const combined = new Uint8Array(totalSize); + let writeOffset = 0; + for (const arr of result) { + combined.set(arr, writeOffset); + writeOffset += arr.length; + } + + return combined; + } + + #getAllTracksInMoov(moovAtom: Uint8Array): Array<{ handler: string | null }> { + const tracks: Array<{ handler: string | null }> = []; + let offset = 8; // Skip moov header + + while (offset + 8 <= moovAtom.length) { + const size = + (moovAtom[offset] << 24) | + (moovAtom[offset + 1] << 16) | + (moovAtom[offset + 2] << 8) | + moovAtom[offset + 3]; + const type = String.fromCharCode( + moovAtom[offset + 4], + moovAtom[offset + 5], + moovAtom[offset + 6], + moovAtom[offset + 7], + ); + + if (type === "trak") { + const trakAtom = moovAtom.slice(offset, offset + size); + const handler = this.#getHandlerType(trakAtom); + tracks.push({ handler: handler || null }); + } + + if (size < 8 || size === 0) break; + offset += size; + } + + return tracks; + } + + #getHandlerType(trakAtom: Uint8Array): string | null { + let offset = 8; // Skip trak header + + while (offset + 8 <= trakAtom.length) { + const size = + (trakAtom[offset] << 24) | + (trakAtom[offset + 1] << 16) | + (trakAtom[offset + 2] << 8) | + trakAtom[offset + 3]; + const type = String.fromCharCode( + trakAtom[offset + 4], + trakAtom[offset + 5], + trakAtom[offset + 6], + trakAtom[offset + 7], + ); + + if (type === "mdia") { + const mdiaAtom = trakAtom.slice(offset, offset + size); + let mdiaOffset = 8; + while (mdiaOffset + 8 <= mdiaAtom.length) { + const hdlrSize = + (mdiaAtom[mdiaOffset] << 24) | + (mdiaAtom[mdiaOffset + 1] << 16) | + (mdiaAtom[mdiaOffset + 2] << 8) | + mdiaAtom[mdiaOffset + 3]; + const hdlrType = String.fromCharCode( + mdiaAtom[mdiaOffset + 4], + mdiaAtom[mdiaOffset + 5], + mdiaAtom[mdiaOffset + 6], + mdiaAtom[mdiaOffset + 7], + ); + + if (hdlrType === "hdlr") { + if (mdiaOffset + 24 <= mdiaAtom.length) { + const handlerTypeBytes = String.fromCharCode( + mdiaAtom[mdiaOffset + 16], + mdiaAtom[mdiaOffset + 17], + mdiaAtom[mdiaOffset + 18], + mdiaAtom[mdiaOffset + 19], + ); + return handlerTypeBytes; + } + } + + if (hdlrSize < 8 || hdlrSize === 0) break; + mdiaOffset += hdlrSize; + } + } + + if (size < 8 || size === 0) break; + offset += size; + } + + return null; + } + + #findTrackInMoov(moovAtom: Uint8Array, handlerType: string): Uint8Array | 
null { + let offset = 8; // Skip moov header + + while (offset + 8 <= moovAtom.length) { + const size = + (moovAtom[offset] << 24) | + (moovAtom[offset + 1] << 16) | + (moovAtom[offset + 2] << 8) | + moovAtom[offset + 3]; + const type = String.fromCharCode( + moovAtom[offset + 4], + moovAtom[offset + 5], + moovAtom[offset + 6], + moovAtom[offset + 7], + ); + + if (type === "trak") { + const trakAtom = moovAtom.slice(offset, offset + size); + if (this.#trakHasHandler(trakAtom, handlerType)) { + return trakAtom; + } + } + + if (size < 8 || size === 0) break; + offset += size; + } + + return null; + } + + #trakHasHandler(trakAtom: Uint8Array, handlerType: string): boolean { + const foundHandler = this.#getHandlerType(trakAtom); + return foundHandler === handlerType; + } + + #rebuildMoovWithSingleTrack(moovAtom: Uint8Array, trakAtom: Uint8Array, targetHandler: string): Uint8Array { + const parts: Uint8Array[] = []; + let offset = 8; // Skip moov header + + const trackId = this.#getTrackId(trakAtom); + + while (offset + 8 <= moovAtom.length) { + const size = + (moovAtom[offset] << 24) | + (moovAtom[offset + 1] << 16) | + (moovAtom[offset + 2] << 8) | + moovAtom[offset + 3]; + const type = String.fromCharCode( + moovAtom[offset + 4], + moovAtom[offset + 5], + moovAtom[offset + 6], + moovAtom[offset + 7], + ); + + if (type === "mvhd") { + parts.push(moovAtom.slice(offset, offset + size)); + } else if (type === "trak") { + const trak = moovAtom.slice(offset, offset + size); + if (this.#trakHasHandler(trak, targetHandler)) { + parts.push(trak); + } + } else if (type === "mvex") { + const mvexAtom = moovAtom.slice(offset, offset + size); + const rebuiltMvex = this.#rebuildMvexWithSingleTrack(mvexAtom, trackId); + if (rebuiltMvex) { + parts.push(rebuiltMvex); + } + } + + if (size < 8 || size === 0) break; + offset += size; + } + + const totalSize = 8 + parts.reduce((sum, arr) => sum + arr.length, 0); + const newMoov = new Uint8Array(totalSize); + + newMoov[0] = (totalSize >>> 24) & 0xff; + newMoov[1] = (totalSize >>> 16) & 0xff; + newMoov[2] = (totalSize >>> 8) & 0xff; + newMoov[3] = totalSize & 0xff; + newMoov[4] = 0x6d; // 'm' + newMoov[5] = 0x6f; // 'o' + newMoov[6] = 0x6f; // 'o' + newMoov[7] = 0x76; // 'v' + + let writeOffset = 8; + for (const part of parts) { + newMoov.set(part, writeOffset); + writeOffset += part.length; + } + + return newMoov; + } + + #getTrackId(trakAtom: Uint8Array): number { + let offset = 8; // Skip trak header + + while (offset + 8 <= trakAtom.length) { + const size = + (trakAtom[offset] << 24) | + (trakAtom[offset + 1] << 16) | + (trakAtom[offset + 2] << 8) | + trakAtom[offset + 3]; + const type = String.fromCharCode( + trakAtom[offset + 4], + trakAtom[offset + 5], + trakAtom[offset + 6], + trakAtom[offset + 7], + ); + + if (type === "tkhd") { + const version = trakAtom[offset + 8]; + const trackIdOffset = version === 1 ? 
24 : 16; + if (offset + trackIdOffset + 4 <= trakAtom.length) { + return ( + (trakAtom[offset + trackIdOffset] << 24) | + (trakAtom[offset + trackIdOffset + 1] << 16) | + (trakAtom[offset + trackIdOffset + 2] << 8) | + trakAtom[offset + trackIdOffset + 3] + ); + } + } + + if (size < 8 || size === 0) break; + offset += size; + } + + return 0; + } + + #rebuildMvexWithSingleTrack(mvexAtom: Uint8Array, trackId: number): Uint8Array | null { + const parts: Uint8Array[] = []; + let offset = 8; // Skip mvex header + + while (offset + 8 <= mvexAtom.length) { + const size = + (mvexAtom[offset] << 24) | + (mvexAtom[offset + 1] << 16) | + (mvexAtom[offset + 2] << 8) | + mvexAtom[offset + 3]; + const type = String.fromCharCode( + mvexAtom[offset + 4], + mvexAtom[offset + 5], + mvexAtom[offset + 6], + mvexAtom[offset + 7], + ); + + if (type === "trex") { + if (offset + 16 <= mvexAtom.length) { + const trexTrackId = + (mvexAtom[offset + 12] << 24) | + (mvexAtom[offset + 13] << 16) | + (mvexAtom[offset + 14] << 8) | + mvexAtom[offset + 15]; + if (trexTrackId === trackId) { + parts.push(mvexAtom.slice(offset, offset + size)); + } + } + } + + if (size < 8 || size === 0) break; + offset += size; + } + + if (parts.length === 0) { + return null; + } + + const totalSize = 8 + parts.reduce((sum, arr) => sum + arr.length, 0); + const newMvex = new Uint8Array(totalSize); + + newMvex[0] = (totalSize >>> 24) & 0xff; + newMvex[1] = (totalSize >>> 16) & 0xff; + newMvex[2] = (totalSize >>> 8) & 0xff; + newMvex[3] = totalSize & 0xff; + newMvex[4] = 0x6d; // 'm' + newMvex[5] = 0x76; // 'v' + newMvex[6] = 0x65; // 'e' + newMvex[7] = 0x78; // 'x' + + let writeOffset = 8; + for (const part of parts) { + newMvex.set(part, writeOffset); + writeOffset += part.length; + } + + return newMvex; + } + + #processVideoQueue(): void { + if (!this.#videoSourceBuffer || this.#videoSourceBuffer.updating || this.#videoAppendQueue.length === 0) { + return; + } + + if (this.#mediaSource?.readyState !== "open") { + return; + } + + // Wait if any SourceBuffer is updating (dash.js pattern) + if (this.#isBufferUpdating()) { + return; + } + + const fragment = this.#videoAppendQueue.shift(); + if (!fragment) return; + + try { + this.#videoSourceBuffer.appendBuffer(fragment as BufferSource); + this.#stats.update((current) => { + const newCount = (current?.frameCount ?? 0) + 1; + if (newCount === 1 || newCount % 10 === 0) { + console.log(`[MSE] Appended video fragment ${newCount}, size: ${fragment.byteLength} bytes`); + } + return { + frameCount: newCount, + timestamp: current?.timestamp ?? 0, + bytesReceived: (current?.bytesReceived ?? 
0) + fragment.byteLength,
+				};
+			});
+		} catch (error) {
+			// Let browser handle buffer management - just log the error
+			if (error instanceof DOMException && error.name === "QuotaExceededError") {
+				console.warn("[MSE] QuotaExceededError - browser will manage buffer automatically");
+				// Put fragment back in queue to retry later
+				this.#videoAppendQueue.unshift(fragment);
+			} else {
+				console.error("[MSE] Error appending video fragment:", error);
+			}
+		}
+	}
+
+	#processAudioQueue(): void {
+		if (!this.#audioSourceBuffer || this.#audioSourceBuffer.updating || this.#audioAppendQueue.length === 0) {
+			return;
+		}
+
+		if (this.#mediaSource?.readyState !== "open") {
+			return;
+		}
+
+		// Wait if any SourceBuffer is updating (dash.js pattern)
+		if (this.#isBufferUpdating()) {
+			return;
+		}
+
+		const fragment = this.#audioAppendQueue.shift();
+		if (!fragment) return;
+
+		try {
+			this.#audioSourceBuffer.appendBuffer(fragment as BufferSource);
+		} catch (error) {
+			// Let browser handle buffer management - just log the error
+			if (error instanceof DOMException && error.name === "QuotaExceededError") {
+				console.warn("[MSE] QuotaExceededError for audio - browser will manage buffer automatically");
+				// Put fragment back in queue to retry later
+				this.#audioAppendQueue.unshift(fragment);
+			} else {
+				console.error("[MSE] Error appending audio fragment:", error);
+			}
+		}
+	}
+
+	// Backward compatibility - delegates to appendVideoFragment
+	async appendFragment(fragment: Uint8Array): Promise<void> {
+		return this.appendVideoFragment(fragment);
+	}
+
+	async runTrack(
+		effect: Effect,
+		broadcast: Moq.Broadcast,
+		name: string,
+		config: RequiredDecoderConfig,
+	): Promise<void> {
+		await this.initializeVideo(config);
+
+		// Briefly wait for audio SourceBuffer so we don't hit Chrome's quota race.
+ console.log("[MSE] Checking if audio SourceBuffer will be added..."); + for (let i = 0; i < 10; i++) { + // up to ~1s + if (this.#audioSourceBuffer || (this.#mediaSource && this.#mediaSource.sourceBuffers.length >= 2)) { + console.log("[MSE] Audio SourceBuffer detected, proceeding with video"); + break; + } + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + const sub = broadcast.subscribe(name, PRIORITY.video); + effect.cleanup(() => sub.close()); + + const consumer = new Frame.Consumer(sub, { + latency: this.latency, + container: "cmaf", + }); + effect.cleanup(() => consumer.close()); + + // Init segment must be in catalog for CMAF + if (!config.initSegment) { + throw new Error("Init segment is required in catalog for CMAF playback"); + } + + // Decode base64 string to Uint8Array + const binaryString = atob(config.initSegment); + const fullInitSegment = new Uint8Array(binaryString.length); + for (let i = 0; i < binaryString.length; i++) { + fullInitSegment[i] = binaryString.charCodeAt(i); + } + + // Extract video-specific init segment + const videoInitSegment = this.#extractTrackInitSegment(fullInitSegment, "video"); + + // Append init segment and wait for completion + if (!this.#videoSourceBuffer) { + throw new Error("Video SourceBuffer not available"); + } + + const videoSourceBuffer = this.#videoSourceBuffer; + console.log("[MSE] Appending video init segment, size:", videoInitSegment.byteLength, "bytes"); + await new Promise((resolve, reject) => { + const onUpdateEnd = () => { + videoSourceBuffer.removeEventListener("updateend", onUpdateEnd); + videoSourceBuffer.removeEventListener("error", onError); + console.log("[MSE] Video init segment appended successfully"); + resolve(); + }; + + const onError = (e: Event) => { + videoSourceBuffer.removeEventListener("updateend", onUpdateEnd); + videoSourceBuffer.removeEventListener("error", onError); + const error = e as ErrorEvent; + console.error("[MSE] Video SourceBuffer error appending init segment:", error); + reject(new Error(`Video SourceBuffer error: ${error.message || "unknown error"}`)); + }; + + videoSourceBuffer.addEventListener("updateend", onUpdateEnd, { once: true }); + videoSourceBuffer.addEventListener("error", onError, { once: true }); + + try { + videoSourceBuffer.appendBuffer(videoInitSegment as BufferSource); + } catch (error) { + videoSourceBuffer.removeEventListener("updateend", onUpdateEnd); + videoSourceBuffer.removeEventListener("error", onError); + console.error("[MSE] Error calling appendBuffer on video init segment:", error); + reject(error); + } + }); + + // Helper function to detect init segment + function isInitSegmentData(data: Uint8Array): boolean { + if (data.length < 8) return false; + + let offset = 0; + const len = data.length; + + while (offset + 8 <= len) { + const size = + (data[offset] << 24) | (data[offset + 1] << 16) | (data[offset + 2] << 8) | data[offset + 3]; + + const type = String.fromCharCode( + data[offset + 4], + data[offset + 5], + data[offset + 6], + data[offset + 7], + ); + + if (type === "ftyp" || type === "moov") return true; + + if (size < 8 || size === 0) break; + offset += size; + } + + return false; + } + + // Read fragments and append to SourceBuffer + // Each fragment is already a complete CMAF segment (moof+mdat), so we can append individually + // This reduces latency and memory usage compared to batching by group + console.log("[MSE] Starting to read video fragments from track"); + effect.spawn(async () => { + let frameCount = 0; + + for (;;) { + const frame = 
await Promise.race([consumer.decode(), effect.cancel]); + if (!frame) { + console.log(`[MSE] Video track ended, processed ${frameCount} frames`); + break; + } + + frameCount++; + if (frameCount === 1 || frameCount % 10 === 0) { + console.log(`[MSE] Processing video frame ${frameCount}, group: ${frame.group}`); + } + + // Skip any init segments that might come from track + if (isInitSegmentData(frame.data)) { + continue; + } + + // Append fragment immediately - each fragment is a complete CMAF segment + await this.appendVideoFragment(frame.data); + } + }); + } + + async runAudioTrack( + effect: Effect, + broadcast: Moq.Broadcast, + name: string, + config: Catalog.AudioConfig, + catalog: Catalog.Audio, + enabled?: Getter, + ): Promise { + // Check if audio SourceBuffer was initialized + // If not, allow video-only playback + if (!this.#audioSourceBuffer) { + console.log("[MSE] Audio SourceBuffer not available, skipping audio track (video-only playback)"); + return; + } + + // Init segment must be in catalog for CMAF + if (!config.initSegment) { + throw new Error("Init segment is required in catalog for CMAF audio playback"); + } + + // Decode base64 string to Uint8Array + const binaryString = atob(config.initSegment); + const fullInitSegment = new Uint8Array(binaryString.length); + for (let i = 0; i < binaryString.length; i++) { + fullInitSegment[i] = binaryString.charCodeAt(i); + } + + // Extract audio-specific init segment + const audioInitSegment = this.#extractTrackInitSegment(fullInitSegment, "audio"); + + // Append init segment + await this.appendAudioFragment(audioInitSegment); + + // Check if enabled + const isEnabled = enabled ? effect.get(enabled) : true; + if (!isEnabled) { + return; + } + + const sub = broadcast.subscribe(name, catalog.priority); + effect.cleanup(() => sub.close()); + + const consumer = new Frame.Consumer(sub, { + latency: this.latency, + container: "cmaf", + }); + effect.cleanup(() => consumer.close()); + + function hasMoovAtom(data: Uint8Array): boolean { + let offset = 0; + const len = data.length; + while (offset + 8 <= len) { + const size = + (data[offset] << 24) | (data[offset + 1] << 16) | (data[offset + 2] << 8) | data[offset + 3]; + const type = String.fromCharCode( + data[offset + 4], + data[offset + 5], + data[offset + 6], + data[offset + 7], + ); + if (type === "moov") return true; + if (size < 8 || size === 0) break; + offset += size; + } + return false; + } + + effect.spawn(async () => { + for (;;) { + const frame = await Promise.race([consumer.decode(), effect.cancel]); + if (!frame) { + break; + } + + if (this.#mediaSource?.readyState === "closed") { + break; + } + + // Skip any init segments + if (hasMoovAtom(frame.data)) { + continue; + } + + // Append fragment immediately - each fragment is a complete CMAF segment + if (this.#mediaSource?.readyState === "open") { + await this.appendAudioFragment(frame.data); + } + } + }); + } + + close(): void { + this.#videoAppendQueue = []; + this.#audioAppendQueue = []; + this.#audioSourceBufferSetup = false; + + // Store references before resetting + const audioSourceBuffer = this.#audioSourceBuffer; + const videoSourceBuffer = this.#videoSourceBuffer; + const mediaSource = this.#mediaSource; + + this.#audioSourceBuffer = undefined; // Reset audio SourceBuffer reference + + this.mediaSource.set(undefined); + + if (this.#frameCallbackId !== undefined) { + if (this.#video?.requestVideoFrameCallback) { + this.#video.cancelVideoFrameCallback(this.#frameCallbackId); + } else { + 
cancelAnimationFrame(this.#frameCallbackId); + } + } + + this.frame.update((prev) => { + prev?.close(); + return undefined; + }); + + if (videoSourceBuffer && mediaSource) { + try { + if (videoSourceBuffer.updating) { + videoSourceBuffer.abort(); + } + } catch (error) { + console.error("Error closing video SourceBuffer:", error); + } + } + + if (audioSourceBuffer && mediaSource) { + try { + if (audioSourceBuffer.updating) { + audioSourceBuffer.abort(); + } + } catch (error) { + console.error("Error closing audio SourceBuffer:", error); + } + } + + if (this.#mediaSource) { + try { + if (this.#mediaSource.readyState === "open") { + this.#mediaSource.endOfStream(); + } + URL.revokeObjectURL(this.#video?.src || ""); + } catch (error) { + console.error("Error closing MediaSource:", error); + } + } + + if (this.#video) { + this.#video.pause(); + this.#video.src = ""; + this.#video.remove(); + } + + this.#signals.close(); + } + + get stats() { + return this.#stats; + } +} diff --git a/js/hang/src/watch/video/source.ts b/js/hang/src/watch/video/source.ts index a7fd2c923..1ecaa010c 100644 --- a/js/hang/src/watch/video/source.ts +++ b/js/hang/src/watch/video/source.ts @@ -5,6 +5,7 @@ import * as Frame from "../../frame"; import { PRIORITY } from "../../publish/priority"; import type * as Time from "../../time"; import * as Hex from "../../util/hex"; +import type { SourceMSE } from "../source-mse"; export type SourceProps = { enabled?: boolean | Signal; @@ -27,7 +28,9 @@ export type Target = { // The types in VideoDecoderConfig that cause a hard reload. // ex. codedWidth/Height are optional and can be changed in-band, so we don't want to trigger a reload. // This way we can keep the current subscription active. -type RequiredDecoderConfig = Omit; +// Note: We keep codedWidth/Height as optional for logging, but set them to undefined to avoid reloads. +type RequiredDecoderConfig = Omit & + Partial>; type BufferStatus = { state: "empty" | "filled" }; @@ -97,6 +100,14 @@ export class Source { #signals = new Effect(); + // Expose MediaSource for audio to use + #mseMediaSource = new Signal(undefined); + readonly mseMediaSource = this.#mseMediaSource as Getter; + + // Expose mseSource instance for audio to access coordination methods + #mseSource = new Signal(undefined); + readonly mseSource = this.#mseSource as Getter; + constructor( broadcast: Signal, catalog: Signal, @@ -110,6 +121,14 @@ export class Source { const c = effect.get(catalog)?.video; effect.set(this.catalog, c); effect.set(this.flip, c?.flip); + + if (c) { + console.log("[Video Catalog]", { + renditions: Object.keys(c.renditions ?? {}), + renditionCount: Object.keys(c.renditions ?? 
{}).length, + flip: c.flip, + }); + } }); this.#signals.effect(this.#runSupported.bind(this)); @@ -192,12 +211,87 @@ export class Source { } #runTrack(effect: Effect, broadcast: Moq.Broadcast, name: string, config: RequiredDecoderConfig): void { + // Route to MSE for CMAF, WebCodecs for native/raw + if (config.container === "cmaf") { + this.#runMSEPath(effect, broadcast, name, config); + } else { + this.#runWebCodecsPath(effect, broadcast, name, config); + } + } + + #runMSEPath(effect: Effect, broadcast: Moq.Broadcast, name: string, config: RequiredDecoderConfig): void { + console.log("[Video Stream] Subscribing to track", { + name, + codec: config.codec, + container: config.container, + width: config.codedWidth, + height: config.codedHeight, + }); + // Import MSE source dynamically to avoid loading if not needed + effect.spawn(async () => { + const { SourceMSE } = await import("../source-mse.js"); + const mseSource = new SourceMSE(this.latency); + effect.cleanup(() => mseSource.close()); + + // Forward signals using effects + this.#signals.effect((eff) => { + const frame = eff.get(mseSource.frame); + eff.set(this.frame, frame); + }); + + this.#signals.effect((eff) => { + const display = eff.get(mseSource.display); + eff.set(this.display, display); + }); + + this.#signals.effect((eff) => { + const status = eff.get(mseSource.bufferStatus); + eff.set(this.bufferStatus, status, { state: "empty" }); + }); + + this.#signals.effect((eff) => { + const status = eff.get(mseSource.syncStatus); + eff.set(this.syncStatus, status, { state: "ready" }); + }); + + this.#signals.effect((eff) => { + const mediaSource = eff.get(mseSource.mediaSource); + eff.set(this.#mseMediaSource, mediaSource); + }); + + // Expose mseSource for audio to access + this.#signals.effect((eff) => { + eff.set(this.#mseSource, mseSource); + }); + + this.#signals.effect((eff) => { + const stats = eff.get(mseSource.stats); + eff.set(this.#stats, stats); + }); + // Run MSE track + try { + await mseSource.runTrack(effect, broadcast, name, config); + } catch (error) { + console.error("MSE path error, falling back to WebCodecs:", error); + // Fallback to WebCodecs + this.#runWebCodecsPath(effect, broadcast, name, config); + } + }); + } + + #runWebCodecsPath(effect: Effect, broadcast: Moq.Broadcast, name: string, config: RequiredDecoderConfig): void { + console.log("[Video Stream] Subscribing to track", { + name, + codec: config.codec, + container: config.container, + width: config.codedWidth, + height: config.codedHeight, + }); const sub = broadcast.subscribe(name, PRIORITY.video); // TODO use priority from catalog effect.cleanup(() => sub.close()); // Create consumer that reorders groups/frames up to the provided latency. - // Container defaults to "legacy" via Zod schema for backward compatibility - console.log(`[Video Subscriber] Using container format: ${config.container}`); + // Container defaults to "native" via Zod schema for backward compatibility const consumer = new Frame.Consumer(sub, { latency: this.latency, container: config.container, diff --git a/justfile b/justfile index f23bc544d..2afe2354e 100644 --- a/justfile +++ b/justfile @@ -147,7 +147,7 @@ pub name url="http://localhost:4443/anon" *args: - | TOKIO_CONSOLE_BIND=127.0.0.1:6681 cargo run --bin hang -- publish --url "{{url}}" --name "{{name}}" fmp4 {{args}} # Generate and ingest an HLS stream from a video file. 
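(Reviewer note on the recipe change below: because the new positional `passthrough` parameter defaults to an empty string and the recipe only checks that it is non-empty, something like `just pub-hls demo 1` should enable passthrough mode, while `just pub-hls demo` keeps the existing sample-by-sample behavior. Since `passthrough` now precedes `relay`, overriding the relay also requires supplying a passthrough value, e.g. `just pub-hls demo '' http://localhost:4443/anon`.)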
-pub-hls name relay="http://localhost:4443/anon": +pub-hls name passthrough='' relay="http://localhost:4443/anon": #!/usr/bin/env bash set -euo pipefail @@ -174,7 +174,7 @@ pub-hls name relay="http://localhost:4443/anon": -c:v:1 libx264 -profile:v:1 high -level:v:1 4.1 -pix_fmt:v:1 yuv420p -tag:v:1 avc1 \ -b:v:1 300k -maxrate:v:1 330k -bufsize:v:1 600k \ -c:a aac -b:a 128k \ - -f hls -hls_time 2 -hls_list_size 12 \ + -f hls -hls_time 2 -hls_list_size 6 \ -hls_flags independent_segments+delete_segments \ -hls_segment_type fmp4 \ -master_pl_name master.m3u8 \ @@ -200,18 +200,58 @@ pub-hls name relay="http://localhost:4443/anon": exit 1 fi - echo ">>> Starting HLS ingest from disk: $OUT_DIR/master.m3u8" + # Wait for individual playlists to be generated (they're referenced in master.m3u8) + # Give ffmpeg a bit more time to generate the variant playlists + echo ">>> Waiting for variant playlists..." + sleep 2 + for i in {1..20}; do + # Check if at least one variant playlist exists + if [ -f "$OUT_DIR/v0/stream.m3u8" ] || [ -f "$OUT_DIR/v720/stream.m3u8" ] || [ -f "$OUT_DIR/v144/stream.m3u8" ] || [ -f "$OUT_DIR/vaudio/stream.m3u8" ]; then + break + fi + sleep 0.5 + done + + # Check if passthrough flag is provided (boolean parameter) + if [ -n "{{passthrough}}" ]; then + echo ">>> Starting HLS ingest from disk with passthrough mode: $OUT_DIR/master.m3u8" + PASSTHROUGH_FLAG="--passthrough" + else + echo ">>> Starting HLS ingest from disk (non-passthrough mode): $OUT_DIR/master.m3u8" + PASSTHROUGH_FLAG="" + fi # Trap to clean up ffmpeg on exit + CLEANUP_CALLED=false cleanup() { + if [ "$CLEANUP_CALLED" = "true" ]; then + return + fi + CLEANUP_CALLED=true echo "Shutting down..." kill $FFMPEG_PID 2>/dev/null || true - exit 0 + # Wait a bit for ffmpeg to finish + sleep 0.5 + # Force kill if still running + kill -9 $FFMPEG_PID 2>/dev/null || true } - trap cleanup SIGINT SIGTERM + trap cleanup SIGINT SIGTERM EXIT # Run hang to ingest from local files - cargo run --bin hang -- publish --url "{{relay}}" --name "{{name}}" hls --playlist "$OUT_DIR/master.m3u8" + if [ -n "$PASSTHROUGH_FLAG" ]; then + echo ">>> Running with --passthrough flag" + cargo run --bin hang -- publish --url "{{relay}}" --name "{{name}}" hls --playlist "$OUT_DIR/master.m3u8" --passthrough + else + echo ">>> Running without --passthrough flag" + cargo run --bin hang -- publish --url "{{relay}}" --name "{{name}}" hls --playlist "$OUT_DIR/master.m3u8" + fi + EXIT_CODE=$? + + # Cleanup after cargo run completes (success or failure) + cleanup + + # Exit with the same code as cargo run + exit $EXIT_CODE # Publish a video using H.264 Annex B format to the localhost relay server pub-h264 name url="http://localhost:4443/anon" *args: diff --git a/rs/hang-cli/src/publish.rs b/rs/hang-cli/src/publish.rs index eb12d8771..44f322424 100644 --- a/rs/hang-cli/src/publish.rs +++ b/rs/hang-cli/src/publish.rs @@ -16,6 +16,9 @@ pub enum PublishFormat { /// URL or file path of an HLS playlist to ingest. #[arg(long)] playlist: String, + /// Enable passthrough mode to transport complete CMAF fragments (moof+mdat) without decomposing. + #[arg(long)] + passthrough: bool, }, } @@ -45,12 +48,17 @@ impl Publish { let stream = Decoder::new(broadcast.clone(), format); PublishDecoder::Decoder(Box::new(stream)) } - PublishFormat::Hls { playlist } => { + PublishFormat::Hls { playlist, passthrough } => { + tracing::info!( + passthrough = *passthrough, + "HLS publish preserving original container format." 
+ ); let hls = hang::import::Hls::new( broadcast.clone(), hang::import::HlsConfig { playlist: playlist.clone(), client: None, + passthrough: *passthrough, }, )?; PublishDecoder::Hls(Box::new(hls)) diff --git a/rs/hang/Cargo.toml b/rs/hang/Cargo.toml index 75928bc1f..8e7500a4a 100644 --- a/rs/hang/Cargo.toml +++ b/rs/hang/Cargo.toml @@ -32,7 +32,7 @@ reqwest = { version = "0.12", default-features = false, features = [ scuffle-h265 = "0.2.2" serde = { workspace = true } serde_json = "1" -serde_with = { version = "3", features = ["hex"] } +serde_with = { version = "3", features = ["hex", "base64"] } thiserror = "2" tokio = { workspace = true, features = ["macros", "fs"] } tracing = "0.1" diff --git a/rs/hang/examples/video.rs b/rs/hang/examples/video.rs index 5132db605..112e00a8b 100644 --- a/rs/hang/examples/video.rs +++ b/rs/hang/examples/video.rs @@ -70,6 +70,8 @@ fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> hang::TrackProdu display_ratio_width: None, display_ratio_height: None, optimize_for_latency: None, + container: hang::catalog::Container::Native, + init_segment: None, }; // Create a map of video renditions diff --git a/rs/hang/src/catalog/audio/mod.rs b/rs/hang/src/catalog/audio/mod.rs index 7322bc307..271fcb515 100644 --- a/rs/hang/src/catalog/audio/mod.rs +++ b/rs/hang/src/catalog/audio/mod.rs @@ -9,7 +9,9 @@ use std::collections::BTreeMap; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use serde_with::{hex::Hex, DisplayFromStr}; +use serde_with::{base64::Base64, hex::Hex, DisplayFromStr}; + +use crate::catalog::container::Container; /// Information about an audio track in the catalog. /// @@ -60,4 +62,18 @@ pub struct AudioConfig { #[serde(default)] #[serde_as(as = "Option")] pub description: Option, + + /// Container format for frame encoding. + /// Defaults to "native" for backward compatibility. + pub container: Container, + + /// Init segment (ftyp+moov) for CMAF/fMP4 containers. + /// + /// This is the initialization segment needed for MSE playback. + /// Stored as base64-encoded bytes and embedded in the catalog (as suggested + /// in feedback). Init segments should not be sent over data tracks or at the + /// start of each group. + #[serde(default)] + #[serde_as(as = "Option")] + pub init_segment: Option, } diff --git a/rs/hang/src/catalog/container.rs b/rs/hang/src/catalog/container.rs new file mode 100644 index 000000000..cecee5d57 --- /dev/null +++ b/rs/hang/src/catalog/container.rs @@ -0,0 +1,18 @@ +use serde::{Deserialize, Serialize}; + +/// Container format for frame timestamp encoding and frame payload structure. 
+/// +/// - "native": Uses QUIC VarInt encoding (1-8 bytes, variable length), raw frame payloads +/// - "raw": Uses fixed u64 encoding (8 bytes, big-endian), raw frame payloads +/// - "cmaf": Fragmented MP4 container - frames contain complete moof+mdat fragments +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, Default)] +#[serde(rename_all = "camelCase")] +pub enum Container { + #[serde(rename = "native")] + #[default] + Native, + #[serde(rename = "raw")] + Raw, + #[serde(rename = "cmaf")] + Cmaf, +} diff --git a/rs/hang/src/catalog/mod.rs b/rs/hang/src/catalog/mod.rs index 69602dc26..4d8673bd5 100644 --- a/rs/hang/src/catalog/mod.rs +++ b/rs/hang/src/catalog/mod.rs @@ -7,6 +7,7 @@ mod audio; mod chat; +mod container; mod preview; mod root; mod track; @@ -15,6 +16,7 @@ mod video; pub use audio::*; pub use chat::*; +pub use container::*; pub use preview::*; pub use root::*; pub use track::*; diff --git a/rs/hang/src/catalog/root.rs b/rs/hang/src/catalog/root.rs index 2defc55cb..608458416 100644 --- a/rs/hang/src/catalog/root.rs +++ b/rs/hang/src/catalog/root.rs @@ -205,6 +205,22 @@ impl Drop for CatalogGuard<'_> { // TODO decide if this should return an error, or be impossible to fail let frame = self.catalog.to_string().expect("invalid catalog"); + + // Log the catalog JSON to verify container field is included + if let Some(video) = &self.catalog.video { + for (name, config) in &video.renditions { + tracing::info!(track = name, container = ?config.container, "publishing catalog with container"); + } + } + if let Some(audio) = &self.catalog.audio { + for (name, config) in &audio.renditions { + tracing::info!(track = name, container = ?config.container, "publishing catalog with container"); + } + } + + // Log the full catalog JSON to debug serialization + tracing::debug!(catalog_json = %frame, "publishing catalog JSON"); + group.write_frame(frame); group.close(); } @@ -269,7 +285,7 @@ impl From for CatalogConsumer { mod test { use std::collections::BTreeMap; - use crate::catalog::{AudioCodec::Opus, AudioConfig, VideoConfig, H264}; + use crate::catalog::{AudioCodec::Opus, AudioConfig, Container, VideoConfig, H264}; use super::*; @@ -283,7 +299,8 @@ mod test { "codedWidth": 1280, "codedHeight": 720, "bitrate": 6000000, - "framerate": 30.0 + "framerate": 30.0, + "container": "native" } }, "priority": 1 @@ -294,7 +311,8 @@ mod test { "codec": "opus", "sampleRate": 48000, "numberOfChannels": 2, - "bitrate": 128000 + "bitrate": 128000, + "container": "native" } }, "priority": 2 @@ -323,6 +341,8 @@ mod test { bitrate: Some(6_000_000), framerate: Some(30.0), optimize_for_latency: None, + container: Container::Native, + init_segment: None, }, ); @@ -335,6 +355,8 @@ mod test { channel_count: 2, bitrate: Some(128_000), description: None, + container: Container::Native, + init_segment: None, }, ); diff --git a/rs/hang/src/catalog/video/mod.rs b/rs/hang/src/catalog/video/mod.rs index 4af6de7f0..615934f72 100644 --- a/rs/hang/src/catalog/video/mod.rs +++ b/rs/hang/src/catalog/video/mod.rs @@ -14,7 +14,9 @@ use std::collections::BTreeMap; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use serde_with::{hex::Hex, DisplayFromStr}; +use serde_with::{base64::Base64, hex::Hex, DisplayFromStr}; + +use crate::catalog::container::Container; /// Information about a video track in the catalog. /// @@ -109,4 +111,22 @@ pub struct VideoConfig { /// Default: true #[serde(default)] pub optimize_for_latency: Option, + + /// Container format for frame encoding. 
+ /// Defaults to "native" for backward compatibility. + pub container: Container, + + /// Init segment (ftyp+moov) for CMAF/fMP4 containers. + /// + /// This is the initialization segment needed for MSE playback. + /// Stored as base64-encoded bytes and embedded in the catalog (as suggested + /// in feedback). Init segments should not be sent over data tracks or at the + /// start of each group. + /// + /// Note: A future optimization could build init segments from the description + /// field (e.g., avcC box for H.264) along with other catalog metadata, but + /// for now we store the complete init segment for simplicity and correctness. + #[serde(default)] + #[serde_as(as = "Option")] + pub init_segment: Option, } diff --git a/rs/hang/src/import/aac.rs b/rs/hang/src/import/aac.rs index 065282374..fe79f652e 100644 --- a/rs/hang/src/import/aac.rs +++ b/rs/hang/src/import/aac.rs @@ -107,6 +107,8 @@ impl Aac { channel_count, bitrate: None, description: None, + container: hang::catalog::Container::Native, + init_segment: None, }; tracing::debug!(name = ?track.name, ?config, "starting track"); diff --git a/rs/hang/src/import/avc3.rs b/rs/hang/src/import/avc3.rs index ec8d472ba..b2b14e8de 100644 --- a/rs/hang/src/import/avc3.rs +++ b/rs/hang/src/import/avc3.rs @@ -62,6 +62,8 @@ impl Avc3 { display_ratio_width: None, display_ratio_height: None, optimize_for_latency: None, + container: hang::catalog::Container::Native, + init_segment: None, }; if let Some(old) = &self.config { diff --git a/rs/hang/src/import/fmp4.rs b/rs/hang/src/import/fmp4.rs index 96d5f9432..4e476e081 100644 --- a/rs/hang/src/import/fmp4.rs +++ b/rs/hang/src/import/fmp4.rs @@ -1,4 +1,6 @@ -use crate::catalog::{AudioCodec, AudioConfig, CatalogProducer, VideoCodec, VideoConfig, AAC, AV1, H264, H265, VP9}; +use crate::catalog::{ + AudioCodec, AudioConfig, CatalogProducer, Container, VideoCodec, VideoConfig, AAC, AV1, H264, H265, VP9, +}; use crate::{self as hang, Timestamp}; use anyhow::Context; use bytes::{Buf, Bytes, BytesMut}; @@ -38,12 +40,28 @@ pub struct Fmp4 { // The timestamp of the last keyframe for each track last_keyframe: HashMap, + // Track if we've sent the first frame for each track (needed for passthrough mode) + first_frame_sent: HashMap, + // The moov atom at the start of the file. moov: Option, // The latest moof header moof: Option, moof_size: usize, + + /// When true, transport CMAF fragments directly (passthrough mode) + /// When false, decompose fragments into individual samples (current behavior) + passthrough_mode: bool, + + /// When passthrough_mode is enabled, store raw bytes of moof + moof_bytes: Option, + + /// When passthrough_mode is enabled, store raw bytes of ftyp (file type box) + ftyp_bytes: Option, + + /// When passthrough_mode is enabled, store raw bytes of moov (init segment) + moov_bytes: Option, } impl Fmp4 { @@ -58,15 +76,37 @@ impl Fmp4 { catalog, tracks: HashMap::default(), last_keyframe: HashMap::default(), + first_frame_sent: HashMap::default(), moov: None, moof: None, moof_size: 0, + passthrough_mode: false, + moof_bytes: None, + ftyp_bytes: None, + moov_bytes: None, } } + /// Set passthrough mode for CMAF fragment transport. + /// + /// When enabled, complete fMP4 fragments (moof+mdat) are transported directly + /// instead of being decomposed into individual samples. 
+ pub fn set_passthrough_mode(&mut self, enabled: bool) { + self.passthrough_mode = enabled; + } + pub fn decode>(&mut self, buf: &mut T) -> anyhow::Result<()> { + // If passthrough mode, we need to extract raw bytes before parsing. + let available_bytes = if self.passthrough_mode && buf.has_remaining() { + let chunk = buf.chunk(); + Some(Bytes::copy_from_slice(chunk)) + } else { + None + }; + let mut cursor = std::io::Cursor::new(buf); let mut position = 0; + let mut bytes_offset = 0; while let Some(atom) = mp4_atom::Any::decode_maybe(&mut cursor)? { // Process the parsed atom. @@ -75,9 +115,45 @@ impl Fmp4 { match atom { Any::Ftyp(_) | Any::Styp(_) => { - // Skip + // If passthrough mode, capture raw bytes of ftyp (file type box) + if self.passthrough_mode { + if let Some(ref bytes) = available_bytes { + if bytes_offset + size <= bytes.len() { + self.ftyp_bytes = Some(bytes.slice(bytes_offset..bytes_offset + size)); + tracing::debug!(ftyp_size = size, bytes_offset, "captured ftyp bytes for init segment"); + } else { + tracing::warn!( + bytes_offset, + size, + available_len = bytes.len(), + "ftyp bytes out of range" + ); + } + } else { + tracing::warn!("passthrough mode but available_bytes is None when processing ftyp"); + } + } + // Skip ftyp/styp atoms in normal processing } Any::Moov(moov) => { + // If passthrough mode, capture raw bytes of moov (init segment) + if self.passthrough_mode { + if let Some(ref bytes) = available_bytes { + if bytes_offset + size <= bytes.len() { + self.moov_bytes = Some(bytes.slice(bytes_offset..bytes_offset + size)); + tracing::debug!(moov_size = size, bytes_offset, "captured moov bytes for init segment"); + } else { + tracing::warn!( + bytes_offset, + size, + available_len = bytes.len(), + "moov bytes out of range" + ); + } + } else { + tracing::warn!("passthrough mode but available_bytes is None when processing moov"); + } + } // Create the broadcast. self.init(moov)?; } @@ -89,17 +165,59 @@ impl Fmp4 { self.moof = Some(moof); self.moof_size = size; + + // If passthrough mode, extract and store raw bytes of moof + if let Some(ref bytes) = available_bytes { + if bytes_offset + size <= bytes.len() { + self.moof_bytes = Some(bytes.slice(bytes_offset..bytes_offset + size)); + } + } } Any::Mdat(mdat) => { - // Extract the samples from the mdat atom. 
- let header_size = size - mdat.data.len(); - self.extract(mdat, header_size)?; + if self.passthrough_mode { + // Transport complete fragment + let moof = self.moof.take().context("missing moof box")?; + let moof_bytes = self.moof_bytes.take().context("missing moof bytes")?; + + // Extract mdat bytes + let mdat_bytes = if let Some(ref bytes) = available_bytes { + if bytes_offset + size <= bytes.len() { + bytes.slice(bytes_offset..bytes_offset + size) + } else { + anyhow::bail!("invalid buffer position for mdat"); + } + } else { + anyhow::bail!("missing available bytes in passthrough mode"); + }; + + // Combine moof + mdat into complete fragment + let mut fragment_bytes = BytesMut::with_capacity(moof_bytes.len() + mdat_bytes.len()); + fragment_bytes.extend_from_slice(&moof_bytes); + fragment_bytes.extend_from_slice(&mdat_bytes); + let fragment = fragment_bytes.freeze(); + + tracing::info!( + moof_size = moof_bytes.len(), + mdat_size = mdat_bytes.len(), + total_fragment_size = fragment.len(), + "processing CMAF fragment (moof+mdat)" + ); + self.transport_fragment(fragment, moof)?; + tracing::info!("finished processing CMAF fragment, ready for next fragment"); + } else { + // Extract the samples from the mdat atom (existing behavior) + let header_size = size - mdat.data.len(); + self.extract(mdat, header_size)?; + } } _ => { - // Skip unknown atoms - tracing::warn!(?atom, "skipping") + // Skip unknown atoms (e.g., sidx, which is optional and used for segment indexing) + // These are safe to ignore and don't affect playback + tracing::debug!(?atom, "skipping optional atom") } } + + bytes_offset += size; } // Advance the buffer by the amount of data that was processed. @@ -113,15 +231,22 @@ impl Fmp4 { } fn init(&mut self, moov: Moov) -> anyhow::Result<()> { + let passthrough_mode = self.passthrough_mode; + tracing::info!(passthrough_mode, "initializing fMP4 with passthrough mode"); let mut catalog = self.catalog.lock(); + // Track which specific tracks were created in this init call + let mut created_video_tracks = Vec::new(); + let mut created_audio_tracks = Vec::new(); + for trak in &moov.trak { let track_id = trak.tkhd.track_id; let handler = &trak.mdia.hdlr.handler; let track = match handler.as_ref() { b"vide" => { - let config = Self::init_video(trak)?; + let config = Self::init_video_static(trak, passthrough_mode)?; + tracing::info!(container = ?config.container, "created video config with container"); let track = moq::Track { name: self.broadcast.track_name("video"), @@ -130,15 +255,19 @@ impl Fmp4 { tracing::debug!(name = ?track.name, ?config, "starting track"); - let video = catalog.insert_video(track.name.clone(), config); + let video = catalog.insert_video(track.name.clone(), config.clone()); video.priority = 1; + // Record this track name + created_video_tracks.push(track.name.clone()); + let track = track.produce(); self.broadcast.insert_track(track.consumer); - track.producer + hang::TrackProducer::new(track.producer, config.container) } b"soun" => { - let config = Self::init_audio(trak)?; + let config = Self::init_audio_static(trak, passthrough_mode)?; + tracing::info!(container = ?config.container, "created audio config with container"); let track = moq::Track { name: self.broadcast.track_name("audio"), @@ -147,26 +276,139 @@ impl Fmp4 { tracing::debug!(name = ?track.name, ?config, "starting track"); - let audio = catalog.insert_audio(track.name.clone(), config); + let audio = catalog.insert_audio(track.name.clone(), config.clone()); audio.priority = 2; + // Record this track 
name + created_audio_tracks.push(track.name.clone()); + let track = track.produce(); self.broadcast.insert_track(track.consumer); - track.producer + hang::TrackProducer::new(track.producer, config.container) } b"sbtl" => anyhow::bail!("subtitle tracks are not supported"), handler => anyhow::bail!("unknown track type: {:?}", handler), }; - self.tracks.insert(track_id, track.into()); + self.tracks.insert(track_id, track); } + // Verify that the moov atom contains all expected tracks BEFORE moving it + let moov_track_count = moov.trak.len(); + let has_video = moov.trak.iter().any(|t| t.mdia.hdlr.handler.as_ref() == b"vide"); + let has_audio = moov.trak.iter().any(|t| t.mdia.hdlr.handler.as_ref() == b"soun"); + self.moov = Some(moov); + // In passthrough mode, store the init segment (ftyp+moov) in the catalog + // instead of sending it over the data tracks. This allows clients to + // reconstruct init segments from the catalog. + // + // Note: Init segments are embedded in the catalog. + // A future optimization could build init segments from the description field + // (e.g., avcC box for H.264) along with other catalog metadata, but for now + // we store the complete init segment for simplicity and correctness. + if passthrough_mode { + if let Some(moov_bytes) = self.moov_bytes.as_ref() { + // Build init segment: ftyp (if available) + moov + let mut init_segment = BytesMut::new(); + if let Some(ref ftyp_bytes) = self.ftyp_bytes { + init_segment.extend_from_slice(ftyp_bytes); + tracing::debug!(ftyp_size = ftyp_bytes.len(), "including ftyp in init segment"); + } + init_segment.extend_from_slice(moov_bytes); + let init_segment_bytes = init_segment.freeze(); + + // Verify that the moov atom contains all expected tracks + let expected_video_tracks = catalog.video.as_ref().map(|v| v.renditions.len()).unwrap_or(0); + let expected_audio_tracks = catalog.audio.as_ref().map(|a| a.renditions.len()).unwrap_or(0); + + tracing::info!( + tracks_in_moov = moov_track_count, + expected_video = expected_video_tracks, + expected_audio = expected_audio_tracks, + tracks_processed = self.tracks.len(), + init_segment_size = init_segment_bytes.len(), + ftyp_included = self.ftyp_bytes.is_some(), + has_video = has_video, + has_audio = has_audio, + "storing init segment in catalog" + ); + + // Verify moov atom signature + let moov_offset = self.ftyp_bytes.as_ref().map(|f| f.len()).unwrap_or(0); + if moov_offset + 8 <= init_segment_bytes.len() { + let atom_type = String::from_utf8_lossy(&init_segment_bytes[moov_offset + 4..moov_offset + 8]); + tracing::info!(atom_type = %atom_type, "verifying moov atom signature in init segment"); + } + + // Warn if moov doesn't contain expected tracks. + // For HLS, inits are per-track (video-only or audio-only), so skip cross-track warnings. + let video_only = has_video && !has_audio; + let audio_only = has_audio && !has_video; + if expected_video_tracks > 0 && !has_video && !audio_only { + tracing::error!( + "moov atom does not contain video track but video configs exist! This will cause client-side errors." + ); + } + if expected_audio_tracks > 0 && !has_audio && !video_only { + tracing::error!( + "moov atom does not contain audio track but audio configs exist! This will cause client-side errors." + ); + } + + // Store init segment in catalog for the relevant track type + // For HLS, each track has its own init segment (video init segment only has video, + // audio init segment only has audio). For direct fMP4 files, the init segment + // contains all tracks. 
We store track-specific init segments only in the tracks + // created in this init call, not all renditions of that type. + + if has_video { + if let Some(video) = catalog.video.as_mut() { + for track_name in &created_video_tracks { + if let Some(config) = video.renditions.get_mut(track_name) { + config.init_segment = Some(init_segment_bytes.clone()); + tracing::debug!( + video_track = %track_name, + init_segment_size = init_segment_bytes.len(), + has_audio_track = has_audio, + "stored init segment in video config" + ); + } + } + } + } + + if has_audio { + if let Some(audio) = catalog.audio.as_mut() { + for track_name in &created_audio_tracks { + if let Some(config) = audio.renditions.get_mut(track_name) { + config.init_segment = Some(init_segment_bytes.clone()); + tracing::debug!( + audio_track = %track_name, + init_segment_size = init_segment_bytes.len(), + has_video_track = has_video, + "stored init segment in audio config" + ); + } + } + } + } + + // Init has been stored; clear cached moov/ftyp to avoid repeated warnings later. + self.moov_bytes = None; + self.ftyp_bytes = None; + } else { + tracing::warn!( + "passthrough mode enabled but moov_bytes is None - init segment will not be stored in catalog" + ); + } + } + Ok(()) } - fn init_video(trak: &Trak) -> anyhow::Result { + fn init_video_static(trak: &Trak, passthrough_mode: bool) -> anyhow::Result { let stsd = &trak.mdia.minf.stbl.stsd; let codec = match stsd.codecs.len() { @@ -199,10 +441,16 @@ impl Fmp4 { display_ratio_width: None, display_ratio_height: None, optimize_for_latency: None, + container: if passthrough_mode { + Container::Cmaf + } else { + Container::Native + }, + init_segment: None, } } - mp4_atom::Codec::Hev1(hev1) => Self::init_h265(true, &hev1.hvcc, &hev1.visual)?, - mp4_atom::Codec::Hvc1(hvc1) => Self::init_h265(false, &hvc1.hvcc, &hvc1.visual)?, + mp4_atom::Codec::Hev1(hev1) => Self::init_h265_static(true, &hev1.hvcc, &hev1.visual, passthrough_mode)?, + mp4_atom::Codec::Hvc1(hvc1) => Self::init_h265_static(false, &hvc1.hvcc, &hvc1.visual, passthrough_mode)?, mp4_atom::Codec::Vp08(vp08) => VideoConfig { codec: VideoCodec::VP8, description: Default::default(), @@ -214,6 +462,12 @@ impl Fmp4 { display_ratio_width: None, display_ratio_height: None, optimize_for_latency: None, + container: if passthrough_mode { + Container::Cmaf + } else { + Container::Native + }, + init_segment: None, }, mp4_atom::Codec::Vp09(vp09) => { // https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L238 @@ -240,6 +494,12 @@ impl Fmp4 { optimize_for_latency: None, bitrate: None, framerate: None, + container: if passthrough_mode { + Container::Cmaf + } else { + Container::Native + }, + init_segment: None, } } mp4_atom::Codec::Av01(av01) => { @@ -272,6 +532,12 @@ impl Fmp4 { optimize_for_latency: None, bitrate: None, framerate: None, + container: if passthrough_mode { + Container::Cmaf + } else { + Container::Native + }, + init_segment: None, } } mp4_atom::Codec::Unknown(unknown) => anyhow::bail!("unknown codec: {:?}", unknown), @@ -282,7 +548,12 @@ impl Fmp4 { } // There's two almost identical hvcc atoms in the wild. 
- fn init_h265(in_band: bool, hvcc: &mp4_atom::Hvcc, visual: &mp4_atom::Visual) -> anyhow::Result { + fn init_h265_static( + in_band: bool, + hvcc: &mp4_atom::Hvcc, + visual: &mp4_atom::Visual, + passthrough_mode: bool, + ) -> anyhow::Result { let mut description = BytesMut::new(); hvcc.encode_body(&mut description)?; @@ -302,14 +573,20 @@ impl Fmp4 { coded_height: Some(visual.height as _), // TODO: populate these fields bitrate: None, + init_segment: None, framerate: None, display_ratio_width: None, display_ratio_height: None, optimize_for_latency: None, + container: if passthrough_mode { + Container::Cmaf + } else { + Container::Native + }, }) } - fn init_audio(trak: &Trak) -> anyhow::Result { + fn init_audio_static(trak: &Trak, passthrough_mode: bool) -> anyhow::Result { let stsd = &trak.mdia.minf.stbl.stsd; let codec = match stsd.codecs.len() { @@ -338,6 +615,12 @@ impl Fmp4 { channel_count: mp4a.audio.channel_count as _, bitrate: Some(bitrate.into()), description: None, // TODO? + container: if passthrough_mode { + Container::Cmaf + } else { + Container::Native + }, + init_segment: None, } } mp4_atom::Codec::Opus(opus) => { @@ -347,6 +630,12 @@ impl Fmp4 { channel_count: opus.audio.channel_count as _, bitrate: None, description: None, // TODO? + container: if passthrough_mode { + Container::Cmaf + } else { + Container::Native + }, + init_segment: None, } } mp4_atom::Codec::Unknown(unknown) => anyhow::bail!("unknown codec: {:?}", unknown), @@ -491,6 +780,116 @@ impl Fmp4 { Ok(()) } + + // Transport a complete CMAF fragment (moof+mdat) directly without decomposing. + fn transport_fragment(&mut self, fragment: Bytes, moof: Moof) -> anyhow::Result<()> { + // Verify that init segment was sent before fragments + if self.moov_bytes.is_some() { + tracing::warn!("transporting fragment but moov_bytes is still set - init segment may not have been sent"); + } + + // Verify fragment starts with moof atom + if fragment.len() >= 8 { + let atom_type = String::from_utf8_lossy(&fragment[4..8]); + tracing::info!(atom_type = %atom_type, fragment_size = fragment.len(), passthrough_mode = self.passthrough_mode, "transporting fragment"); + } + + // Ensure moov is available (init segment must be processed first) + let moov = self.moov.as_ref().ok_or_else(|| { + anyhow::anyhow!("missing moov box - init segment must be processed before fragments. Make sure ensure_init_segment() is called first.") + })?; + + // Loop over all of the traf boxes in the moof. 
+ for traf in &moof.traf { + let track_id = traf.tfhd.track_id; + let track = self.tracks.get_mut(&track_id).context("unknown track")?; + + // Find the track information in the moov + let trak = moov + .trak + .iter() + .find(|trak| trak.tkhd.track_id == track_id) + .context("unknown track")?; + + let tfdt = traf.tfdt.as_ref().context("missing tfdt box")?; + let dts = tfdt.base_media_decode_time; + let timescale = trak.mdia.mdhd.timescale as u64; + + // Convert timestamp from track timescale to microseconds + let micros = (dts as u128 * 1_000_000 / timescale as u128) as u64; + let timestamp = hang::Timestamp::from_micros(micros)?; + + // Determine keyframe status (reuse logic from extract()) + let is_keyframe = if trak.mdia.hdlr.handler == b"vide".into() { + // For video, check sample flags in trun entries + let mut is_keyframe = false; + if let Some(trun) = traf.trun.first() { + if let Some(entry) = trun.entries.first() { + let tfhd = &traf.tfhd; + let flags = entry.flags.unwrap_or(tfhd.default_sample_flags.unwrap_or_default()); + // https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177 + let keyframe_flag = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther + let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample + is_keyframe = keyframe_flag && !non_sync; + + if is_keyframe { + // Force an audio keyframe on video keyframes + for audio in moov.trak.iter().filter(|t| t.mdia.hdlr.handler == b"soun".into()) { + self.last_keyframe.remove(&audio.tkhd.track_id); + } + } + } + } + is_keyframe + } else { + // For audio, force keyframe every 10 seconds or at video keyframes + match self.last_keyframe.get(&track_id) { + Some(prev) => timestamp - *prev > Timestamp::from_secs(10).unwrap(), + None => true, + } + }; + + if is_keyframe { + self.last_keyframe.insert(track_id, timestamp); + } + + // In passthrough mode, send fragments directly without init segments + // Init segments are stored in the catalog and reconstructed on the client side + if self.passthrough_mode { + // The first frame must be a keyframe to create the initial group + // After that, we can send fragments based on their actual keyframe status + let is_first_frame = !self.first_frame_sent.get(&track_id).copied().unwrap_or(false); + let should_be_keyframe = is_first_frame || is_keyframe; + + if is_first_frame { + self.first_frame_sent.insert(track_id, true); + } + + let frame = hang::Frame { + timestamp, + keyframe: should_be_keyframe, + payload: fragment.clone().into(), + }; + track.write(frame)?; + if should_be_keyframe { + tracing::info!(track_id, timestamp = ?timestamp, fragment_size = fragment.len(), is_first = is_first_frame, "sent fragment in passthrough mode (keyframe - creates group)"); + } else { + tracing::debug!(track_id, timestamp = ?timestamp, fragment_size = fragment.len(), "sent non-keyframe fragment in passthrough mode"); + } + } else { + // For non-passthrough mode, just write the frame normally + let frame = hang::Frame { + timestamp, + keyframe: is_keyframe, + payload: fragment.clone().into(), + }; + track.write(frame)?; + tracing::info!(track_id, timestamp = ?timestamp, fragment_size = fragment.len(), is_keyframe = is_keyframe, "sent fragment (non-passthrough mode)"); + } + } + + Ok(()) + } } impl Drop for Fmp4 { diff --git a/rs/hang/src/import/hev1.rs b/rs/hang/src/import/hev1.rs index 57d3ff111..5f3372fa9 100644 --- a/rs/hang/src/import/hev1.rs +++ b/rs/hang/src/import/hev1.rs @@ -62,6 +62,8 @@ impl Hev1 { display_ratio_width: 
vui_data.display_ratio_width, display_ratio_height: vui_data.display_ratio_height, optimize_for_latency: None, + container: hang::catalog::Container::Native, + init_segment: None, }; if let Some(old) = &self.config { diff --git a/rs/hang/src/import/hls.rs b/rs/hang/src/import/hls.rs index 992822be4..53cdf453a 100644 --- a/rs/hang/src/import/hls.rs +++ b/rs/hang/src/import/hls.rs @@ -31,11 +31,20 @@ pub struct HlsConfig { /// An optional HTTP client to use for fetching the playlist and segments. /// If not provided, a default client will be created. pub client: Option, + + /// Enable passthrough mode for CMAF fragment transport. + /// When enabled, complete fMP4 fragments (moof+mdat) are transported directly + /// instead of being decomposed into individual samples. + pub passthrough: bool, } impl HlsConfig { pub fn new(playlist: String) -> Self { - Self { playlist, client: None } + Self { + playlist, + client: None, + passthrough: false, + } } /// Parse the playlist string into a URL. @@ -86,6 +95,8 @@ pub struct Hls { video: Vec, /// Optional audio track shared across variants. audio: Option, + /// Passthrough mode setting for fMP4 importers. + passthrough: bool, } #[derive(Debug, Clone, Copy)] @@ -120,9 +131,11 @@ impl Hls { .build() .unwrap() }); + let passthrough = cfg.passthrough; Ok(Self { broadcast, video_importers: Vec::new(), + passthrough, audio_importer: None, client, base_url, @@ -150,9 +163,10 @@ impl Hls { let outcome = self.step().await?; let delay = self.refresh_delay(outcome.target_duration, outcome.wrote_segments); - debug!( - wrote = outcome.wrote_segments, - delay = ?delay, + info!( + wrote_segments = outcome.wrote_segments, + target_duration = ?outcome.target_duration, + delay_secs = delay.as_secs_f32(), "HLS ingest step complete" ); @@ -165,6 +179,7 @@ impl Hls { self.ensure_tracks().await?; let mut buffered = 0usize; + const MAX_INIT_SEGMENTS: usize = 3; // Only process a few segments during init to avoid getting ahead of live stream // Prime all discovered video variants. // @@ -174,7 +189,7 @@ impl Hls { for (index, mut track) in video_tracks.into_iter().enumerate() { let playlist = self.fetch_media_playlist(track.playlist.clone()).await?; let count = self - .consume_segments(TrackKind::Video(index), &mut track, &playlist) + .consume_segments_limited(TrackKind::Video(index), &mut track, &playlist, MAX_INIT_SEGMENTS) .await?; buffered += count; self.video.push(track); @@ -183,7 +198,9 @@ impl Hls { // Prime the shared audio track, if any. 
if let Some(mut track) = self.audio.take() { let playlist = self.fetch_media_playlist(track.playlist.clone()).await?; - let count = self.consume_segments(TrackKind::Audio, &mut track, &playlist).await?; + let count = self + .consume_segments_limited(TrackKind::Audio, &mut track, &playlist, MAX_INIT_SEGMENTS) + .await?; buffered += count; self.audio = Some(track); } @@ -302,6 +319,50 @@ impl Hls { Ok(()) } + async fn consume_segments_limited( + &mut self, + kind: TrackKind, + track: &mut TrackState, + playlist: &MediaPlaylist, + max_segments: usize, + ) -> anyhow::Result { + // Calculate segments to process + let next_seq = track.next_sequence.unwrap_or(0); + let playlist_seq = playlist.media_sequence; + let total_segments = playlist.segments.len(); + + let last_playlist_seq = playlist_seq + total_segments as u64; + + let skip = if next_seq > last_playlist_seq { + total_segments + } else if next_seq < playlist_seq { + track.next_sequence = None; + 0 + } else { + (next_seq - playlist_seq) as usize + }; + + let available = total_segments.saturating_sub(skip); + + // Limit how many segments we process + let to_process = available.min(max_segments); + + if to_process > 0 { + let base_seq = playlist_seq + skip as u64; + for (i, segment) in playlist.segments[skip..skip + to_process].iter().enumerate() { + self.push_segment(kind, track, segment, base_seq + i as u64).await?; + } + info!( + ?kind, + processed = to_process, + available = available, + "processed limited segments during init" + ); + } + + Ok(to_process) + } + async fn consume_segments( &mut self, kind: TrackKind, @@ -310,19 +371,63 @@ impl Hls { ) -> anyhow::Result { self.ensure_init_segment(kind, track, playlist).await?; - // Skip segments we've already seen - let skip = track.next_sequence.unwrap_or(0).saturating_sub(playlist.media_sequence) as usize; - let base_seq = playlist.media_sequence + skip as u64; - for (i, segment) in playlist.segments[skip..].iter().enumerate() { - self.push_segment(kind, track, segment, base_seq + i as u64).await?; - } - let consumed = playlist.segments.len() - skip; + // Calculate how many segments to skip (already processed) + let next_seq = track.next_sequence.unwrap_or(0); + let playlist_seq = playlist.media_sequence; + let total_segments = playlist.segments.len(); + + // Calculate the last sequence number in the playlist + let last_playlist_seq = playlist_seq + total_segments as u64; + + // If we've already processed beyond what's in the playlist, wait for new segments + let skip = if next_seq > last_playlist_seq { + // We're ahead of the playlist - wait for ffmpeg to generate more segments + warn!( + ?kind, + next_sequence = next_seq, + playlist_sequence = playlist_seq, + last_playlist_sequence = last_playlist_seq, + "imported ahead of playlist, waiting for new segments" + ); + total_segments // Skip all segments in playlist + } else if next_seq < playlist_seq { + // We're behind - reset and start from the beginning of the playlist + warn!( + ?kind, + next_sequence = next_seq, + playlist_sequence = playlist_seq, + "next_sequence behind playlist, resetting to start of playlist" + ); + track.next_sequence = None; + 0 + } else { + // Normal case: next_seq is within playlist range + (next_seq - playlist_seq) as usize + }; + + let fresh_segments = total_segments.saturating_sub(skip); - if consumed == 0 { + info!( + ?kind, + playlist_sequence = playlist_seq, + next_sequence = next_seq, + skip = skip, + total_segments = total_segments, + fresh_segments = fresh_segments, + "consuming HLS segments" + ); + + 
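(Worked example for the sequence bookkeeping above: with playlist.media_sequence = 100, six segments in the playlist, and next_sequence = 103, last_playlist_seq is 106, skip is 3, and fresh_segments is 3, so only segments 103-105 are pushed below.)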
if fresh_segments > 0 { + let base_seq = playlist_seq + skip as u64; + for (i, segment) in playlist.segments[skip..].iter().enumerate() { + self.push_segment(kind, track, segment, base_seq + i as u64).await?; + } + info!(?kind, consumed = fresh_segments, "consumed HLS segments"); + } else { debug!(?kind, "no fresh HLS segments available"); } - Ok(consumed) + Ok(fresh_segments) } async fn ensure_init_segment( @@ -369,11 +474,28 @@ impl Hls { let url = resolve_uri(&track.playlist, &segment.uri)?; let mut bytes = self.fetch_bytes(url).await?; + // Ensure the importer is initialized before processing fragments + // Use track.init_ready to avoid borrowing issues + if !track.init_ready { + // Try to ensure init segment is processed + let playlist = self.fetch_media_playlist(track.playlist.clone()).await?; + self.ensure_init_segment(kind, track, &playlist).await?; + } + + // Get importer after ensuring init segment let importer = match kind { TrackKind::Video(index) => self.ensure_video_importer_for(index), TrackKind::Audio => self.ensure_audio_importer(), }; + // Final check after ensuring init segment + if !importer.is_initialized() { + return Err(anyhow::anyhow!( + "importer not initialized for {:?} after ensure_init_segment - init segment processing failed", + kind + )); + } + importer.decode(&mut bytes).context("failed to parse media segment")?; track.next_sequence = Some(sequence + 1); @@ -403,7 +525,8 @@ impl Hls { /// independent while still contributing to the same shared catalog. fn ensure_video_importer_for(&mut self, index: usize) -> &mut Fmp4 { while self.video_importers.len() <= index { - let importer = Fmp4::new(self.broadcast.clone()); + let mut importer = Fmp4::new(self.broadcast.clone()); + importer.set_passthrough_mode(self.passthrough); self.video_importers.push(importer); } @@ -412,8 +535,12 @@ impl Hls { /// Create or retrieve the fMP4 importer for the audio rendition. fn ensure_audio_importer(&mut self) -> &mut Fmp4 { - self.audio_importer - .get_or_insert_with(|| Fmp4::new(self.broadcast.clone())) + let importer = self.audio_importer.get_or_insert_with(|| { + let mut imp = Fmp4::new(self.broadcast.clone()); + imp.set_passthrough_mode(self.passthrough); + imp + }); + importer } #[cfg(test)] diff --git a/rs/hang/src/import/opus.rs b/rs/hang/src/import/opus.rs index c6f822581..497a20cd6 100644 --- a/rs/hang/src/import/opus.rs +++ b/rs/hang/src/import/opus.rs @@ -53,6 +53,8 @@ impl Opus { channel_count, bitrate: None, description: None, + container: hang::catalog::Container::Native, + init_segment: None, }; tracing::debug!(name = ?track.name, ?config, "starting track"); diff --git a/rs/hang/src/model/track.rs b/rs/hang/src/model/track.rs index cd092571a..0819911c4 100644 --- a/rs/hang/src/model/track.rs +++ b/rs/hang/src/model/track.rs @@ -1,6 +1,7 @@ use std::collections::VecDeque; use std::ops::Deref; +use crate::catalog::Container; use crate::model::{Frame, GroupConsumer, Timestamp}; use crate::Error; use futures::{stream::FuturesUnordered, StreamExt}; @@ -23,15 +24,21 @@ pub struct TrackProducer { pub inner: moq_lite::TrackProducer, group: Option, keyframe: Option, + /// Track if the current group is the init segment group (timestamp 0) + /// We keep this group open so new subscribers can receive the init segment + is_init_segment_group: bool, + container: Container, } impl TrackProducer { /// Create a new TrackProducer wrapping the given moq-lite producer. 
- pub fn new(inner: moq_lite::TrackProducer) -> Self { + pub fn new(inner: moq_lite::TrackProducer, container: Container) -> Self { Self { inner, group: None, keyframe: None, + is_init_segment_group: false, + container, } } @@ -48,11 +55,22 @@ impl TrackProducer { tracing::trace!(?frame, "write frame"); let mut header = BytesMut::new(); - frame.timestamp.as_micros().encode(&mut header, lite::Version::Draft02); + if self.container != Container::Cmaf { + frame.timestamp.as_micros().encode(&mut header, lite::Version::Draft02); + } if frame.keyframe { if let Some(group) = self.group.take() { - group.close(); + // Don't close the init segment group - keep it open for new subscribers + if self.is_init_segment_group { + tracing::debug!("keeping init segment group open for new subscribers"); + // Don't close it, just drop it (the group remains open) + drop(group); + } else { + tracing::info!(timestamp = ?frame.timestamp, "closing group and creating new one for keyframe"); + group.close(); + } + self.is_init_segment_group = false; } // Make sure this frame's timestamp doesn't go backwards relative to the last keyframe. @@ -68,7 +86,18 @@ impl TrackProducer { let mut group = match self.group.take() { Some(group) => group, - None if frame.keyframe => self.inner.append_group(), + None if frame.keyframe => { + let new_group = self.inner.append_group(); + // Log when creating a new group, especially for init segment (timestamp 0) + if frame.timestamp.as_micros() == 0 { + tracing::info!(timestamp = ?frame.timestamp, "creating new group for init segment (timestamp 0)"); + // Mark this as the init segment group so we can keep it open + self.is_init_segment_group = true; + } else { + tracing::info!(timestamp = ?frame.timestamp, "creating new group for keyframe"); + } + new_group + } // The first frame must be a keyframe. None => return Err(Error::MissingKeyframe), }; @@ -76,10 +105,14 @@ impl TrackProducer { let size = header.len() + frame.payload.remaining(); let mut chunked = group.create_frame(size.into()); - chunked.write_chunk(header.freeze()); + if !header.is_empty() { + chunked.write_chunk(header.freeze()); + } + for chunk in frame.payload { chunked.write_chunk(chunk); } + chunked.close(); self.group.replace(group); @@ -98,7 +131,7 @@ impl TrackProducer { impl From for TrackProducer { fn from(inner: moq_lite::TrackProducer) -> Self { - Self::new(inner) + Self::new(inner, Container::Native) } }
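Reviewer note: a minimal client-side sketch of how the pieces in this patch fit together, assuming a catalog rendition that uses the new "cmaf" container. The rendition values and the decodeInitSegment helper name are illustrative only; the base64 decode mirrors the atob loop used in runTrack/runAudioTrack above, and the absent timestamp prefix follows from TrackProducer::write skipping the header for Container::Cmaf.

// Shape of a catalog rendition under the new schema (illustrative values, not from the patch).
const rendition = {
	codec: "avc1.64001f", // hypothetical codec string
	container: "cmaf" as const, // each frame payload is a complete moof+mdat fragment
	initSegment: "AAAAIGZ0eXBpc281...", // base64(ftyp + moov), truncated here
};

// Decode the base64 init segment back into bytes before appending it to a SourceBuffer.
function decodeInitSegment(b64: string): Uint8Array {
	const binary = atob(b64);
	const bytes = new Uint8Array(binary.length);
	for (let i = 0; i < binary.length; i++) {
		bytes[i] = binary.charCodeAt(i);
	}
	return bytes;
}

// Because CMAF frames carry their timing inside the moof atom, there is no
// timestamp prefix to strip: append the decoded init segment once, then append
// each received frame payload as-is.
const init = decodeInitSegment(rendition.initSegment);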