Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
53c71ab
WIP
kixelated Dec 19, 2025
df203bf
WIP
kixelated Dec 19, 2025
be0fbe4
More
kixelated Dec 19, 2025
2450f11
Too tired.
kixelated Dec 19, 2025
0f649f5
Merge remote-tracking branch 'origin/main' into max-latency-v2
kixelated Dec 22, 2025
8614708
Time of ur life.
kixelated Dec 23, 2025
660afa4
WQIP
kixelated Dec 24, 2025
51188ba
WIP
kixelated Dec 26, 2025
427a4cb
More API fixes.
kixelated Dec 26, 2025
8e342e5
Bug fixed.
kixelated Dec 27, 2025
f5d2917
k works better.
kixelated Dec 27, 2025
a8d6b89
MORE WIP
kixelated Dec 27, 2025
792b604
Simplify the proxy stuff.
kixelated Dec 27, 2025
837cdf1
just fix boye
kixelated Dec 27, 2025
3289953
Merge branch 'main' into max-latency-v2
kixelated Dec 27, 2025
ea7683a
Merge remote-tracking branch 'origin/main' into max-latency-v2
kixelated Dec 27, 2025
59fad2a
WIP
kixelated Dec 28, 2025
7592b9f
Another attempt, getting closer?
kixelated Dec 29, 2025
4b3a7cd
Ported but untested.
kixelated Dec 31, 2025
a5e5ae5
just fix
kixelated Dec 31, 2025
0482bc5
Revert the async method change.
kixelated Dec 31, 2025
ae50e5d
Catalog makes it at least.
kixelated Dec 31, 2025
2013415
Start removing the priority.
kixelated Dec 31, 2025
2bffb92
Make priority optional in the catalog.
kixelated Jan 1, 2026
f013158
Cancel groups on unsubscribe.
kixelated Jan 7, 2026
90ab53c
WIP back to milliseconds.
kixelated Jan 8, 2026
1760a38
Generic timescale, will use for hang.
kixelated Jan 10, 2026
04a2688
Seems gud.
kixelated Jan 10, 2026
8f13b0e
Merge remote-tracking branch 'origin/main' into max-latency-v3
kixelated Jan 10, 2026
eb6e899
just fix
kixelated Jan 10, 2026
1608678
Pre-PR
kixelated Jan 10, 2026
1ec042d
Merge remote-tracking branch 'origin/main' into max-latency-v3
kixelated Jan 10, 2026
69f8dde
Pre-PR review.
kixelated Jan 10, 2026
2fd098a
PR review
kixelated Jan 11, 2026
02d5622
Add back the tests, even though they are failing.
kixelated Jan 11, 2026
c7ce556
Merge remote-tracking branch 'origin/main' into max-latency-v3
kixelated Jan 12, 2026
c621a28
Fix the remaining unit tests.
kixelated Jan 13, 2026
ae2d640
Some AI generated tests.
kixelated Jan 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 159 additions & 122 deletions Cargo.lock

Large diffs are not rendered by default.

24 changes: 13 additions & 11 deletions js/clock/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ async function publish(config: Config) {

// Wait until we get a subscription for the track
for (;;) {
const request = await broadcast.requested();
if (!request) break;
const track = await broadcast.requested();
if (!track) break;

if (request.track.name === config.track) {
publishTrack(request.track);
if (track.name === config.track) {
publishTrack(track);
} else {
request.track.close(new Error("not found"));
track.close(new Error("not found"));
}
}
}
Expand All @@ -103,7 +103,8 @@ async function publishTrack(track: Moq.Track) {

// Send the base timestamp (everything but seconds) - matching Rust format
const base = `${now.toISOString().slice(0, 16).replace("T", " ")}:`;
group.writeString(base);
const frame = Moq.Frame.fromString(base);
group.writeFrame(frame);

// Send individual seconds for this minute
const currentMinute = now.getMinutes();
Expand All @@ -112,7 +113,8 @@ async function publishTrack(track: Moq.Track) {
const secondsNow = new Date();
const seconds = secondsNow.getSeconds().toString().padStart(2, "0");

group.writeString(seconds);
const frame = Moq.Frame.fromString(seconds);
group.writeFrame(frame);

// Wait until next second
const nextSecond = new Date(secondsNow);
Expand All @@ -138,7 +140,7 @@ async function subscribe(config: Config) {
console.log("✅ Connected to relay:", config.url);

const broadcast = connection.consume(Moq.Path.from(config.broadcast));
const track = broadcast.subscribe(config.track, 0);
const track = broadcast.subscribe({ name: config.track });

console.log("✅ Subscribed to track:", config.track);

Expand All @@ -157,16 +159,16 @@ async function subscribe(config: Config) {
continue;
}

const base = new TextDecoder().decode(baseFrame);
const base = baseFrame.toString();

// Read individual second frames
for (;;) {
const frame = await group.readString();
const frame = await group.readFrame();
if (!frame) {
break; // End of group
}

const seconds = parseInt(frame, 10);
const seconds = parseInt(frame.toString(), 10);

// Clock emoji positions
const clockEmojis = ["🕛", "🕐", "🕑", "🕒", "🕓", "🕔", "🕕", "🕖", "🕗", "🕘", "🕙", "🕚"];
Expand Down
12 changes: 4 additions & 8 deletions js/hang/src/catalog/audio.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
import { z } from "zod";
import { ContainerSchema, DEFAULT_CONTAINER } from "./container";
import { u53Schema } from "./integers";

// Backwards compatibility: old track schema
const TrackSchema = z.object({
name: z.string(),
priority: z.number().int().min(0).max(255),
});
import { TrackSchema } from "./track";

// Mirrors AudioDecoderConfig
// https://w3c.github.io/webcodecs/#audio-decoder-config
Expand Down Expand Up @@ -40,8 +35,9 @@ export const AudioSchema = z
// This is not an array so it will work with JSON Merge Patch.
renditions: z.record(z.string(), AudioConfigSchema),

// The priority of the audio track, relative to other tracks in the broadcast.
priority: z.number().int().min(0).max(255),
// DEPRECATED: The priority of the audio track, relative to other tracks in the broadcast.
// The subscriber is expected to choose its own priority, instead of being told.
priority: z.number().int().min(0).max(255).default(0),
})
.or(
// Backwards compatibility: transform old {track, config} format to new object format
Expand Down
2 changes: 1 addition & 1 deletion js/hang/src/catalog/root.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ export function decode(raw: Uint8Array): Root {
export async function fetch(track: Moq.Track): Promise<Root | undefined> {
const frame = await track.readFrame();
if (!frame) return undefined;
return decode(frame);
return decode(frame.payload);
}
4 changes: 3 additions & 1 deletion js/hang/src/catalog/track.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { z } from "zod";

export const TrackSchema = z.object({
name: z.string(),
priority: z.number().int().min(0).max(255),
// DEPRECATED: The priority of the track, relative to other tracks in the broadcast.
// The subscriber is supposed to choose its own priority, instead of being told.
priority: z.number().int().min(0).max(255).default(0),
});
export type Track = z.infer<typeof TrackSchema>;
10 changes: 3 additions & 7 deletions js/hang/src/catalog/video.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
import { z } from "zod";
import { ContainerSchema, DEFAULT_CONTAINER } from "./container";
import { u53Schema } from "./integers";

// Backwards compatibility: old track schema
const TrackSchema = z.object({
name: z.string(),
priority: z.number().int().min(0).max(255),
});
import { TrackSchema } from "./track";

// Based on VideoDecoderConfig
export const VideoConfigSchema = z.object({
Expand Down Expand Up @@ -54,7 +49,8 @@ export const VideoSchema = z
renditions: z.record(z.string(), VideoConfigSchema),

// The priority of the video track, relative to other tracks in the broadcast.
priority: z.number().int().min(0).max(255),
// TODO: Remove this; it's for backwards compatibility only
priority: z.number().int().min(0).max(255).default(0),

// Render the video at this size in pixels.
// This is separate from the display aspect ratio because it does not require reinitialization.
Expand Down
17 changes: 11 additions & 6 deletions js/hang/src/frame.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type * as Moq from "@moq/lite";
import * as Moq from "@moq/lite";
import { Time } from "@moq/lite";
import { Effect, Signal } from "@moq/signals";
import type * as Catalog from "./catalog";
Expand All @@ -16,8 +16,9 @@ export interface Frame {
group: number;
}

export function encode(source: Uint8Array | Source, timestamp: Time.Micro, container?: Catalog.Container): Uint8Array {
export function encode(source: Uint8Array | Source, timestamp: Time.Micro, container?: Catalog.Container): Moq.Frame {
// Encode timestamp using the specified container format
// TODO This should be delta encoded, not the full timestamp.
const timestampBytes = Container.encodeTimestamp(timestamp, container);

// Allocate buffer for timestamp + payload
Expand All @@ -34,14 +35,18 @@ export function encode(source: Uint8Array | Source, timestamp: Time.Micro, conta
source.copyTo(data.subarray(timestampBytes.byteLength));
}

return data;
// NOTE: We encode the timestamp into the MoQ layer as well, but in milliseconds.
// TODO: Once this is widespread enough, we should use it at least as the base.
return new Moq.Frame({ payload: data, instant: Time.Milli.fromMicro(timestamp) });
}

// NOTE: A keyframe is always the first frame in a group, so it's not encoded on the wire.
export function decode(buffer: Uint8Array, container?: Catalog.Container): { data: Uint8Array; timestamp: Time.Micro } {
export function decode(frame: Moq.Frame, container?: Catalog.Container): { data: Uint8Array; timestamp: Time.Micro } {
// Decode timestamp using the specified container format
const [timestamp, data] = Container.decodeTimestamp(buffer, container);
return { timestamp: timestamp as Time.Micro, data };
// TODO This should be delta encoded, not the full timestamp.
// TODO: Use frame.instant to avoid double encoding the timestamp.
const [timestamp, data] = Container.decodeTimestamp(frame.payload, container);
return { timestamp, data };
}

export class Producer {
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion js/hang/src/publish/audio/encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import type * as Catalog from "../../catalog";
import { DEFAULT_CONTAINER } from "../../catalog";
import { u53 } from "../../catalog/integers";
import * as Frame from "../../frame";
import { PRIORITY } from "../../priority";
import * as libav from "../../util/libav";
import { PRIORITY } from "../priority";
import type * as Capture from "./capture";
import type { Source } from "./types";

Expand Down
39 changes: 19 additions & 20 deletions js/hang/src/publish/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,45 +72,43 @@ export class Broadcast {

async #runBroadcast(broadcast: Moq.Broadcast, effect: Effect) {
for (;;) {
const request = await broadcast.requested();
if (!request) break;
const track = await broadcast.requested();
if (!track) break;

effect.cleanup(() => request.track.close());
effect.cleanup(() => track.close());

effect.effect((effect) => {
if (effect.get(request.track.state.closed)) return;

switch (request.track.name) {
switch (track.name) {
case Broadcast.CATALOG_TRACK:
this.#serveCatalog(request.track, effect);
this.#serveCatalog(track, effect);
break;
case Location.Window.TRACK:
this.location.window.serve(request.track, effect);
this.location.window.serve(track, effect);
break;
case Location.Peers.TRACK:
this.location.peers.serve(request.track, effect);
this.location.peers.serve(track, effect);
break;
case Preview.TRACK:
this.preview.serve(request.track, effect);
this.preview.serve(track, effect);
break;
case Chat.Typing.TRACK:
this.chat.typing.serve(request.track, effect);
this.chat.typing.serve(track, effect);
break;
case Chat.Message.TRACK:
this.chat.message.serve(request.track, effect);
this.chat.message.serve(track, effect);
break;
case Audio.Encoder.TRACK:
this.audio.serve(request.track, effect);
this.audio.serve(track, effect);
break;
case Video.Root.TRACK_HD:
this.video.hd.serve(request.track, effect);
this.video.hd.serve(track, effect);
break;
case Video.Root.TRACK_SD:
this.video.sd.serve(request.track, effect);
this.video.sd.serve(track, effect);
break;
default:
console.error("received subscription for unknown track", request.track.name);
request.track.close(new Error(`Unknown track: ${request.track.name}`));
console.error("received subscription for unknown track", track.name);
track.close(new Error(`Unknown track: ${track.name}`));
break;
}
});
Expand All @@ -120,7 +118,8 @@ export class Broadcast {
#serveCatalog(track: Moq.Track, effect: Effect): void {
if (!effect.get(this.enabled)) {
// Clear the catalog.
track.writeFrame(Catalog.encode({}));
const frame = new Moq.Frame({ payload: Catalog.encode({}) });
track.writeFrame(frame);
return;
}

Expand All @@ -134,8 +133,8 @@ export class Broadcast {
preview: effect.get(this.preview.catalog),
};

const encoded = Catalog.encode(catalog);
track.writeFrame(encoded);
const frame = new Moq.Frame({ payload: Catalog.encode(catalog) });
track.writeFrame(frame);
}

close() {
Expand Down
7 changes: 4 additions & 3 deletions js/hang/src/publish/chat/message.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type * as Moq from "@moq/lite";
import * as Moq from "@moq/lite";
import { Effect, Signal } from "@moq/signals";
import type * as Catalog from "../../catalog";
import { PRIORITY } from "../priority";
import { PRIORITY } from "../../priority";

export type MessageProps = {
enabled?: boolean | Signal<boolean>;
Expand Down Expand Up @@ -37,7 +37,8 @@ export class Message {
if (!enabled) return;

const latest = effect.get(this.latest);
track.writeString(latest ?? "");
const frame = Moq.Frame.fromString(latest ?? "");
track.writeFrame(frame);
}

close() {
Expand Down
7 changes: 4 additions & 3 deletions js/hang/src/publish/chat/typing.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type * as Moq from "@moq/lite";
import * as Moq from "@moq/lite";
import { Effect, Signal } from "@moq/signals";
import type * as Catalog from "../../catalog";
import { PRIORITY } from "../priority";
import { PRIORITY } from "../../priority";

export type TypingProps = {
enabled?: boolean | Signal<boolean>;
Expand Down Expand Up @@ -37,7 +37,8 @@ export class Typing {
if (!enabled) return;

const active = effect.get(this.active);
track.writeBool(active);
const frame = Moq.Frame.fromBool(active);
track.writeFrame(frame);
}

close() {
Expand Down
2 changes: 1 addition & 1 deletion js/hang/src/publish/location/peers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type * as Moq from "@moq/lite";
import * as Zod from "@moq/lite/zod";
import { Effect, Signal } from "@moq/signals";
import * as Catalog from "../../catalog";
import { PRIORITY } from "../priority";
import { PRIORITY } from "../../priority";

export interface PeersProps {
enabled?: boolean | Signal<boolean>;
Expand Down
2 changes: 1 addition & 1 deletion js/hang/src/publish/location/window.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type * as Moq from "@moq/lite";
import * as Zod from "@moq/lite/zod";
import { Effect, Signal } from "@moq/signals";
import * as Catalog from "../../catalog";
import { PRIORITY } from "../priority";
import { PRIORITY } from "../../priority";

export type WindowProps = {
// If true, then we'll publish our position to the broadcast.
Expand Down
7 changes: 4 additions & 3 deletions js/hang/src/publish/preview.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type * as Moq from "@moq/lite";
import * as Moq from "@moq/lite";
import { Effect, Signal } from "@moq/signals";
import type * as Catalog from "../catalog";
import { PRIORITY } from "./priority";
import { PRIORITY } from "../priority";

export type PreviewProps = {
enabled?: boolean | Signal<boolean>;
Expand Down Expand Up @@ -35,7 +35,8 @@ export class Preview {
const info = effect.get(this.info);
if (!info) return;

track.writeJson(info);
const frame = Moq.Frame.fromJson(info);
track.writeFrame(frame);
}

close() {
Expand Down
2 changes: 1 addition & 1 deletion js/hang/src/publish/video/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Effect, Signal } from "@moq/signals";
import * as Catalog from "../../catalog";
import { PRIORITY } from "../priority";
import { PRIORITY } from "../../priority";
import { Encoder, type EncoderProps } from "./encoder";
import { TrackProcessor } from "./polyfill";
import type { Source } from "./types";
Expand Down
3 changes: 2 additions & 1 deletion js/hang/src/watch/audio/source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { Time } from "@moq/lite";
import { Effect, type Getter, Signal } from "@moq/signals";
import type * as Catalog from "../../catalog";
import * as Frame from "../../frame";
import { PRIORITY } from "../../priority";
import * as Hex from "../../util/hex";
import * as libav from "../../util/libav";
import type * as Render from "./render";
Expand Down Expand Up @@ -163,7 +164,7 @@ export class Source {
const active = effect.get(this.active);
if (!active) return;

const sub = broadcast.subscribe(active, catalog.priority);
const sub = broadcast.subscribe({ name: active, priority: PRIORITY.audio, maxLatency: this.latency });
effect.cleanup(() => sub.close());

// Create consumer with slightly less latency than the render worklet to avoid underflowing.
Expand Down
5 changes: 2 additions & 3 deletions js/hang/src/watch/broadcast.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type * as Moq from "@moq/lite";
import { Effect, type Getter, Signal } from "@moq/signals";
import * as Catalog from "../catalog";
import { PRIORITY } from "../publish/priority";
import { PRIORITY } from "../priority";
import * as Audio from "./audio";
import { Chat, type ChatProps } from "./chat";
import * as Location from "./location";
Expand Down Expand Up @@ -101,7 +101,6 @@ export class Broadcast {

// Require full equality
if (update.path !== path) {
console.warn("ignoring announce", update.path);
continue;
}

Expand Down Expand Up @@ -131,7 +130,7 @@ export class Broadcast {

this.status.set("loading");

const catalog = broadcast.subscribe("catalog.json", PRIORITY.catalog);
const catalog = broadcast.subscribe({ name: "catalog.json", priority: PRIORITY.catalog });
effect.cleanup(() => catalog.close());

effect.spawn(this.#fetchCatalog.bind(this, catalog));
Expand Down
Loading
Loading