diff --git a/Cargo.lock b/Cargo.lock index 736930f39..90dce7215 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -398,9 +398,9 @@ checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" [[package]] name = "base16ct" -version = "1.0.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd307490d624467aa6f74b0eabb77633d1f758a7b25f12bceb0b22e08d9726f6" +checksum = "d8b59d472eab27ade8d770dcb11da7201c11234bef9f82ce7aa517be028d462b" [[package]] name = "base32" @@ -422,9 +422,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "base64ct" -version = "1.8.2" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d809780667f4410e7c41b07f52439b94d2bdf8528eeedc287fa38d3b7f95d82" +checksum = "0e050f626429857a27ddccb31e0aca21356bfa709c04041aefddac081a8f068a" [[package]] name = "bitflags" @@ -434,16 +434,15 @@ checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" [[package]] name = "blake3" -version = "1.8.3" +version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2468ef7d57b3fb7e16b576e8377cdbde2320c60e1491e961d11da40fc4f02a2d" +checksum = "3888aaa89e4b2a40fca9848e400f6a658a5a3978de7be858e209cafa8be9a4a0" dependencies = [ "arrayref", "arrayvec", "cc", "cfg-if", "constant_time_eq", - "cpufeatures", ] [[package]] @@ -512,7 +511,7 @@ checksum = "befbfd072a8e81c02f8c507aefce431fe5e7d051f83d48a23ffc9b9fe5a11799" dependencies = [ "clap", "heck", - "indexmap 2.13.0", + "indexmap 2.12.1", "log", "proc-macro2", "quote", @@ -525,9 +524,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.52" +version = "1.2.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd4932aefd12402b36c60956a4fe0035421f544799057659ff86f923657aada3" +checksum = "7a0aeaff4ff1a90589618835a598e545176939b97874f7abc7851caa0618f203" dependencies = [ "find-msvc-tools", "jobserver", @@ -593,9 +592,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.54" +version = "4.5.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6e6ff9dcd79cff5cd969a17a545d79e84ab086e444102a591e288a8aa3ce394" +checksum = "c9e340e012a1bf4935f5282ed1436d1489548e8f72308207ea5df0e23d2d03f8" dependencies = [ "clap_builder", "clap_derive", @@ -603,9 +602,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.54" +version = "4.5.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa42cf4d2b7a41bc8f663a7cab4031ebafa1bf3875705bfaf8466dc60ab52c00" +checksum = "d76b5d13eaa18c901fd2f7fca939fefe3a0727a953561fefdf3b2922b8569d00" dependencies = [ "anstream", "anstyle", @@ -739,15 +738,15 @@ checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" [[package]] name = "const-oid" -version = "0.10.2" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c" +checksum = "0dabb6555f92fb9ee4140454eb5dcd14c7960e1225c6d1a6cc361f032947713e" [[package]] name = "constant_time_eq" -version = "0.4.2" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d52eff69cd5e647efe296129160853a42795992097e8af39800e1060caeea9b" +checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" [[package]] name = "convert_case" @@ -993,7 +992,7 @@ version = "0.8.0-rc.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "02c1d73e9668ea6b6a28172aa55f3ebec38507131ce179051c8033b5c6037653" dependencies = [ - "const-oid 0.10.2", + "const-oid 0.10.1", "pem-rfc7468 1.0.0", "zeroize", ] @@ -1022,13 +1021,34 @@ dependencies = [ "serde_core", ] +[[package]] +name = "derive_more" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" +dependencies = [ + "derive_more-impl 1.0.0", +] + [[package]] name = "derive_more" version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d751e9e49156b02b44f9c1815bcb94b984cdcc4396ecc32521c739452808b134" dependencies = [ - "derive_more-impl", + "derive_more-impl 2.1.1", +] + +[[package]] +name = "derive_more-impl" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "unicode-xid", ] [[package]] @@ -1070,7 +1090,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dac89f8a64533a9b0eaa73a68e424db0fb1fd6271c74cc0125336a05f090568d" dependencies = [ "block-buffer 0.11.0", - "const-oid 0.10.2", + "const-oid 0.10.1", "crypto-common 0.2.0-rc.4", ] @@ -1139,7 +1159,7 @@ checksum = "594435fe09e345ee388e4e8422072ff7dfeca8729389fbd997b3f5504c44cd47" dependencies = [ "pkcs8 0.11.0-rc.8", "serde", - "signature 3.0.0-rc.6", + "signature 3.0.0-rc.5", ] [[package]] @@ -1153,7 +1173,7 @@ dependencies = [ "rand_core 0.9.3", "serde", "sha2 0.11.0-rc.2", - "signature 3.0.0-rc.6", + "signature 3.0.0-rc.5", "subtle", "zeroize", ] @@ -1282,9 +1302,9 @@ checksum = "64cd1e32ddd350061ae6edb1b082d7c54915b5c672c389143b9a63403a109f24" [[package]] name = "find-msvc-tools" -version = "0.1.7" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f449e6c6c08c865631d4890cfacf252b3d396c9bcc83adb6623cdb02a8336c41" +checksum = "645cbb3a84e60b7531617d5ae4e57f7e27308f6445f5abf653209ea76dec8dff" [[package]] name = "fixedbitset" @@ -1538,9 +1558,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.13" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" +checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386" dependencies = [ "atomic-waker", "bytes", @@ -1548,7 +1568,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap 2.13.0", + "indexmap 2.12.1", "slab", "tokio", "tokio-util", @@ -1557,9 +1577,9 @@ dependencies = [ [[package]] name = "h264-parser" -version = "0.4.1" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "253b313319f7109de64e480ffb606f89475cd758bae82e096e00c5d95341d30e" +checksum = "ab2f85be813ce08f0569fcd816f256c6f4287c975d69a9a14ceacc309f1de967" [[package]] name = "hang" @@ -1568,8 +1588,7 @@ dependencies = [ "anyhow", "buf-list", "bytes", - "derive_more", - "futures", + "derive_more 2.1.1", "h264-parser", "hex", "lazy_static", @@ -2065,9 +2084,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.13.0" +version = "2.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" +checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" dependencies = [ "equivalent", "hashbrown 0.16.1", @@ -2077,9 +2096,9 @@ dependencies = [ [[package]] name = "inout" -version = "0.2.2" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4250ce6452e92010fdf7268ccc5d14faa80bb12fc741938534c58f16804e03c7" +checksum = "c7357b6e7aa75618c7864ebd0634b115a7218b0615f4cb1df33ac3eca23943d4" dependencies = [ "hybrid-array", ] @@ -2116,9 +2135,9 @@ checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" [[package]] name = "iri-string" -version = "0.7.10" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a" +checksum = "4f867b9d1d896b67beb18518eda36fdb77a32ea590de864f1325b294a6d14397" dependencies = [ "memchr", "serde", @@ -2136,7 +2155,7 @@ dependencies = [ "cfg_aliases", "crypto_box", "data-encoding", - "derive_more", + "derive_more 2.1.1", "ed25519-dalek", "futures-util", "getrandom 0.3.4", @@ -2187,7 +2206,7 @@ checksum = "25a8c5fb1cc65589f0d7ab44269a76f615a8c4458356952c9b0ef1c93ea45ff8" dependencies = [ "curve25519-dalek", "data-encoding", - "derive_more", + "derive_more 2.1.1", "ed25519-dalek", "n0-error", "rand_core 0.9.3", @@ -2289,7 +2308,7 @@ dependencies = [ "bytes", "cfg_aliases", "data-encoding", - "derive_more", + "derive_more 2.1.1", "getrandom 0.3.4", "hickory-resolver", "http", @@ -2300,7 +2319,7 @@ dependencies = [ "iroh-metrics", "iroh-quinn", "iroh-quinn-proto", - "lru", + "lru 0.16.2", "n0-error", "n0-future", "num_enum", @@ -2417,9 +2436,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.180" +version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" +checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" [[package]] name = "libm" @@ -2491,9 +2510,15 @@ dependencies = [ [[package]] name = "lru" -version = "0.16.3" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "227748d55f2f0ab4735d87fd623798cb6b664512fe979705f829c9f81c934465" + +[[package]] +name = "lru" +version = "0.16.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1dc47f592c06f33f8e3aea9591776ec7c9f9e4124778ff8a3c3b87159f7e593" +checksum = "96051b46fc183dc9cd4a223960ef37b9af631b55191852a8274bfef064cda20f" dependencies = [ "hashbrown 0.16.1", ] @@ -2580,9 +2605,9 @@ dependencies = [ [[package]] name = "moka" -version = "0.12.12" +version = "0.12.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3dec6bd31b08944e08b58fd99373893a6c17054d6f3ea5006cc894f4f4eee2a" +checksum = "8261cd88c312e0004c1d51baad2980c66528dfdb2bee62003e643a4d8f86b077" dependencies = [ "crossbeam-channel", "crossbeam-epoch", @@ -2590,6 +2615,7 @@ dependencies = [ "equivalent", "parking_lot", "portable-atomic", + "rustc_version", "smallvec", "tagptr", "uuid", @@ -2618,6 +2644,7 @@ dependencies = [ "futures", "hex", "num_enum", + "priority-queue", "rand 0.9.2", "serde", "thiserror 2.0.17", @@ -2722,7 +2749,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58b9fcf396d53fdf1c43a9afd38953412b9d782d11391807b473927317bb28f9" dependencies = [ "bytes", - "derive_more", + "derive_more 2.1.1", "num", "paste", "serde", @@ -2754,12 +2781,12 @@ dependencies = [ [[package]] name = "n0-future" -version = "0.3.2" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2ab99dfb861450e68853d34ae665243a88b8c493d01ba957321a1e9b2312bbe" +checksum = "8c0709ac8235ce13b82bc4d180ee3c42364b90c1a8a628c3422d991d75a728b5" dependencies = [ "cfg_aliases", - "derive_more", + "derive_more 1.0.0", "futures-buffered", "futures-lite", "futures-util", @@ -2779,7 +2806,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38acf13c1ddafc60eb7316d52213467f8ccb70b6f02b65e7d97f7799b1f50be4" dependencies = [ - "derive_more", + "derive_more 2.1.1", "n0-error", "n0-future", ] @@ -2858,7 +2885,7 @@ dependencies = [ "atomic-waker", "bytes", "cfg_aliases", - "derive_more", + "derive_more 2.1.1", "iroh-quinn-udp", "js-sys", "libc", @@ -3080,9 +3107,9 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "openssl-probe" -version = "0.2.0" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f50d9b3dabb09ecd771ad0aa242ca6894994c130308ca3d7684634df8037391" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "p256" @@ -3186,7 +3213,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset", - "indexmap 2.13.0", + "indexmap 2.12.1", ] [[package]] @@ -3233,9 +3260,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkarr" -version = "5.0.2" +version = "5.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1d346b545765a0ef58b6a7e160e17ddaa7427f439b7b9a287df6c88c9e04bf2" +checksum = "792c1328860f6874e90e3b387b4929819cc7783a6bd5a4728e918706eb436a48" dependencies = [ "async-compat", "base32", @@ -3248,7 +3275,7 @@ dependencies = [ "futures-lite", "getrandom 0.3.4", "log", - "lru", + "lru 0.13.0", "ntimestamp", "reqwest", "self_cell", @@ -3305,9 +3332,9 @@ dependencies = [ [[package]] name = "portable-atomic" -version = "1.13.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f89776e4d69bb58bc6993e99ffa1d11f228b839984854c7daeb5d37f87cbe950" +checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" [[package]] name = "portmapper" @@ -3317,7 +3344,7 @@ checksum = "7b575f975dcf03e258b0c7ab3f81497d7124f508884c37da66a7314aa2a8d467" dependencies = [ "base64 0.22.1", "bytes", - "derive_more", + "derive_more 2.1.1", "futures-lite", "futures-util", "hyper-util", @@ -3397,6 +3424,17 @@ dependencies = [ "elliptic-curve", ] +[[package]] +name = "priority-queue" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93980406f12d9f8140ed5abe7155acb10bb1e69ea55c88960b9c2f117445ef96" +dependencies = [ + "equivalent", + "indexmap 2.12.1", + "serde", +] + [[package]] name = "proc-macro-crate" version = "3.4.0" @@ -3408,9 +3446,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.105" +version = "1.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "535d180e0ecab6268a3e718bb9fd44db66bbbc256257165fc699dadf70d16fe7" +checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" dependencies = [ "unicode-ident", ] @@ -3507,9 +3545,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.43" +version = "1.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc74d9a594b72ae6656596548f56f667211f8a97b3d4c3d467150794690dc40a" +checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" dependencies = [ "proc-macro2", ] @@ -3787,9 +3825,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.36" +version = "0.23.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" +checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" dependencies = [ "aws-lc-rs", "log", @@ -3803,9 +3841,9 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.8.3" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +checksum = "9980d917ebb0c0536119ba501e90834767bffc3d60641457fd84a1f3fd337923" dependencies = [ "openssl-probe", "rustls-pki-types", @@ -3870,7 +3908,7 @@ dependencies = [ "rustls-webpki", "security-framework", "security-framework-sys", - "webpki-root-certs 1.0.5", + "webpki-root-certs 1.0.4", "windows-sys 0.61.2", ] @@ -4059,9 +4097,9 @@ dependencies = [ [[package]] name = "self_cell" -version = "1.2.2" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89" +checksum = "16c2f82143577edb4921b71ede051dac62ca3c16084e918bf7b40c96ae10eb33" [[package]] name = "semver" @@ -4117,9 +4155,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.149" +version = "1.0.148" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +checksum = "3084b546a1dd6289475996f182a22aba973866ea8e8b02c51d9f46b1336a22da" dependencies = [ "itoa", "memchr", @@ -4170,7 +4208,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.13.0", + "indexmap 2.12.1", "schemars 0.9.0", "schemars 1.2.0", "serde_core", @@ -4193,11 +4231,11 @@ dependencies = [ [[package]] name = "serdect" -version = "0.4.2" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9af4a3e75ebd5599b30d4de5768e00b5095d518a79fefc3ecbaf77e665d1ec06" +checksum = "d3ef0e35b322ddfaecbc60f34ab448e157e48531288ee49fafbb053696b8ffe2" dependencies = [ - "base16ct 1.0.0", + "base16ct 0.3.0", "serde", ] @@ -4288,9 +4326,9 @@ dependencies = [ [[package]] name = "signature" -version = "3.0.0-rc.6" +version = "3.0.0-rc.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597a96996ccff7dfa16f052bd995b4cecc72af22c35138738dc029f0ead6608d" +checksum = "2a0251c9d6468f4ba853b6352b190fb7c1e405087779917c238445eb03993826" [[package]] name = "simd-adler32" @@ -4450,9 +4488,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.114" +version = "2.0.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4d107df263a3013ef9b1879b0df87d706ff80f65a86ea879bd9c31f9b307c2a" +checksum = "390cc9a294ab71bdb1aa2e99d13be9c753cd2d7bd6560c77118597410c4d2e87" dependencies = [ "proc-macro2", "quote", @@ -4627,9 +4665,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.49.0" +version = "1.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72a2903cd7736441aac9df9d7688bd0ce48edccaadf181c3b90be801e81d3d86" +checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" dependencies = [ "bytes", "libc", @@ -4666,9 +4704,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.18" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" dependencies = [ "futures-core", "pin-project-lite", @@ -4702,9 +4740,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.18" +version = "0.7.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" +checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" dependencies = [ "bytes", "futures-core", @@ -4738,11 +4776,11 @@ dependencies = [ [[package]] name = "toml" -version = "0.9.11+spec-1.1.0" +version = "0.9.10+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3afc9a848309fe1aaffaed6e1546a7a14de1f935dc9d89d32afd9a44bab7c46" +checksum = "0825052159284a1a8b4d6c0c86cbc801f2da5afd2b225fa548c72f2e74002f48" dependencies = [ - "indexmap 2.13.0", + "indexmap 2.12.1", "serde_core", "serde_spanned", "toml_datetime", @@ -4766,7 +4804,7 @@ version = "0.23.10+spec-1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84c8b9f757e028cee9fa244aea147aab2a9ec09d5325a9b01e0a49730c2b5269" dependencies = [ - "indexmap 2.13.0", + "indexmap 2.12.1", "toml_datetime", "toml_parser", "winnow", @@ -4835,7 +4873,7 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", - "indexmap 2.13.0", + "indexmap 2.12.1", "pin-project-lite", "slab", "sync_wrapper", @@ -4998,9 +5036,9 @@ checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" [[package]] name = "unicase" -version = "2.9.0" +version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" +checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" [[package]] name = "unicode-ident" @@ -5044,15 +5082,14 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.8" +version = "2.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" +checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b" dependencies = [ "form_urlencoded", "idna", "percent-encoding", "serde", - "serde_derive", ] [[package]] @@ -5254,9 +5291,9 @@ dependencies = [ [[package]] name = "web-transport-proto" -version = "0.3.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "660175a6d1643adb93b71c4f853d4f20f0fce47f53ae579afe9f7711fe84870d" +checksum = "4b5400535d6dd4c07dc86e83651a838fd513de7f5011d4e4eafa239fa4d0ded4" dependencies = [ "bytes", "http", @@ -5267,19 +5304,19 @@ dependencies = [ [[package]] name = "web-transport-quinn" -version = "0.10.2" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f44b4e68a3e7adb790793e24ec8b5923a610a8c2df1d6cd58849f9e4759d04" +checksum = "91815d3170c715230c94b5107a71ccf81646513e548ee1408c3ce285d021d6ca" dependencies = [ "bytes", "futures", "http", + "log", "quinn", "rustls", "rustls-native-certs", "thiserror 2.0.17", "tokio", - "tracing", "url", "web-transport-proto", "web-transport-trait", @@ -5287,18 +5324,18 @@ dependencies = [ [[package]] name = "web-transport-trait" -version = "0.3.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ae5c857e6b426610648b39c6b48f9e66ae97b27b166d7c2f1ec369596548271" +checksum = "8f4bafa8c6ff708042f67ef8031ca0f342822fd785b70f36a4b2c014760fc442" dependencies = [ "bytes", ] [[package]] name = "web-transport-ws" -version = "0.2.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cbd8d2d2a57b83ba70523e6a8df268a5bfe9f43aabba2d01b16389181f4a8ea" +checksum = "7690a94b7f9e843f6b07be25a03bdec6356b909840b52961c249c0bd75df6564" dependencies = [ "bytes", "futures", @@ -5315,23 +5352,23 @@ version = "0.26.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75c7f0ef91146ebfb530314f5f1d24528d7f0767efbfd31dce919275413e393e" dependencies = [ - "webpki-root-certs 1.0.5", + "webpki-root-certs 1.0.4", ] [[package]] name = "webpki-root-certs" -version = "1.0.5" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36a29fc0408b113f68cf32637857ab740edfafdf460c326cd2afaa2d84cc05dc" +checksum = "ee3e3b5f5e80bc89f30ce8d0343bf4e5f12341c51f3e26cbeecbc7c85443e85b" dependencies = [ "rustls-pki-types", ] [[package]] name = "webpki-roots" -version = "1.0.5" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12bed680863276c63889429bfd6cab3b99943659923822de1c8a39c49e4d722c" +checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e" dependencies = [ "rustls-pki-types", ] @@ -5919,18 +5956,18 @@ checksum = "2164e798d9e3d84ee2c91139ace54638059a3b23e361f5c11781c2c6459bde0f" [[package]] name = "zerocopy" -version = "0.8.33" +version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "668f5168d10b9ee831de31933dc111a459c97ec93225beb307aed970d1372dfd" +checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.33" +version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c7962b26b0a8685668b671ee4b54d007a67d4eaf05fda79ac0ecf41e32270f1" +checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a" dependencies = [ "proc-macro2", "quote", @@ -5969,9 +6006,9 @@ dependencies = [ [[package]] name = "zeroize_derive" -version = "1.4.3" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" +checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", @@ -6013,6 +6050,6 @@ dependencies = [ [[package]] name = "zmij" -version = "1.0.12" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fc5a66a20078bf1251bde995aa2fdcc4b800c70b5d92dd2c62abc5c60f679f8" +checksum = "e6d6085d62852e35540689d1f97ad663e3971fc19cf5eceab364d62c646ea167" diff --git a/js/clock/src/main.ts b/js/clock/src/main.ts index e13db2af5..468ec7629 100755 --- a/js/clock/src/main.ts +++ b/js/clock/src/main.ts @@ -80,13 +80,13 @@ async function publish(config: Config) { // Wait until we get a subscription for the track for (;;) { - const request = await broadcast.requested(); - if (!request) break; + const track = await broadcast.requested(); + if (!track) break; - if (request.track.name === config.track) { - publishTrack(request.track); + if (track.name === config.track) { + publishTrack(track); } else { - request.track.close(new Error("not found")); + track.close(new Error("not found")); } } } @@ -103,7 +103,8 @@ async function publishTrack(track: Moq.Track) { // Send the base timestamp (everything but seconds) - matching Rust format const base = `${now.toISOString().slice(0, 16).replace("T", " ")}:`; - group.writeString(base); + const frame = Moq.Frame.fromString(base); + group.writeFrame(frame); // Send individual seconds for this minute const currentMinute = now.getMinutes(); @@ -112,7 +113,8 @@ async function publishTrack(track: Moq.Track) { const secondsNow = new Date(); const seconds = secondsNow.getSeconds().toString().padStart(2, "0"); - group.writeString(seconds); + const frame = Moq.Frame.fromString(seconds); + group.writeFrame(frame); // Wait until next second const nextSecond = new Date(secondsNow); @@ -138,7 +140,7 @@ async function subscribe(config: Config) { console.log("✅ Connected to relay:", config.url); const broadcast = connection.consume(Moq.Path.from(config.broadcast)); - const track = broadcast.subscribe(config.track, 0); + const track = broadcast.subscribe({ name: config.track }); console.log("✅ Subscribed to track:", config.track); @@ -157,16 +159,16 @@ async function subscribe(config: Config) { continue; } - const base = new TextDecoder().decode(baseFrame); + const base = baseFrame.toString(); // Read individual second frames for (;;) { - const frame = await group.readString(); + const frame = await group.readFrame(); if (!frame) { break; // End of group } - const seconds = parseInt(frame, 10); + const seconds = parseInt(frame.toString(), 10); // Clock emoji positions const clockEmojis = ["🕛", "🕐", "🕑", "🕒", "🕓", "🕔", "🕕", "🕖", "🕗", "🕘", "🕙", "🕚"]; diff --git a/js/hang/src/catalog/audio.ts b/js/hang/src/catalog/audio.ts index e57710bf2..6189b8608 100644 --- a/js/hang/src/catalog/audio.ts +++ b/js/hang/src/catalog/audio.ts @@ -1,12 +1,7 @@ import { z } from "zod"; import { ContainerSchema, DEFAULT_CONTAINER } from "./container"; import { u53Schema } from "./integers"; - -// Backwards compatibility: old track schema -const TrackSchema = z.object({ - name: z.string(), - priority: z.number().int().min(0).max(255), -}); +import { TrackSchema } from "./track"; // Mirrors AudioDecoderConfig // https://w3c.github.io/webcodecs/#audio-decoder-config @@ -40,8 +35,9 @@ export const AudioSchema = z // This is not an array so it will work with JSON Merge Patch. renditions: z.record(z.string(), AudioConfigSchema), - // The priority of the audio track, relative to other tracks in the broadcast. - priority: z.number().int().min(0).max(255), + // DEPRECATED: The priority of the audio track, relative to other tracks in the broadcast. + // The subscriber is expected to choose its own priority, instead of being told. + priority: z.number().int().min(0).max(255).default(0), }) .or( // Backwards compatibility: transform old {track, config} format to new object format diff --git a/js/hang/src/catalog/root.ts b/js/hang/src/catalog/root.ts index 7c9684c3e..7cf6f1e4d 100644 --- a/js/hang/src/catalog/root.ts +++ b/js/hang/src/catalog/root.ts @@ -41,5 +41,5 @@ export function decode(raw: Uint8Array): Root { export async function fetch(track: Moq.Track): Promise { const frame = await track.readFrame(); if (!frame) return undefined; - return decode(frame); + return decode(frame.payload); } diff --git a/js/hang/src/catalog/track.ts b/js/hang/src/catalog/track.ts index 7eb18e5ac..29fd12ea5 100644 --- a/js/hang/src/catalog/track.ts +++ b/js/hang/src/catalog/track.ts @@ -2,6 +2,8 @@ import { z } from "zod"; export const TrackSchema = z.object({ name: z.string(), - priority: z.number().int().min(0).max(255), + // DEPRECATED: The priority of the track, relative to other tracks in the broadcast. + // The subscriber is supposed to choose its own priority, instead of being told. + priority: z.number().int().min(0).max(255).default(0), }); export type Track = z.infer; diff --git a/js/hang/src/catalog/video.ts b/js/hang/src/catalog/video.ts index b8b77883f..f6a20f7f2 100644 --- a/js/hang/src/catalog/video.ts +++ b/js/hang/src/catalog/video.ts @@ -1,12 +1,7 @@ import { z } from "zod"; import { ContainerSchema, DEFAULT_CONTAINER } from "./container"; import { u53Schema } from "./integers"; - -// Backwards compatibility: old track schema -const TrackSchema = z.object({ - name: z.string(), - priority: z.number().int().min(0).max(255), -}); +import { TrackSchema } from "./track"; // Based on VideoDecoderConfig export const VideoConfigSchema = z.object({ @@ -54,7 +49,8 @@ export const VideoSchema = z renditions: z.record(z.string(), VideoConfigSchema), // The priority of the video track, relative to other tracks in the broadcast. - priority: z.number().int().min(0).max(255), + // TODO: Remove this; it's for backwards compatibility only + priority: z.number().int().min(0).max(255).default(0), // Render the video at this size in pixels. // This is separate from the display aspect ratio because it does not require reinitialization. diff --git a/js/hang/src/frame.ts b/js/hang/src/frame.ts index a0fd86219..6fac8639c 100644 --- a/js/hang/src/frame.ts +++ b/js/hang/src/frame.ts @@ -1,4 +1,4 @@ -import type * as Moq from "@moq/lite"; +import * as Moq from "@moq/lite"; import { Time } from "@moq/lite"; import { Effect, Signal } from "@moq/signals"; import type * as Catalog from "./catalog"; @@ -16,8 +16,9 @@ export interface Frame { group: number; } -export function encode(source: Uint8Array | Source, timestamp: Time.Micro, container?: Catalog.Container): Uint8Array { +export function encode(source: Uint8Array | Source, timestamp: Time.Micro, container?: Catalog.Container): Moq.Frame { // Encode timestamp using the specified container format + // TODO This should be delta encoded, not the full timestamp. const timestampBytes = Container.encodeTimestamp(timestamp, container); // Allocate buffer for timestamp + payload @@ -34,14 +35,18 @@ export function encode(source: Uint8Array | Source, timestamp: Time.Micro, conta source.copyTo(data.subarray(timestampBytes.byteLength)); } - return data; + // NOTE: We encode the timestamp into the MoQ layer as well, but in milliseconds. + // TODO: Once this is widespread enough, we should use it at least as the base. + return new Moq.Frame({ payload: data, instant: Time.Milli.fromMicro(timestamp) }); } // NOTE: A keyframe is always the first frame in a group, so it's not encoded on the wire. -export function decode(buffer: Uint8Array, container?: Catalog.Container): { data: Uint8Array; timestamp: Time.Micro } { +export function decode(frame: Moq.Frame, container?: Catalog.Container): { data: Uint8Array; timestamp: Time.Micro } { // Decode timestamp using the specified container format - const [timestamp, data] = Container.decodeTimestamp(buffer, container); - return { timestamp: timestamp as Time.Micro, data }; + // TODO This should be delta encoded, not the full timestamp. + // TODO: Use frame.instant to avoid double encoding the timestamp. + const [timestamp, data] = Container.decodeTimestamp(frame.payload, container); + return { timestamp, data }; } export class Producer { diff --git a/js/hang/src/publish/priority.ts b/js/hang/src/priority.ts similarity index 100% rename from js/hang/src/publish/priority.ts rename to js/hang/src/priority.ts diff --git a/js/hang/src/publish/audio/encoder.ts b/js/hang/src/publish/audio/encoder.ts index a19e096f0..9587621f0 100644 --- a/js/hang/src/publish/audio/encoder.ts +++ b/js/hang/src/publish/audio/encoder.ts @@ -5,8 +5,8 @@ import type * as Catalog from "../../catalog"; import { DEFAULT_CONTAINER } from "../../catalog"; import { u53 } from "../../catalog/integers"; import * as Frame from "../../frame"; +import { PRIORITY } from "../../priority"; import * as libav from "../../util/libav"; -import { PRIORITY } from "../priority"; import type * as Capture from "./capture"; import type { Source } from "./types"; diff --git a/js/hang/src/publish/broadcast.ts b/js/hang/src/publish/broadcast.ts index b152fb3d5..fc07f4201 100644 --- a/js/hang/src/publish/broadcast.ts +++ b/js/hang/src/publish/broadcast.ts @@ -72,45 +72,43 @@ export class Broadcast { async #runBroadcast(broadcast: Moq.Broadcast, effect: Effect) { for (;;) { - const request = await broadcast.requested(); - if (!request) break; + const track = await broadcast.requested(); + if (!track) break; - effect.cleanup(() => request.track.close()); + effect.cleanup(() => track.close()); effect.effect((effect) => { - if (effect.get(request.track.state.closed)) return; - - switch (request.track.name) { + switch (track.name) { case Broadcast.CATALOG_TRACK: - this.#serveCatalog(request.track, effect); + this.#serveCatalog(track, effect); break; case Location.Window.TRACK: - this.location.window.serve(request.track, effect); + this.location.window.serve(track, effect); break; case Location.Peers.TRACK: - this.location.peers.serve(request.track, effect); + this.location.peers.serve(track, effect); break; case Preview.TRACK: - this.preview.serve(request.track, effect); + this.preview.serve(track, effect); break; case Chat.Typing.TRACK: - this.chat.typing.serve(request.track, effect); + this.chat.typing.serve(track, effect); break; case Chat.Message.TRACK: - this.chat.message.serve(request.track, effect); + this.chat.message.serve(track, effect); break; case Audio.Encoder.TRACK: - this.audio.serve(request.track, effect); + this.audio.serve(track, effect); break; case Video.Root.TRACK_HD: - this.video.hd.serve(request.track, effect); + this.video.hd.serve(track, effect); break; case Video.Root.TRACK_SD: - this.video.sd.serve(request.track, effect); + this.video.sd.serve(track, effect); break; default: - console.error("received subscription for unknown track", request.track.name); - request.track.close(new Error(`Unknown track: ${request.track.name}`)); + console.error("received subscription for unknown track", track.name); + track.close(new Error(`Unknown track: ${track.name}`)); break; } }); @@ -120,7 +118,8 @@ export class Broadcast { #serveCatalog(track: Moq.Track, effect: Effect): void { if (!effect.get(this.enabled)) { // Clear the catalog. - track.writeFrame(Catalog.encode({})); + const frame = new Moq.Frame({ payload: Catalog.encode({}) }); + track.writeFrame(frame); return; } @@ -134,8 +133,8 @@ export class Broadcast { preview: effect.get(this.preview.catalog), }; - const encoded = Catalog.encode(catalog); - track.writeFrame(encoded); + const frame = new Moq.Frame({ payload: Catalog.encode(catalog) }); + track.writeFrame(frame); } close() { diff --git a/js/hang/src/publish/chat/message.ts b/js/hang/src/publish/chat/message.ts index 343f715ef..c3c07bddd 100644 --- a/js/hang/src/publish/chat/message.ts +++ b/js/hang/src/publish/chat/message.ts @@ -1,7 +1,7 @@ -import type * as Moq from "@moq/lite"; +import * as Moq from "@moq/lite"; import { Effect, Signal } from "@moq/signals"; import type * as Catalog from "../../catalog"; -import { PRIORITY } from "../priority"; +import { PRIORITY } from "../../priority"; export type MessageProps = { enabled?: boolean | Signal; @@ -37,7 +37,8 @@ export class Message { if (!enabled) return; const latest = effect.get(this.latest); - track.writeString(latest ?? ""); + const frame = Moq.Frame.fromString(latest ?? ""); + track.writeFrame(frame); } close() { diff --git a/js/hang/src/publish/chat/typing.ts b/js/hang/src/publish/chat/typing.ts index 419ad56cf..7d0af2b75 100644 --- a/js/hang/src/publish/chat/typing.ts +++ b/js/hang/src/publish/chat/typing.ts @@ -1,7 +1,7 @@ -import type * as Moq from "@moq/lite"; +import * as Moq from "@moq/lite"; import { Effect, Signal } from "@moq/signals"; import type * as Catalog from "../../catalog"; -import { PRIORITY } from "../priority"; +import { PRIORITY } from "../../priority"; export type TypingProps = { enabled?: boolean | Signal; @@ -37,7 +37,8 @@ export class Typing { if (!enabled) return; const active = effect.get(this.active); - track.writeBool(active); + const frame = Moq.Frame.fromBool(active); + track.writeFrame(frame); } close() { diff --git a/js/hang/src/publish/location/peers.ts b/js/hang/src/publish/location/peers.ts index 9811431ac..3bfc2f281 100644 --- a/js/hang/src/publish/location/peers.ts +++ b/js/hang/src/publish/location/peers.ts @@ -2,7 +2,7 @@ import type * as Moq from "@moq/lite"; import * as Zod from "@moq/lite/zod"; import { Effect, Signal } from "@moq/signals"; import * as Catalog from "../../catalog"; -import { PRIORITY } from "../priority"; +import { PRIORITY } from "../../priority"; export interface PeersProps { enabled?: boolean | Signal; diff --git a/js/hang/src/publish/location/window.ts b/js/hang/src/publish/location/window.ts index dd72bbe02..0a9574b5e 100644 --- a/js/hang/src/publish/location/window.ts +++ b/js/hang/src/publish/location/window.ts @@ -2,7 +2,7 @@ import type * as Moq from "@moq/lite"; import * as Zod from "@moq/lite/zod"; import { Effect, Signal } from "@moq/signals"; import * as Catalog from "../../catalog"; -import { PRIORITY } from "../priority"; +import { PRIORITY } from "../../priority"; export type WindowProps = { // If true, then we'll publish our position to the broadcast. diff --git a/js/hang/src/publish/preview.ts b/js/hang/src/publish/preview.ts index 91df96d1f..22909884b 100644 --- a/js/hang/src/publish/preview.ts +++ b/js/hang/src/publish/preview.ts @@ -1,7 +1,7 @@ -import type * as Moq from "@moq/lite"; +import * as Moq from "@moq/lite"; import { Effect, Signal } from "@moq/signals"; import type * as Catalog from "../catalog"; -import { PRIORITY } from "./priority"; +import { PRIORITY } from "../priority"; export type PreviewProps = { enabled?: boolean | Signal; @@ -35,7 +35,8 @@ export class Preview { const info = effect.get(this.info); if (!info) return; - track.writeJson(info); + const frame = Moq.Frame.fromJson(info); + track.writeFrame(frame); } close() { diff --git a/js/hang/src/publish/video/index.ts b/js/hang/src/publish/video/index.ts index 18c838891..c8f07b950 100644 --- a/js/hang/src/publish/video/index.ts +++ b/js/hang/src/publish/video/index.ts @@ -1,6 +1,6 @@ import { Effect, Signal } from "@moq/signals"; import * as Catalog from "../../catalog"; -import { PRIORITY } from "../priority"; +import { PRIORITY } from "../../priority"; import { Encoder, type EncoderProps } from "./encoder"; import { TrackProcessor } from "./polyfill"; import type { Source } from "./types"; diff --git a/js/hang/src/watch/audio/source.ts b/js/hang/src/watch/audio/source.ts index fad996840..d3bb721e9 100644 --- a/js/hang/src/watch/audio/source.ts +++ b/js/hang/src/watch/audio/source.ts @@ -3,6 +3,7 @@ import type { Time } from "@moq/lite"; import { Effect, type Getter, Signal } from "@moq/signals"; import type * as Catalog from "../../catalog"; import * as Frame from "../../frame"; +import { PRIORITY } from "../../priority"; import * as Hex from "../../util/hex"; import * as libav from "../../util/libav"; import type * as Render from "./render"; @@ -163,7 +164,7 @@ export class Source { const active = effect.get(this.active); if (!active) return; - const sub = broadcast.subscribe(active, catalog.priority); + const sub = broadcast.subscribe({ name: active, priority: PRIORITY.audio, maxLatency: this.latency }); effect.cleanup(() => sub.close()); // Create consumer with slightly less latency than the render worklet to avoid underflowing. diff --git a/js/hang/src/watch/broadcast.ts b/js/hang/src/watch/broadcast.ts index c8fe30a1a..104dfc962 100644 --- a/js/hang/src/watch/broadcast.ts +++ b/js/hang/src/watch/broadcast.ts @@ -1,7 +1,7 @@ import type * as Moq from "@moq/lite"; import { Effect, type Getter, Signal } from "@moq/signals"; import * as Catalog from "../catalog"; -import { PRIORITY } from "../publish/priority"; +import { PRIORITY } from "../priority"; import * as Audio from "./audio"; import { Chat, type ChatProps } from "./chat"; import * as Location from "./location"; @@ -101,7 +101,6 @@ export class Broadcast { // Require full equality if (update.path !== path) { - console.warn("ignoring announce", update.path); continue; } @@ -131,7 +130,7 @@ export class Broadcast { this.status.set("loading"); - const catalog = broadcast.subscribe("catalog.json", PRIORITY.catalog); + const catalog = broadcast.subscribe({ name: "catalog.json", priority: PRIORITY.catalog }); effect.cleanup(() => catalog.close()); effect.spawn(this.#fetchCatalog.bind(this, catalog)); diff --git a/js/hang/src/watch/chat/message.ts b/js/hang/src/watch/chat/message.ts index a4d5e5417..0b0f99e50 100644 --- a/js/hang/src/watch/chat/message.ts +++ b/js/hang/src/watch/chat/message.ts @@ -1,6 +1,7 @@ import type * as Moq from "@moq/lite"; import { Effect, type Getter, Signal } from "@moq/signals"; import type * as Catalog from "../../catalog"; +import { PRIORITY } from "../../priority"; export interface MessageProps { // Whether to start downloading the chat. @@ -47,7 +48,7 @@ export class Message { const broadcast = effect.get(this.broadcast); if (!broadcast) return; - const track = broadcast.subscribe(catalog.name, catalog.priority); + const track = broadcast.subscribe({ name: catalog.name, priority: PRIORITY.chat }); effect.cleanup(() => track.close()); // Undefined is only when we're not subscribed to the track. @@ -56,11 +57,11 @@ export class Message { effect.spawn(async () => { for (;;) { - const frame = await track.readString(); + const frame = await track.readFrame(); if (frame === undefined) break; // Use a function to avoid the dequal check. - this.#latest.set(frame); + this.#latest.set(frame.toString()); } }); } diff --git a/js/hang/src/watch/chat/typing.ts b/js/hang/src/watch/chat/typing.ts index 39139d582..32d0df91c 100644 --- a/js/hang/src/watch/chat/typing.ts +++ b/js/hang/src/watch/chat/typing.ts @@ -1,6 +1,7 @@ import type * as Moq from "@moq/lite"; import { Effect, type Getter, Signal } from "@moq/signals"; import type * as Catalog from "../../catalog"; +import { PRIORITY } from "../../priority"; export interface TypingProps { // Whether to start downloading the chat. @@ -45,15 +46,15 @@ export class Typing { const broadcast = effect.get(this.broadcast); if (!broadcast) return; - const track = broadcast.subscribe(catalog.name, catalog.priority); + const track = broadcast.subscribe({ name: catalog.name, priority: PRIORITY.typing }); effect.cleanup(() => track.close()); effect.spawn(async () => { for (;;) { - const value = await track.readBool(); - if (value === undefined) break; + const frame = await track.readFrame(); + if (frame === undefined) break; - this.active.set(value); + this.active.set(frame.toBool()); } }); diff --git a/js/hang/src/watch/location/peers.ts b/js/hang/src/watch/location/peers.ts index 56acbbdce..c8806a1cf 100644 --- a/js/hang/src/watch/location/peers.ts +++ b/js/hang/src/watch/location/peers.ts @@ -2,6 +2,7 @@ import type * as Moq from "@moq/lite"; import * as Zod from "@moq/lite/zod"; import { Effect, type Getter, Signal } from "@moq/signals"; import * as Catalog from "../../catalog"; +import { PRIORITY } from "../../priority"; export interface PeersProps { enabled?: boolean | Signal; @@ -41,7 +42,7 @@ export class Peers { const broadcast = effect.get(this.broadcast); if (!broadcast) return; - const track = broadcast.subscribe(catalog.name, catalog.priority); + const track = broadcast.subscribe({ name: catalog.name, priority: PRIORITY.location }); effect.cleanup(() => track.close()); effect.spawn(this.#runTrack.bind(this, track)); diff --git a/js/hang/src/watch/location/window.ts b/js/hang/src/watch/location/window.ts index 79b7c53d2..2db6e1a01 100644 --- a/js/hang/src/watch/location/window.ts +++ b/js/hang/src/watch/location/window.ts @@ -2,6 +2,7 @@ import type * as Moq from "@moq/lite"; import * as Zod from "@moq/lite/zod"; import { Effect, type Getter, Signal } from "@moq/signals"; import * as Catalog from "../../catalog"; +import { PRIORITY } from "../../priority"; export interface WindowProps { enabled?: boolean | Signal; @@ -50,7 +51,7 @@ export class Window { const updates = effect.get(this.#catalog)?.track; if (!updates) return; - const track = broadcast.subscribe(updates.name, updates.priority); + const track = broadcast.subscribe({ name: updates.name, priority: PRIORITY.location }); effect.cleanup(() => track.close()); effect.spawn(this.#runTrack.bind(this, track)); diff --git a/js/hang/src/watch/preview.ts b/js/hang/src/watch/preview.ts index bfa0bf523..8558dbd52 100644 --- a/js/hang/src/watch/preview.ts +++ b/js/hang/src/watch/preview.ts @@ -2,6 +2,7 @@ import type * as Moq from "@moq/lite"; import * as Zod from "@moq/lite/zod"; import { Effect, Signal } from "@moq/signals"; import * as Catalog from "../catalog"; +import { PRIORITY } from "../priority"; export interface PreviewProps { enabled?: boolean | Signal; @@ -37,7 +38,8 @@ export class Preview { if (!catalog) return; // Subscribe to the preview.json track directly - const track = broadcast.subscribe(catalog.name, catalog.priority); + // TODO maxLatency + const track = broadcast.subscribe({ name: catalog.name, priority: PRIORITY.preview }); effect.cleanup(() => track.close()); effect.spawn(async () => { diff --git a/js/hang/src/watch/video/source.ts b/js/hang/src/watch/video/source.ts index 406a600bb..f53087795 100644 --- a/js/hang/src/watch/video/source.ts +++ b/js/hang/src/watch/video/source.ts @@ -3,7 +3,7 @@ import type { Time } from "@moq/lite"; import { Effect, type Getter, Signal } from "@moq/signals"; import type * as Catalog from "../../catalog"; import * as Frame from "../../frame"; -import { PRIORITY } from "../../publish/priority"; +import { PRIORITY } from "../../priority"; import * as Hex from "../../util/hex"; export type SourceProps = { @@ -192,7 +192,11 @@ export class Source { } #runTrack(effect: Effect, broadcast: Moq.Broadcast, name: string, config: RequiredDecoderConfig): void { - const sub = broadcast.subscribe(name, PRIORITY.video); // TODO use priority from catalog + const sub = broadcast.subscribe({ + name: name, + priority: PRIORITY.video, + maxLatency: this.latency, + }); // TODO use priority from catalog effect.cleanup(() => sub.close()); // Create consumer that reorders groups/frames up to the provided latency. diff --git a/js/lite/examples/publish.ts b/js/lite/examples/publish.ts index ffd345059..296c07d21 100644 --- a/js/lite/examples/publish.ts +++ b/js/lite/examples/publish.ts @@ -13,15 +13,15 @@ async function main() { // Wait for subscription requests for (;;) { - const request = await broadcast.requested(); - if (!request) break; + const track = await broadcast.requested(); + if (!track) break; // Accept the request for the "chat" track - if (request.track.name === "chat") { - publishTrack(request.track); + if (track.name === "chat") { + publishTrack(track); } else { // Reject other tracks - request.track.close(new Error("track not found")); + track.close(new Error("track not found")); } } } @@ -32,10 +32,9 @@ async function publishTrack(track: Moq.Track) { // Create a group (e.g., keyframe boundary) const group = track.appendGroup(); - // Write two frames to the group - for (const frame of ["Hello", "MoQ!"]) { - group.writeString(frame); - } + // Write frames to the group + const frame = Moq.Frame.fromString("Hello, MoQ!"); + group.writeFrame(frame); // Mark the group as complete group.close(); diff --git a/js/lite/examples/subscribe.ts b/js/lite/examples/subscribe.ts index e294fdd6a..a675b1a79 100644 --- a/js/lite/examples/subscribe.ts +++ b/js/lite/examples/subscribe.ts @@ -7,8 +7,8 @@ async function main() { // Subscribe to a broadcast const broadcast = connection.consume(Moq.Path.from("my-broadcast")); - // Subscribe to a specific track (with priority 0) - const track = broadcast.subscribe("chat", 0); + // Subscribe to a specific track + const track = broadcast.subscribe({ name: "chat" }); // Read data as it arrives for (;;) { @@ -16,10 +16,10 @@ async function main() { if (!group) break; for (;;) { - const frame = await group.readString(); + const frame = await group.readFrame(); if (!frame) break; - console.log("Received:", frame); + console.log("Received:", frame.toString()); } } diff --git a/js/lite/src/broadcast.ts b/js/lite/src/broadcast.ts index f3e13805f..fd48e3a81 100644 --- a/js/lite/src/broadcast.ts +++ b/js/lite/src/broadcast.ts @@ -1,15 +1,5 @@ import { Signal } from "@moq/signals"; -import { Track } from "./track.ts"; - -export interface TrackRequest { - track: Track; - priority: number; -} - -export class BroadcastState { - requested = new Signal([]); - closed = new Signal(false); -} +import { Track, type TrackProps } from "./track.js"; /** * Handles writing and managing tracks in a broadcast. @@ -17,13 +7,14 @@ export class BroadcastState { * @public */ export class Broadcast { - state = new BroadcastState(); + #requested = new Signal([]); + #closed = new Signal(false); readonly closed: Promise; constructor() { this.closed = new Promise((resolve) => { - const dispose = this.state.closed.subscribe((closed) => { + const dispose = this.#closed.subscribe((closed) => { if (!closed) return; resolve(closed instanceof Error ? closed : undefined); dispose(); @@ -34,33 +25,38 @@ export class Broadcast { /** * A track requested over the network. */ - async requested(): Promise { + async requested(): Promise { for (;;) { // We use pop instead of shift because it's slightly more efficient. - const track = this.state.requested.peek().pop(); + const track = this.#requested.peek().pop(); if (track) return track; - const closed = this.state.closed.peek(); + const closed = this.#closed.peek(); if (closed instanceof Error) throw closed; if (closed) return undefined; - await Signal.race(this.state.requested, this.state.closed); + await Signal.race(this.#requested, this.#closed); } } /** - * Populates the provided track over the network. + * Creates a new track and serves it over the network. */ - subscribe(name: string, priority: number): Track { - const track = new Track(name); + subscribe(track: TrackProps): Track { + const t = new Track(track); + this.serve(t); + return t; + } - if (this.state.closed.peek()) { - throw new Error(`broadcast is closed: ${this.state.closed.peek()}`); + /** + * Populates the provided track over the network. + */ + serve(track: Track) { + if (this.#closed.peek()) { + throw new Error(`broadcast is closed: ${this.#closed.peek()}`); } - this.state.requested.mutate((requested) => { - requested.push({ track, priority }); - // Sort the tracks by priority in ascending order (we will pop) - requested.sort((a, b) => a.priority - b.priority); + this.#requested.mutate((requested) => { + requested.push(track); }); return track; @@ -72,11 +68,11 @@ export class Broadcast { * @param abort - If provided, throw this exception instead of returning undefined. */ close(abort?: Error) { - this.state.closed.set(abort ?? true); - for (const { track } of this.state.requested.peek()) { + this.#closed.set(abort ?? true); + for (const track of this.#requested.peek()) { track.close(abort); } - this.state.requested.mutate((requested) => { + this.#requested.mutate((requested) => { requested.length = 0; }); } diff --git a/js/lite/src/connection/connect.ts b/js/lite/src/connection/connect.ts index 3bf642d1e..9708d483e 100644 --- a/js/lite/src/connection/connect.ts +++ b/js/lite/src/connection/connect.ts @@ -2,9 +2,12 @@ import WebTransportWs from "@moq/web-transport-ws"; import * as Ietf from "../ietf/index.ts"; import * as Lite from "../lite/index.ts"; import { Stream } from "../stream.ts"; +import type * as Time from "../time.ts"; import * as Hex from "../util/hex.ts"; import type { Established } from "./established.ts"; +const SUPPORTED = [Lite.Version.DRAFT_03, Lite.Version.DRAFT_02, Lite.Version.DRAFT_01, Ietf.Version.DRAFT_14]; + export interface WebSocketOptions { // If true (default), enable the WebSocket fallback. enabled?: boolean; @@ -13,9 +16,9 @@ export interface WebSocketOptions { // By default, `https` => `wss` and `http` => `ws`. url?: URL; - // The delay in milliseconds before attempting the WebSocket fallback. (default: 200) + // The delay in milliseconds before attempting the WebSocket fallback. (default: 500) // If WebSocket won the previous race for a given URL, this will be 0. - delay?: DOMHighResTimeStamp; + delay?: Time.Milli; } export interface ConnectProps { @@ -44,9 +47,9 @@ export async function connect(url: URL, props?: ConnectProps): Promise this.#tick.update((prev) => Math.max(prev, tick)), this.#delay); - this.#delay = Math.min(this.#delay * this.delay.multiplier, this.delay.max); + this.#delay = Math.min(this.#delay * this.delay.multiplier, this.delay.max) as Time.Milli; } }); } diff --git a/js/lite/src/frame.ts b/js/lite/src/frame.ts new file mode 100644 index 000000000..0a45ba98d --- /dev/null +++ b/js/lite/src/frame.ts @@ -0,0 +1,38 @@ +import * as Time from "./time"; + +export class Frame { + instant: Time.Milli; + payload: Uint8Array; + + constructor({ payload, instant = Time.Milli.now() }: { payload: Uint8Array; instant?: Time.Milli }) { + this.instant = instant; + this.payload = payload; + } + + static fromString(str: string, instant = Time.Milli.now()) { + return new Frame({ payload: new TextEncoder().encode(str), instant }); + } + + static fromJson(json: unknown, instant = Time.Milli.now()) { + return new Frame({ payload: new TextEncoder().encode(JSON.stringify(json)), instant }); + } + + static fromBool(bool: boolean, instant = Time.Milli.now()) { + return new Frame({ payload: new Uint8Array([bool ? 1 : 0]), instant }); + } + + toString() { + return new TextDecoder().decode(this.payload); + } + + toJson() { + return JSON.parse(this.toString()); + } + + toBool() { + if (this.payload.byteLength !== 1) throw new Error("invalid bool frame"); + if (this.payload[0] === 0) return false; + if (this.payload[0] === 1) return true; + throw new Error("invalid bool frame"); + } +} diff --git a/js/lite/src/group.ts b/js/lite/src/group.ts index 6aefa47c6..b271fea6e 100644 --- a/js/lite/src/group.ts +++ b/js/lite/src/group.ts @@ -1,7 +1,8 @@ import { Signal } from "@moq/signals"; +import type { Frame } from "./frame"; export class GroupState { - frames = new Signal([]); + frames = new Signal([]); closed = new Signal(false); total = new Signal(0); // The total number of frames in the group thus far } @@ -29,7 +30,7 @@ export class Group { * Writes a frame to the group. * @param frame - The frame to write */ - writeFrame(frame: Uint8Array) { + writeFrame(frame: Frame) { if (this.state.closed.peek()) throw new Error("group is closed"); this.state.frames.mutate((frames) => { @@ -39,23 +40,11 @@ export class Group { this.state.total.update((total) => total + 1); } - writeString(str: string) { - this.writeFrame(new TextEncoder().encode(str)); - } - - writeJson(json: unknown) { - this.writeString(JSON.stringify(json)); - } - - writeBool(bool: boolean) { - this.writeFrame(new Uint8Array([bool ? 1 : 0])); - } - /** * Reads the next frame from the group. * @returns A promise that resolves to the next frame or undefined */ - async readFrame(): Promise { + async readFrame(): Promise { for (;;) { const frames = this.state.frames.peek(); const frame = frames.shift(); @@ -69,11 +58,11 @@ export class Group { } } - async readFrameSequence(): Promise<{ sequence: number; data: Uint8Array } | undefined> { + async readFrameSequence(): Promise<{ sequence: number; frame: Frame } | undefined> { for (;;) { const frames = this.state.frames.peek(); const frame = frames.shift(); - if (frame) return { sequence: this.state.total.peek() - frames.length - 1, data: frame }; + if (frame) return { sequence: this.state.total.peek() - frames.length - 1, frame }; const closed = this.state.closed.peek(); if (closed instanceof Error) throw closed; @@ -83,21 +72,6 @@ export class Group { } } - async readString(): Promise { - const frame = await this.readFrame(); - return frame ? new TextDecoder().decode(frame) : undefined; - } - - async readJson(): Promise { - const frame = await this.readString(); - return frame ? JSON.parse(frame) : undefined; - } - - async readBool(): Promise { - const frame = await this.readFrame(); - return frame ? frame[0] === 1 : undefined; - } - close(abort?: Error) { this.state.closed.set(abort ?? true); } diff --git a/js/lite/src/ietf/publisher.ts b/js/lite/src/ietf/publisher.ts index ab5031bea..1dd66e39c 100644 --- a/js/lite/src/ietf/publisher.ts +++ b/js/lite/src/ietf/publisher.ts @@ -2,10 +2,11 @@ import type { Broadcast } from "../broadcast.ts"; import type { Group } from "../group.ts"; import type * as Path from "../path.ts"; import { Writer } from "../stream.ts"; -import type { Track } from "../track.ts"; +import type * as Time from "../time.ts"; +import { Track } from "../track.js"; import { error } from "../util/error.ts"; import type * as Control from "./control.ts"; -import { Frame, Group as GroupMessage } from "./object.ts"; +import { Frame as FrameMessage, Group as GroupMessage } from "./object.ts"; import { PublishDone } from "./publish.ts"; import { PublishNamespace, @@ -94,7 +95,12 @@ export class Publisher { return; } - const track = broadcast.subscribe(msg.trackName, msg.subscriberPriority); + const track = new Track({ + name: msg.trackName, + priority: msg.subscriberPriority, + maxLatency: 100 as Time.Milli, // TODO delivery timeout + }); + broadcast.serve(track); // Send SUBSCRIBE_OK response on control stream const okMsg = new SubscribeOk(msg.requestId, msg.requestId); @@ -164,7 +170,8 @@ export class Publisher { if (!frame) break; // Write each frame as an object - const obj = new Frame(frame); + // TODO support timestamps + const obj = new FrameMessage(frame.payload); await obj.encode(stream, header.flags); } diff --git a/js/lite/src/ietf/subscriber.ts b/js/lite/src/ietf/subscriber.ts index 8d93f144a..9b2a72ca7 100644 --- a/js/lite/src/ietf/subscriber.ts +++ b/js/lite/src/ietf/subscriber.ts @@ -1,12 +1,13 @@ import { Announced } from "../announced.ts"; -import { Broadcast, type TrackRequest } from "../broadcast.ts"; +import { Broadcast } from "../broadcast.ts"; +import { Frame } from "../frame.ts"; import { Group } from "../group.ts"; import * as Path from "../path.ts"; import type { Reader } from "../stream.ts"; import type { Track } from "../track.ts"; import { error } from "../util/error.ts"; import type * as Control from "./control.ts"; -import { Frame, type Group as GroupMessage } from "./object.ts"; +import { Frame as FrameMessage, type Group as GroupMessage } from "./object.ts"; import { type Publish, type PublishDone, PublishError } from "./publish.ts"; import type { PublishNamespace, PublishNamespaceDone } from "./publish_namespace.ts"; import { Subscribe, type SubscribeError, type SubscribeOk, Unsubscribe } from "./subscribe.ts"; @@ -114,15 +115,15 @@ export class Subscriber { return broadcast; } - async #runSubscribe(broadcast: Path.Valid, request: TrackRequest) { + async #runSubscribe(broadcast: Path.Valid, track: Track) { const requestId = await this.#control.nextRequestId(); if (requestId === undefined) return; - this.#subscribes.set(requestId, request.track); + this.#subscribes.set(requestId, track); - console.debug(`subscribe start: id=${requestId} broadcast=${broadcast} track=${request.track.name}`); + console.debug(`subscribe start: id=${requestId} broadcast=${broadcast} track=${track.name}`); - const msg = new Subscribe(requestId, broadcast, request.track.name, request.priority); + const msg = new Subscribe(requestId, broadcast, track.name, track.priority.peek()); // Send SUBSCRIBE message on control stream and wait for response const responsePromise = new Promise((resolve, reject) => { @@ -134,23 +135,23 @@ export class Subscriber { try { const ok = await responsePromise; this.#trackAliases.set(ok.trackAlias, requestId); - console.debug(`subscribe ok: id=${requestId} broadcast=${broadcast} track=${request.track.name}`); + console.debug(`subscribe ok: id=${requestId} broadcast=${broadcast} track=${track.name}`); try { - await request.track.closed; + await track.closed; const msg = new Unsubscribe(requestId); await this.#control.write(msg); - console.debug(`unsubscribe: id=${requestId} broadcast=${broadcast} track=${request.track.name}`); + console.debug(`unsubscribe: id=${requestId} broadcast=${broadcast} track=${track.name}`); } finally { this.#trackAliases.delete(ok.trackAlias); } } catch (err) { const e = error(err); - request.track.close(e); + track.close(e); console.warn( - `subscribe error: id=${requestId} broadcast=${broadcast} track=${request.track.name} error=${e.message}`, + `subscribe error: id=${requestId} broadcast=${broadcast} track=${track.name} error=${e.message}`, ); } finally { this.#subscribes.delete(requestId); @@ -225,11 +226,14 @@ export class Subscriber { const done = await Promise.race([stream.done(), producer.closed, track.closed]); if (done !== false) break; - const frame = await Frame.decode(stream, group.flags); - if (frame.payload === undefined) break; + const msg = await FrameMessage.decode(stream, group.flags); + if (msg.payload === undefined) break; + + // TODO support timestamps + const frame = new Frame({ payload: msg.payload }); // Treat each object payload as a frame - producer.writeFrame(frame.payload); + producer.writeFrame(frame); } producer.close(); diff --git a/js/lite/src/index.ts b/js/lite/src/index.ts index cf64a4705..21c947500 100644 --- a/js/lite/src/index.ts +++ b/js/lite/src/index.ts @@ -1,6 +1,7 @@ export * from "./announced.ts"; export * from "./broadcast.ts"; export * as Connection from "./connection/index.ts"; +export * from "./frame.ts"; export * from "./group.ts"; export * as Path from "./path.ts"; export * as Time from "./time.ts"; diff --git a/js/lite/src/lite/announce.ts b/js/lite/src/lite/announce.ts index 3a5140f68..76e1b19b7 100644 --- a/js/lite/src/lite/announce.ts +++ b/js/lite/src/lite/announce.ts @@ -29,10 +29,6 @@ export class Announce { static async decode(r: Reader): Promise { return Message.decode(r, Announce.#decode); } - - static async decodeMaybe(r: Reader): Promise { - return Message.decodeMaybe(r, Announce.#decode); - } } export class AnnounceInterest { diff --git a/js/lite/src/lite/connection.ts b/js/lite/src/lite/connection.ts index a7ce8c922..f31f41fe6 100644 --- a/js/lite/src/lite/connection.ts +++ b/js/lite/src/lite/connection.ts @@ -125,10 +125,10 @@ export class Connection implements Established { async #runSession() { try { // Receive messages until the connection is closed. - for (;;) { - const msg = await SessionInfo.decodeMaybe(this.#session.reader); - if (!msg) break; - // TODO use the session info + while (!(await this.#session.reader.done())) { + const msg = await SessionInfo.decode(this.#session.reader); + // TODO use SessionInfo + console.debug("session info", msg); } } finally { console.warn("session stream closed"); @@ -162,7 +162,7 @@ export class Connection implements Established { await this.#publisher.runAnnounce(msg, stream); return; } else if (typ === StreamId.Subscribe) { - const msg = await Subscribe.decode(stream.reader); + const msg = await Subscribe.decode(stream.reader, this.version); await this.#publisher.runSubscribe(msg, stream); return; } else { @@ -184,6 +184,7 @@ export class Connection implements Established { stream.stop(new Error("cancel")); }) .catch((err: unknown) => { + console.warn("error running uni stream", err); stream.stop(err); }); } diff --git a/js/lite/src/lite/frame.ts b/js/lite/src/lite/frame.ts new file mode 100644 index 000000000..9db31ddda --- /dev/null +++ b/js/lite/src/lite/frame.ts @@ -0,0 +1,61 @@ +import type { Reader, Writer } from "../stream"; +import * as Time from "../time.ts"; +import * as Message from "./message.ts"; +import { Version } from "./version.js"; + +export class Frame { + delta: Time.Milli; + payload: Uint8Array; + + constructor({ payload, delta }: { delta: Time.Milli; payload: Uint8Array }) { + this.payload = payload; + this.delta = delta; + } + + async #encode(w: Writer, version: Version) { + switch (version) { + case Version.DRAFT_03: + await w.u53(this.delta); + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + break; + default: { + const v: never = version; + throw new Error(`unsupported version: ${v}`); + } + } + + await w.write(this.payload); + } + + static async #decode(r: Reader, version: Version): Promise { + let delta: Time.Milli; + + switch (version) { + case Version.DRAFT_03: + delta = (await r.u53()) as Time.Milli; + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + // NOTE: The caller is responsible for calling Time.Milli.now() + delta = Time.Milli.zero; + break; + default: { + const v: never = version; + throw new Error(`unsupported version: ${v}`); + } + } + + const payload = await r.readAll(); + return new Frame({ delta, payload }); + } + + async encode(w: Writer, v: Version): Promise { + return Message.encode(w, (w) => this.#encode(w, v)); + } + + static async decode(r: Reader, v: Version): Promise { + return Message.decode(r, (r) => Frame.#decode(r, v)); + } +} diff --git a/js/lite/src/lite/group.ts b/js/lite/src/lite/group.ts index 9cbc8d358..9dae19c9f 100644 --- a/js/lite/src/lite/group.ts +++ b/js/lite/src/lite/group.ts @@ -26,10 +26,6 @@ export class Group { static async decode(r: Reader): Promise { return Message.decode(r, Group.#decode); } - - static async decodeMaybe(r: Reader): Promise { - return Message.decodeMaybe(r, Group.#decode); - } } export class GroupDrop { @@ -60,33 +56,4 @@ export class GroupDrop { static async decode(r: Reader): Promise { return Message.decode(r, GroupDrop.#decode); } - - static async decodeMaybe(r: Reader): Promise { - return Message.decodeMaybe(r, GroupDrop.#decode); - } -} - -export class Frame { - payload: Uint8Array; - - constructor(payload: Uint8Array) { - this.payload = payload; - } - - async #encode(w: Writer) { - await w.write(this.payload); - } - - static async #decode(r: Reader): Promise { - const payload = await r.readAll(); - return new Frame(payload); - } - - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); - } - - static async decode(r: Reader): Promise { - return Message.decode(r, Frame.#decode); - } } diff --git a/js/lite/src/lite/message.ts b/js/lite/src/lite/message.ts index 208f76f58..a05482223 100644 --- a/js/lite/src/lite/message.ts +++ b/js/lite/src/lite/message.ts @@ -52,8 +52,3 @@ export async function decode(reader: Reader, f: (r: Reader) => Promise): P return msg; } - -export async function decodeMaybe(reader: Reader, f: (r: Reader) => Promise): Promise { - if (await reader.done()) return; - return await decode(reader, f); -} diff --git a/js/lite/src/lite/publisher.ts b/js/lite/src/lite/publisher.ts index 95ff646fe..f33f86e6c 100644 --- a/js/lite/src/lite/publisher.ts +++ b/js/lite/src/lite/publisher.ts @@ -3,9 +3,11 @@ import type { Broadcast } from "../broadcast.ts"; import type { Group } from "../group.ts"; import * as Path from "../path.ts"; import { type Stream, Writer } from "../stream.ts"; -import type { Track } from "../track.ts"; +import * as Time from "../time.ts"; +import { Track } from "../track.js"; import { error } from "../util/error.ts"; import { Announce, AnnounceInit, type AnnounceInterest } from "./announce.ts"; +import { Frame as FrameMessage } from "./frame.ts"; import { Group as GroupMessage } from "./group.ts"; import { type Subscribe, SubscribeOk, SubscribeUpdate } from "./subscribe.ts"; import type { Version } from "./version.ts"; @@ -138,26 +140,44 @@ export class Publisher { return; } - const track = broadcast.subscribe(msg.track, msg.priority); + const track = new Track({ + name: msg.track, + priority: msg.priority, + maxLatency: msg.maxLatency, + ordered: msg.ordered, + }); + broadcast.serve(track); + + // When any of the properties are updated, send the SubscribeOk message + const sendOk = async () => { + const msg = new SubscribeOk({ + priority: track.priority.peek(), + maxLatency: track.maxLatency.peek(), + ordered: track.ordered.peek(), + }); + await msg.encode(stream.writer, this.version); + }; + + // TODO: There's no backpressure, so this could fail, but I hate javascript. + const dispose = track.priority.subscribe(sendOk); + const dispose2 = track.maxLatency.subscribe(sendOk); + const dispose3 = track.ordered.subscribe(sendOk); try { - const info = new SubscribeOk({ version: this.version, priority: msg.priority }); - await info.encode(stream.writer); + await sendOk(); console.debug(`publish ok: broadcast=${msg.broadcast} track=${track.name}`); const serving = this.#runTrack(msg.id, msg.broadcast, track, stream.writer); for (;;) { - const decode = SubscribeUpdate.decodeMaybe(stream.reader); - - const result = await Promise.any([serving, decode]); - if (!result) break; + const done = await Promise.any([serving, stream.reader.done()]); + if (done) break; - if (result instanceof SubscribeUpdate) { - // TODO use the update - console.warn("subscribe update not supported", result); - } + const update = await SubscribeUpdate.decode(stream.reader, this.version); + track.priority.set(update.priority); + track.maxLatency.set(update.maxLatency); + track.ordered.set(update.ordered); } console.debug(`publish done: broadcast=${msg.broadcast} track=${track.name}`); @@ -168,6 +188,10 @@ export class Publisher { console.warn(`publish error: broadcast=${msg.broadcast} track=${track.name} error=${e.message}`); track.close(e); stream.abort(e); + } finally { + dispose(); + dispose2(); + dispose3(); } } @@ -218,13 +242,25 @@ export class Publisher { await stream.u8(0); // stream type await msg.encode(stream); + let instant = Time.Milli.zero; + try { for (;;) { const frame = await Promise.race([group.readFrame(), stream.closed]); if (!frame) break; - await stream.u53(frame.byteLength); - await stream.write(frame); + let delta = (frame.instant - instant) as Time.Milli; + if (delta < 0) { + // TODO We either need to support this at the MoQ layer. + // Or we need to prevent this at the hang layer. + console.warn("MoQ timestamp went backwards"); + delta = Time.Milli.zero; + } else { + instant = frame.instant; + } + + const msg = new FrameMessage({ payload: frame.payload, delta }); + await msg.encode(stream, this.version); } stream.close(); diff --git a/js/lite/src/lite/session.ts b/js/lite/src/lite/session.ts index 55c7367ad..4394d2666 100644 --- a/js/lite/src/lite/session.ts +++ b/js/lite/src/lite/session.ts @@ -141,8 +141,4 @@ export class SessionInfo { static async decode(r: Reader): Promise { return Message.decode(r, SessionInfo.#decode); } - - static async decodeMaybe(r: Reader): Promise { - return Message.decodeMaybe(r, SessionInfo.#decode); - } } diff --git a/js/lite/src/lite/subscribe.ts b/js/lite/src/lite/subscribe.ts index 7f2c0a756..10dbbf4d7 100644 --- a/js/lite/src/lite/subscribe.ts +++ b/js/lite/src/lite/subscribe.ts @@ -1,34 +1,68 @@ import * as Path from "../path.ts"; import type { Reader, Writer } from "../stream.ts"; +import * as Time from "../time.ts"; import * as Message from "./message.ts"; import { Version } from "./version.ts"; export class SubscribeUpdate { priority: number; + ordered: boolean; + maxLatency: Time.Milli; - constructor(priority: number) { + constructor({ priority, maxLatency, ordered }: { priority: number; maxLatency: Time.Milli; ordered: boolean }) { this.priority = priority; + this.maxLatency = maxLatency; + this.ordered = ordered; + } + + async #encode(w: Writer, version: Version) { + switch (version) { + case Version.DRAFT_01: + case Version.DRAFT_02: + await w.u8(this.priority); + break; + case Version.DRAFT_03: + await w.u8(this.priority); + await w.bool(this.ordered); + await w.u53(this.maxLatency); + break; + default: { + const v: never = version; + throw new Error(`unsupported version: ${v}`); + } + } } - async #encode(w: Writer) { - await w.u8(this.priority); - } - - static async #decode(r: Reader): Promise { - const priority = await r.u8(); - return new SubscribeUpdate(priority); - } + static async #decode(r: Reader, version: Version): Promise { + let priority: number; + let maxLatency = Time.Milli.zero; + let ordered = false; + + switch (version) { + case Version.DRAFT_01: + case Version.DRAFT_02: + priority = await r.u8(); + break; + case Version.DRAFT_03: + priority = await r.u8(); + ordered = await r.bool(); + maxLatency = (await r.u53()) as Time.Milli; + break; + default: { + const v: never = version; + throw new Error(`unsupported version: ${v}`); + } + } - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); + return new SubscribeUpdate({ priority, maxLatency, ordered }); } - static async decode(r: Reader): Promise { - return Message.decode(r, SubscribeUpdate.#decode); + async encode(w: Writer, version: Version): Promise { + return Message.encode(w, (w) => this.#encode(w, version)); } - static async decodeMaybe(r: Reader): Promise { - return Message.decodeMaybe(r, SubscribeUpdate.#decode); + static async decode(r: Reader, version: Version): Promise { + return Message.decode(r, (r) => SubscribeUpdate.#decode(r, version)); } } @@ -37,78 +71,148 @@ export class Subscribe { broadcast: Path.Valid; track: string; priority: number; - - constructor(id: bigint, broadcast: Path.Valid, track: string, priority: number) { + ordered: boolean; + maxLatency: Time.Milli; + + constructor({ + id, + broadcast, + track, + priority, + ordered, + maxLatency, + }: { + id: bigint; + broadcast: Path.Valid; + track: string; + priority: number; + ordered: boolean; + maxLatency: Time.Milli; + }) { this.id = id; this.broadcast = broadcast; this.track = track; this.priority = priority; + this.ordered = ordered; + this.maxLatency = maxLatency; } - async #encode(w: Writer) { + async #encode(w: Writer, version: Version) { await w.u62(this.id); await w.string(this.broadcast); await w.string(this.track); await w.u8(this.priority); + + switch (version) { + case Version.DRAFT_03: + await w.bool(this.ordered); + await w.u53(this.maxLatency); + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + break; + default: { + const v: never = version; + throw new Error(`unsupported version: ${v}`); + } + } } - static async #decode(r: Reader): Promise { + static async #decode(r: Reader, version: Version): Promise { const id = await r.u62(); const broadcast = Path.from(await r.string()); const track = await r.string(); const priority = await r.u8(); - return new Subscribe(id, broadcast, track, priority); + let maxLatency = Time.Milli.zero; + let ordered = false; + + switch (version) { + case Version.DRAFT_03: + ordered = await r.bool(); + maxLatency = (await r.u53()) as Time.Milli; + break; + case Version.DRAFT_01: + case Version.DRAFT_02: + break; + default: { + const v: never = version; + throw new Error(`unsupported version: ${v}`); + } + } + + return new Subscribe({ id, broadcast, track, priority, maxLatency, ordered }); } - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version: Version): Promise { + return Message.encode(w, (w) => this.#encode(w, version)); } - static async decode(r: Reader): Promise { - return Message.decode(r, Subscribe.#decode); + static async decode(r: Reader, version: Version): Promise { + return Message.decode(r, (r) => Subscribe.#decode(r, version)); } } export class SubscribeOk { - // The version - readonly version: Version; - priority?: number; + priority: number; + ordered: boolean; + maxLatency: Time.Milli; - constructor({ version, priority = undefined }: { version: Version; priority?: number }) { - this.version = version; + constructor({ priority, maxLatency, ordered }: { priority: number; maxLatency: Time.Milli; ordered: boolean }) { this.priority = priority; - } - - async #encode(w: Writer) { - if (this.version === Version.DRAFT_02) { - // noop - } else if (this.version === Version.DRAFT_01) { - await w.u8(this.priority ?? 0); - } else { - const version: never = this.version; - throw new Error(`unsupported version: ${version}`); + this.maxLatency = maxLatency; + this.ordered = ordered; + } + + async #encode(version: Version, w: Writer) { + switch (version) { + case Version.DRAFT_01: + await w.u8(this.priority); + break; + case Version.DRAFT_02: + break; + case Version.DRAFT_03: + await w.u8(this.priority); + await w.bool(this.ordered); + await w.u53(this.maxLatency); + break; + default: { + const v: never = version; + throw new Error(`unsupported version: ${v}`); + } } } static async #decode(version: Version, r: Reader): Promise { - let priority: number | undefined; - if (version === Version.DRAFT_02) { - // noop - } else if (version === Version.DRAFT_01) { - priority = await r.u8(); - } else { - const v: never = version; - throw new Error(`unsupported version: ${v}`); + let priority = 0; + let ordered = false; + let maxLatency = Time.Milli.zero; + + switch (version) { + case Version.DRAFT_01: + priority = await r.u8(); + break; + case Version.DRAFT_02: + // noop + break; + case Version.DRAFT_03: + priority = await r.u8(); + ordered = await r.bool(); + maxLatency = (await r.u53()) as Time.Milli; + break; + default: { + const v: never = version; + throw new Error(`unsupported version: ${v}`); + } } - return new SubscribeOk({ version, priority }); + return new SubscribeOk({ priority, maxLatency, ordered }); } - async encode(w: Writer): Promise { - return Message.encode(w, this.#encode.bind(this)); + async encode(w: Writer, version: Version): Promise { + return Message.encode(w, this.#encode.bind(this, version)); } static async decode(r: Reader, version: Version): Promise { - return Message.decode(r, SubscribeOk.#decode.bind(SubscribeOk, version)); + return Message.decode(r, (r) => SubscribeOk.#decode(version, r)); } } diff --git a/js/lite/src/lite/subscriber.ts b/js/lite/src/lite/subscriber.ts index 4fe755dad..532606b1b 100644 --- a/js/lite/src/lite/subscriber.ts +++ b/js/lite/src/lite/subscriber.ts @@ -1,15 +1,18 @@ import { Announced } from "../announced.ts"; -import { Broadcast, type TrackRequest } from "../broadcast.ts"; +import { Broadcast } from "../broadcast.ts"; +import { Frame } from "../frame.ts"; import { Group } from "../group.ts"; import * as Path from "../path.ts"; import { type Reader, Stream } from "../stream.ts"; +import * as Time from "../time.ts"; import type { Track } from "../track.ts"; import { error } from "../util/error.ts"; import { Announce, AnnounceInit, AnnounceInterest } from "./announce.ts"; +import { Frame as FrameMessage } from "./frame.ts"; import type { Group as GroupMessage } from "./group.ts"; import { StreamId } from "./stream.ts"; -import { Subscribe, SubscribeOk } from "./subscribe.ts"; -import type { Version } from "./version.ts"; +import { Subscribe, SubscribeOk, SubscribeUpdate } from "./subscribe.ts"; +import { Version } from "./version.js"; /** * Handles subscribing to broadcasts and managing their lifecycle. @@ -67,10 +70,11 @@ export class Subscriber { // Then receive updates for (;;) { - const announce = await Promise.race([Announce.decodeMaybe(stream.reader), announced.closed]); - if (!announce) break; - if (announce instanceof Error) throw announce; + const done = await Promise.any([stream.reader.done(), announced.closed]); + if (done instanceof Error) throw done; + if (done) break; + const announce = await Announce.decode(stream.reader); const path = Path.join(prefix, announce.suffix); console.debug(`announced: broadcast=${path} active=${announce.active}`); @@ -103,38 +107,67 @@ export class Subscriber { return broadcast; } - async #runSubscribe(broadcast: Path.Valid, request: TrackRequest) { + async #runSubscribe(broadcast: Path.Valid, track: Track) { const id = this.#subscribeNext++; // Save the writer so we can append groups to it. - this.#subscribes.set(id, request.track); + this.#subscribes.set(id, track); - console.debug(`subscribe start: id=${id} broadcast=${broadcast} track=${request.track.name}`); + console.debug(`subscribe start: id=${id} broadcast=${broadcast} track=${track.name}`); - const msg = new Subscribe(id, broadcast, request.track.name, request.priority); + const msg = new Subscribe({ + id, + broadcast, + track: track.name, + priority: track.priority.peek(), + maxLatency: track.maxLatency.peek(), + ordered: track.ordered.peek(), + }); const stream = await Stream.open(this.#quic); await stream.writer.u53(StreamId.Subscribe); - await msg.encode(stream.writer); + + await msg.encode(stream.writer, this.version); + + const sendUpdate = async () => { + const msg = new SubscribeUpdate({ + priority: track.priority.peek(), + maxLatency: track.maxLatency.peek(), + ordered: track.ordered.peek(), + }); + await msg.encode(stream.writer, this.version); + }; + + // TODO: There's no backpressure, so this could fail, but I hate javascript. + const dispose = track.priority.subscribe(sendUpdate); + const dispose2 = track.maxLatency.subscribe(sendUpdate); + const dispose3 = track.ordered.subscribe(sendUpdate); try { - await SubscribeOk.decode(stream.reader, this.version); - console.debug(`subscribe ok: id=${id} broadcast=${broadcast} track=${request.track.name}`); + for (;;) { + // TODO do something with the publisher's priority/latency/ordered + await SubscribeOk.decode(stream.reader, this.version); + console.debug(`subscribe ok: id=${id} broadcast=${broadcast} track=${track.name} `); - await Promise.race([stream.reader.closed, request.track.closed]); + const done = await Promise.race([stream.reader.done(), track.closed]); + if (done instanceof Error) throw done; + if (done !== false) break; // false means a new SubscribeOk + } - request.track.close(); + track.close(); stream.close(); - console.debug(`subscribe close: id=${id} broadcast=${broadcast} track=${request.track.name}`); + console.debug(`subscribe close: id=${id} broadcast=${broadcast} track=${track.name}`); } catch (err) { const e = error(err); - request.track.close(e); - console.warn( - `subscribe error: id=${id} broadcast=${broadcast} track=${request.track.name} error=${e.message}`, - ); + track.close(e); + console.warn(`subscribe error: id=${id} broadcast=${broadcast} track=${track.name} error=${e.message}`); stream.abort(e); } finally { this.#subscribes.delete(id); + + dispose(); + dispose2(); + dispose3(); } } @@ -158,16 +191,28 @@ export class Subscriber { const producer = new Group(group.sequence); subscribe.writeGroup(producer); + let instant = Time.Milli.zero; + try { for (;;) { const done = await Promise.race([stream.done(), subscribe.closed, producer.closed]); + if (done instanceof Error) throw done; if (done !== false) break; - const size = await stream.u53(); - const payload = await stream.read(size); - if (!payload) break; + const frame = await FrameMessage.decode(stream, this.version); + + switch (this.version) { + case Version.DRAFT_03: + instant = (instant + frame.delta) as Time.Milli; + break; + case Version.DRAFT_02: + case Version.DRAFT_01: + // Nothing was encoded on the wire, so we use the receive time instead. + instant = Time.Milli.now(); + break; + } - producer.writeFrame(payload); + producer.writeFrame(new Frame({ payload: frame.payload, instant })); } producer.close(); diff --git a/js/lite/src/lite/version.ts b/js/lite/src/lite/version.ts index 9657ee761..200352e20 100644 --- a/js/lite/src/lite/version.ts +++ b/js/lite/src/lite/version.ts @@ -1,6 +1,7 @@ export const Version = { DRAFT_01: 0xff0dad01, DRAFT_02: 0xff0dad02, + DRAFT_03: 0xff0dad03, } as const; export type Version = (typeof Version)[keyof typeof Version]; diff --git a/js/lite/src/stream.ts b/js/lite/src/stream.ts index 9e5d053e5..2a19c02f6 100644 --- a/js/lite/src/stream.ts +++ b/js/lite/src/stream.ts @@ -250,7 +250,8 @@ export class Writer { await this.write(setInt32(this.#scratch, v)); } - async u53(v: number) { + // Returns the number of bytes written + async u53(v: number): Promise { if (v < 0) { throw new Error(`underflow, value is negative: ${v.toString()}`); } @@ -258,10 +259,14 @@ export class Writer { throw new Error(`overflow, value larger than 53-bits: ${v.toString()}`); } - await this.write(setVint53(this.#scratch, v)); + const buffer = setVint53(this.#scratch, v); + + await this.write(buffer); + return buffer.byteLength; } - async u62(v: bigint) { + // Returns the number of bytes written + async u62(v: bigint): Promise { if (v < 0) { throw new Error(`underflow, value is negative: ${v.toString()}`); } @@ -271,17 +276,21 @@ export class Writer { } */ - await this.write(setVint62(this.#scratch, v)); + const buffer = setVint62(this.#scratch, v); + await this.write(buffer); + return buffer.byteLength; } async write(v: Uint8Array) { await this.#writer.write(v); } - async string(str: string) { + // Returns the number of bytes written + async string(str: string): Promise { const data = new TextEncoder().encode(str); - await this.u53(data.byteLength); + const length = await this.u53(data.byteLength); await this.write(data); + return length + data.byteLength; } close() { diff --git a/js/lite/src/track.ts b/js/lite/src/track.ts index 1cd1e3e93..8c3128741 100644 --- a/js/lite/src/track.ts +++ b/js/lite/src/track.ts @@ -1,24 +1,36 @@ import { Signal } from "@moq/signals"; +import type { Frame } from "./frame.ts"; import { Group } from "./group.ts"; +import * as Time from "./time.ts"; -export class TrackState { - groups = new Signal([]); - closed = new Signal(false); +export interface TrackProps { + name: string; + priority?: number | Signal; + maxLatency?: Time.Milli | Signal; + ordered?: boolean; } export class Track { readonly name: string; - state = new TrackState(); - #next?: number; + #groups = new Signal([]); + maxLatency: Signal; + priority: Signal; + ordered: Signal; + #closed = new Signal(false); readonly closed: Promise; - constructor(name: string) { - this.name = name; + #next?: number; + + constructor(props: TrackProps) { + this.name = props.name; + this.priority = Signal.from(props.priority ?? 0); + this.maxLatency = Signal.from(props.maxLatency ?? Time.Milli.zero); + this.ordered = Signal.from(props.ordered ?? false); this.closed = new Promise((resolve) => { - const dispose = this.state.closed.subscribe((closed) => { + const dispose = this.#closed.watch((closed) => { if (!closed) return; resolve(closed instanceof Error ? closed : undefined); dispose(); @@ -31,12 +43,12 @@ export class Track { * @returns A GroupProducer for the new group */ appendGroup(): Group { - if (this.state.closed.peek()) throw new Error("track is closed"); + if (this.#closed.peek()) throw new Error("track is closed"); const group = new Group(this.#next ?? 0); this.#next = group.sequence + 1; - this.state.groups.mutate((groups) => { + this.#groups.mutate((groups) => { groups.push(group); groups.sort((a, b) => a.sequence - b.sequence); }); @@ -49,7 +61,7 @@ export class Track { * @param group - The group to insert */ writeGroup(group: Group) { - if (this.state.closed.peek()) throw new Error("track is closed"); + if (this.#closed.peek()) throw new Error("track is closed"); if (group.sequence < (this.#next ?? 0)) { group.close(); @@ -57,7 +69,7 @@ export class Track { } this.#next = group.sequence + 1; - this.state.groups.mutate((groups) => { + this.#groups.mutate((groups) => { groups.push(group); groups.sort((a, b) => a.sequence - b.sequence); }); @@ -68,61 +80,43 @@ export class Track { * * @param frame - The frame to append */ - writeFrame(frame: Uint8Array) { + writeFrame(frame: Frame) { const group = this.appendGroup(); group.writeFrame(frame); group.close(); } - writeString(str: string) { - const group = this.appendGroup(); - group.writeString(str); - group.close(); - } - - writeJson(json: unknown) { - const group = this.appendGroup(); - group.writeJson(json); - group.close(); - } - - writeBool(bool: boolean) { - const group = this.appendGroup(); - group.writeBool(bool); - group.close(); - } - async nextGroup(): Promise { for (;;) { - const groups = this.state.groups.peek(); + const groups = this.#groups.peek(); if (groups.length > 0) { return groups.shift(); } - const closed = this.state.closed.peek(); + const closed = this.#closed.peek(); if (closed instanceof Error) throw closed; if (closed) return undefined; - await Signal.race(this.state.groups, this.state.closed); + await Signal.race(this.#groups, this.#closed); } } - async readFrame(): Promise { - return (await this.readFrameSequence())?.data; + async readFrame(): Promise { + return (await this.readFrameSequence())?.frame; } // Returns the sequence number of the group and frame, not just the data. - async readFrameSequence(): Promise<{ group: number; frame: number; data: Uint8Array } | undefined> { + async readFrameSequence(): Promise<{ group: number; sequence: number; frame: Frame } | undefined> { for (;;) { - const groups = this.state.groups.peek(); + const groups = this.#groups.peek(); // Discard old groups. while (groups.length > 1) { const frames = groups[0].state.frames.peek(); const next = frames.shift(); if (next) { - const frame = groups[0].state.total.peek() - frames.length - 1; - return { group: groups[0].sequence, frame, data: next }; + const sequence = groups[0].state.total.peek() - frames.length - 1; + return { group: groups[0].sequence, sequence, frame: next }; } // Skip this old group @@ -131,11 +125,11 @@ export class Track { // If there's no groups, wait for a new one. if (groups.length === 0) { - const closed = this.state.closed.peek(); + const closed = this.#closed.peek(); if (closed instanceof Error) throw closed; if (closed) return undefined; - await Signal.race(this.state.groups, this.state.closed); + await Signal.race(this.#groups, this.#closed); continue; } @@ -144,46 +138,27 @@ export class Track { const frames = group.state.frames.peek(); const next = frames.shift(); if (next) { - const frame = group.state.total.peek() - frames.length - 1; - return { group: group.sequence, frame, data: next }; + const sequence = group.state.total.peek() - frames.length - 1; + return { group: group.sequence, sequence, frame: next }; } // If the track is closed, return undefined. - const closed = this.state.closed.peek(); + const closed = this.#closed.peek(); if (closed instanceof Error) throw closed; if (closed) return undefined; // NOTE: We don't care if the latest group was closed or not. - await Signal.race(this.state.groups, this.state.closed, group.state.frames); + await Signal.race(this.#groups, this.#closed, group.state.frames); } } - async readString(): Promise { - const next = await this.readFrame(); - if (!next) return undefined; - return new TextDecoder().decode(next); - } - - async readJson(): Promise { - const next = await this.readString(); - if (!next) return undefined; - return JSON.parse(next); - } - - async readBool(): Promise { - const next = await this.readFrame(); - if (!next) return undefined; - if (next.byteLength !== 1 || !(next[0] === 0 || next[0] === 1)) throw new Error("invalid bool frame"); - return next[0] === 1; - } - /** * Closes the publisher and all associated groups. */ close(abort?: Error) { - this.state.closed.set(abort ?? true); + this.#closed.set(abort ?? true); - for (const group of this.state.groups.peek()) { + for (const group of this.#groups.peek()) { group.close(abort); } } diff --git a/js/lite/src/zod.ts b/js/lite/src/zod.ts index 936cd1ab0..49c6fc452 100644 --- a/js/lite/src/zod.ts +++ b/js/lite/src/zod.ts @@ -1,16 +1,17 @@ // Helper containers for Zod-validated track encoding/decoding. import type * as z from "zod"; +import { Frame } from "./frame.ts"; import type { Group } from "./group.ts"; import type { Track } from "./track.ts"; export async function read(source: Track | Group, schema: z.ZodSchema): Promise { - const next = await source.readJson(); - if (next === undefined) return undefined; // only treat undefined as EOF, not other falsy values - return schema.parse(next); + const next = await source.readFrame(); + if (next === undefined) return; // only treat undefined as EOF, not other falsy values + return schema.parse(next.toJson()); } export function write(source: Track | Group, value: T, schema: z.ZodSchema) { const valid = schema.parse(value); - source.writeJson(valid); + source.writeFrame(Frame.fromJson(valid)); } diff --git a/js/signals/src/index.ts b/js/signals/src/index.ts index 467b884e6..47da348c4 100644 --- a/js/signals/src/index.ts +++ b/js/signals/src/index.ts @@ -153,6 +153,16 @@ export class Signal implements Getter, Setter { for (const fn of dispose) fn(); return result; } + + // Spawn a promise that resolves when the signal changes. + promise(): Promise { + return new Promise((resolve) => { + const dispose = this.changed((value) => { + resolve(value); + dispose(); + }); + }); + } } type SetterType = S extends Setter ? T : never; diff --git a/justfile b/justfile index 391f3c9e4..7125a0fc1 100644 --- a/justfile +++ b/justfile @@ -321,7 +321,7 @@ test: bun run --filter='*' test fi - cargo test --all-targets --all-features + RUST_BACKTRACE=1 cargo test --all-targets --all-features # Automatically fix some issues. fix: diff --git a/rs/hang-cli/src/publish.rs b/rs/hang-cli/src/publish.rs index eb12d8771..14b4e506c 100644 --- a/rs/hang-cli/src/publish.rs +++ b/rs/hang-cli/src/publish.rs @@ -2,8 +2,7 @@ use bytes::BytesMut; use clap::Subcommand; use hang::{ import::{Decoder, DecoderFormat}, - moq_lite::BroadcastConsumer, - BroadcastProducer, + moq_lite, }; use tokio::io::AsyncReadExt; @@ -26,28 +25,30 @@ enum PublishDecoder { pub struct Publish { decoder: PublishDecoder, - broadcast: BroadcastProducer, + broadcast: moq_lite::BroadcastProducer, buffer: BytesMut, } impl Publish { pub fn new(format: &PublishFormat) -> anyhow::Result { - let broadcast = BroadcastProducer::default(); + let broadcast = moq_lite::BroadcastProducer::default(); + let catalog = hang::CatalogProducer::new(broadcast.clone())?; let decoder = match format { PublishFormat::Avc3 => { let format = DecoderFormat::Avc3; - let stream = Decoder::new(broadcast.clone(), format); + let stream = Decoder::new(broadcast.clone(), catalog, format); PublishDecoder::Decoder(Box::new(stream)) } PublishFormat::Fmp4 => { let format = DecoderFormat::Fmp4; - let stream = Decoder::new(broadcast.clone(), format); + let stream = Decoder::new(broadcast.clone(), catalog, format); PublishDecoder::Decoder(Box::new(stream)) } PublishFormat::Hls { playlist } => { let hls = hang::import::Hls::new( broadcast.clone(), + catalog, hang::import::HlsConfig { playlist: playlist.clone(), client: None, @@ -64,7 +65,7 @@ impl Publish { }) } - pub fn consume(&self) -> BroadcastConsumer { + pub fn consume(&self) -> moq_lite::BroadcastConsumer { self.broadcast.consume() } } diff --git a/rs/hang/Cargo.toml b/rs/hang/Cargo.toml index 797426ec1..3c5cd4980 100644 --- a/rs/hang/Cargo.toml +++ b/rs/hang/Cargo.toml @@ -16,7 +16,6 @@ categories = ["multimedia", "network-programming", "web-programming"] anyhow = "1" buf-list = "1" bytes = "1" -futures = "0.3" h264-parser = "0.4.0" hex = "0.4" lazy_static = "1" diff --git a/rs/hang/examples/video.rs b/rs/hang/examples/video.rs index e86a47ebb..fc8eb4b36 100644 --- a/rs/hang/examples/video.rs +++ b/rs/hang/examples/video.rs @@ -1,3 +1,4 @@ +use anyhow::Context; // cargo run --example video use moq_lite::coding::Bytes; @@ -12,17 +13,6 @@ async fn main() -> anyhow::Result<()> { // Create an origin that we can publish to and the session can consume from. let origin = moq_lite::Origin::produce(); - // Run the broadcast production and the session in parallel. - // This is a simple example of how you can concurrently run multiple tasks. - // tokio::spawn works too. - tokio::select! { - res = run_broadcast(origin.producer) => res, - res = run_session(origin.consumer) => res, - } -} - -// Connect to the server and publish our origin of broadcasts. -async fn run_session(origin: moq_lite::OriginConsumer) -> anyhow::Result<()> { // Optional: Use moq_native to make a QUIC client. let client = moq_native::ClientConfig::default().init()?; @@ -33,25 +23,23 @@ async fn run_session(origin: moq_lite::OriginConsumer) -> anyhow::Result<()> { // Establish a WebTransport/QUIC connection and MoQ handshake. // None means we're not consuming anything from the session, otherwise we would provide an OriginProducer. // Optional: Use connect_with_fallback if you also want to support WebSocket. - let session = client.connect(url, origin, None).await?; + let session = client.connect(url, origin.consumer, None).await?; - // Wait until the session is closed. - session.closed().await.map_err(Into::into) -} + // NOTE: The path is empty because we're using the URL to scope the broadcast. + // OPTIONAL: We publish after inserting the tracks just to avoid a nearly impossible race condition. + let mut broadcast = origin.producer.create_broadcast("").context("not allowed to publish")?; -// Create a video track with a catalog that describes it. -// The catalog can contain multiple tracks, used by the viewer to choose the best track. -fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> hang::TrackProducer { - // Basic information about the video track. - let video_track = moq_lite::Track { - name: "video".to_string(), - priority: 1, // Video typically has lower priority than audio - }; + // We also need a catalog to describe our tracks. + // NOTE: You would reuse this for all tracks; we're creating a new one here for simplicity. + let mut catalog = hang::CatalogProducer::new(broadcast.clone())?; + + // Once we unlock (drop) the catalog, it will be published to the broadcast. + let mut catalog = catalog.lock(); // Example video configuration // In a real application, you would get this from the encoder - let video_config = hang::catalog::VideoConfig { - codec: hang::catalog::H264 { + let config = hang::VideoConfig { + codec: hang::H264 { profile: 0x4D, // Main profile constraints: 0, level: 0x28, // Level 4.0 @@ -71,75 +59,61 @@ fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> hang::TrackProdu optimize_for_latency: None, }; - // Create a map of video renditions - // Multiple renditions allow the viewer to choose based on their capabilities - let mut renditions = std::collections::BTreeMap::new(); - renditions.insert(video_track.name.clone(), video_config); + // This is a helper that creates a unique track name and adds it to the catalog. + // You can also set `catalog.video` fields directly. + let track = catalog.video.create("example", config); - // Create the video catalog entry with the renditions - let video = hang::catalog::Video { - renditions, + // We also need some details on how to deliver the track over the network. + let delivery = moq_lite::Delivery { + // Video typically has lower priority than audio; we'll try to transmit it later priority: 1, - display: None, - rotation: None, - flip: None, + // You can configure the amount of time to keep old groups in cache. + max_latency: moq_lite::Time::from_secs(10).unwrap(), + // You can even tell the CDN if it should try to delver in group order. + ordered: false, }; - // Create a producer/consumer pair for the catalog. - // This JSON encodes the catalog as a "catalog.json" track. - let catalog = hang::catalog::Catalog { - video: Some(video), - ..Default::default() - } - .produce(); - - // Publish the catalog track to the broadcast. - broadcast.insert_track(catalog.consumer.track); - - // Actually create the media track now. - let track = broadcast.create_track(video_track); + let mut track = broadcast.create_track(track, delivery)?; - // Wrap the track in a hang:TrackProducer for convenience methods. - track.into() -} - -// Produce a broadcast and publish it to the origin. -async fn run_broadcast(origin: moq_lite::OriginProducer) -> anyhow::Result<()> { - // Create and publish a broadcast to the origin. - let mut broadcast = moq_lite::Broadcast::produce(); - let mut track = create_track(&mut broadcast.producer); - - // NOTE: The path is empty because we're using the URL to scope the broadcast. - // OPTIONAL: We publish after inserting the tracks just to avoid a nearly impossible race condition. - origin.publish_broadcast("", broadcast.consumer); + // Create a group of frames. + // Each group must start with a keyframe. + let mut group = track.append_group()?; - // Not real frames of course. - track.write(hang::Frame { - keyframe: true, + // Encode a simple container that consists of a timestamp and a payload. + // NOTE: This will be removed in the future; it's for backwards compatibility. + hang::Container { timestamp: hang::Timestamp::from_secs(1).unwrap(), payload: Bytes::from_static(b"keyframe NAL data").into(), - })?; + } + .encode(&mut group)?; tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - track.write(hang::Frame { - keyframe: false, + hang::Container { timestamp: hang::Timestamp::from_secs(2).unwrap(), payload: Bytes::from_static(b"delta NAL data").into(), - })?; + } + .encode(&mut group)?; tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - // Automatically creates a new group if you write a new keyframe. + // You can create a new group for each keyframe. + group.close()?; + let mut group = track.append_group()?; - track.write(hang::Frame { - keyframe: true, + hang::Container { timestamp: hang::Timestamp::from_secs(3).unwrap(), payload: Bytes::from_static(b"keyframe NAL data").into(), - })?; + } + .encode(&mut group)?; + + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + // You can also abort a group if you want to abandon delivery immediately. + group.abort(moq_lite::Error::Expired)?; - // Sleep before exiting and closing the broadcast. - tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + // Wait until the session is closed, I guess + session.closed().await?; Ok(()) } diff --git a/rs/hang/src/catalog/audio/mod.rs b/rs/hang/src/catalog/audio/mod.rs index 7322bc307..5aa5256a9 100644 --- a/rs/hang/src/catalog/audio/mod.rs +++ b/rs/hang/src/catalog/audio/mod.rs @@ -4,12 +4,11 @@ mod codec; pub use aac::*; pub use codec::*; -use std::collections::BTreeMap; - use bytes::Bytes; - +use moq_lite::Track; use serde::{Deserialize, Serialize}; use serde_with::{hex::Hex, DisplayFromStr}; +use std::collections::{btree_map, BTreeMap}; /// Information about an audio track in the catalog. /// @@ -22,12 +21,43 @@ pub struct Audio { /// A map of track name to rendition configuration. /// This is not an array so it will work with JSON Merge Patch. /// We use a BTreeMap so keys are sorted alphabetically for *some* deterministic behavior. - pub renditions: BTreeMap, + pub renditions: BTreeMap, /// The priority of the audio track, relative to other tracks in the broadcast. + /// + /// TODO: Remove this; it's for backwards compatibility only + #[serde(default)] pub priority: u8, } +impl Audio { + // Don't serialize if there are no renditions. + pub fn is_empty(&self) -> bool { + self.renditions.is_empty() + } + + /// Create a new audio track with a configuration and generate a unique name. + pub fn create(&mut self, name: &str, config: AudioConfig) -> Track { + let mut index = 0; + + loop { + let track = Track::from(format!("audio:{}:{}", name, index)); + match self.renditions.entry(track.clone()) { + btree_map::Entry::Vacant(entry) => { + entry.insert(config); + return track; + } + btree_map::Entry::Occupied(_) => index += 1, + } + } + } + + /// Remove a audio track from the catalog. + pub fn remove(&mut self, track: &Track) -> Option { + self.renditions.remove(track) + } +} + /// Audio decoder configuration based on WebCodecs AudioDecoderConfig. /// /// This struct contains all the information needed to initialize an audio decoder, diff --git a/rs/hang/src/catalog/mod.rs b/rs/hang/src/catalog/mod.rs index ea48eee01..0677a8959 100644 --- a/rs/hang/src/catalog/mod.rs +++ b/rs/hang/src/catalog/mod.rs @@ -7,6 +7,7 @@ mod audio; mod chat; mod preview; +mod produce; mod root; mod user; mod video; @@ -14,6 +15,7 @@ mod video; pub use audio::*; pub use chat::*; pub use preview::*; +pub use produce::*; pub use root::*; pub use user::*; pub use video::*; diff --git a/rs/hang/src/catalog/produce.rs b/rs/hang/src/catalog/produce.rs new file mode 100644 index 000000000..3c659afd0 --- /dev/null +++ b/rs/hang/src/catalog/produce.rs @@ -0,0 +1,115 @@ +use crate::Catalog; +use crate::Error; + +use std::{ + ops::{Deref, DerefMut}, + sync::{Arc, Mutex, MutexGuard}, +}; + +/// Produces a catalog track that describes the available media tracks. +/// +/// Use [Self::lock] to update the catalog, publishing any changes on [CatalogGuard::drop]. +#[derive(Debug, Clone)] +pub struct CatalogProducer { + track: moq_lite::TrackProducer, + current: Arc>, +} + +impl CatalogProducer { + /// Create a new catalog producer for the given broadcast. + pub fn new(mut broadcast: moq_lite::BroadcastProducer) -> Result { + let track = broadcast.create_track(Catalog::default_track(), Catalog::default_delivery())?; + Ok(Self { + current: Arc::new(Mutex::new(Catalog::default())), + track, + }) + } + + /// Get mutable access to the catalog, publishing it after any changes. + pub fn lock(&mut self) -> CatalogGuard<'_> { + CatalogGuard { + catalog: self.current.lock().unwrap(), + track: &mut self.track, + } + } + + /// Finish publishing to this catalog and close the track. + pub fn close(mut self) -> Result<(), Error> { + self.track.close()?; + Ok(()) + } +} + +pub struct CatalogGuard<'a> { + catalog: MutexGuard<'a, Catalog>, + track: &'a mut moq_lite::TrackProducer, +} + +impl<'a> Deref for CatalogGuard<'a> { + type Target = Catalog; + + fn deref(&self) -> &Self::Target { + &self.catalog + } +} + +impl<'a> DerefMut for CatalogGuard<'a> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.catalog + } +} + +impl Drop for CatalogGuard<'_> { + fn drop(&mut self) { + if let Ok(mut group) = self.track.append_group() { + // TODO decide if this should return an error, or be impossible to fail + let frame = self.catalog.to_string().expect("invalid catalog"); + group.write_frame(frame, moq_lite::Time::now()).ok(); + group.close().ok(); + } + } +} + +/// Consumes the catalog track, returning the next catalog update. +pub struct CatalogConsumer { + broadcast: Option, + track: Option, + group: Option, +} + +impl CatalogConsumer { + /// Create a new catalog consumer from a broadcast. + pub fn new(broadcast: moq_lite::BroadcastConsumer) -> Self { + Self { + broadcast: Some(broadcast), + track: None, + group: None, + } + } + + /// Get the next catalog update. + /// + /// This method waits for the next catalog publication and returns the + /// catalog data. If there are no more updates, `None` is returned. + pub async fn next(&mut self) -> Result, Error> { + if let Some(broadcast) = &mut self.broadcast { + let track = broadcast.subscribe_track(Catalog::default_track(), Catalog::default_delivery())?; + self.track = Some(track); + self.broadcast = None; + } + + loop { + tokio::select! { + biased; + Some(track) = async { self.track.as_mut()?.next_group().await.transpose() } => { + self.group = Some(track?); + } + Some(frame) = async { self.group.as_mut()?.read_frame().await.transpose() } => { + let catalog = Catalog::from_slice(&frame?)?; + return Ok(Some(catalog)); + } + else => return Ok(None), + } + } + } +} diff --git a/rs/hang/src/catalog/root.rs b/rs/hang/src/catalog/root.rs index 1f2e80b3d..d9a40acd8 100644 --- a/rs/hang/src/catalog/root.rs +++ b/rs/hang/src/catalog/root.rs @@ -1,13 +1,10 @@ -//! This module contains the structs and functions for the MoQ catalog format -use std::ops::{Deref, DerefMut}; -use std::sync::{Arc, Mutex, MutexGuard}; - -/// The catalog format is a JSON file that describes the tracks available in a broadcast. +//! This module contains the structs and functions for the MoQ catalog format. +//! +//! The catalog format is a JSON file that describes the tracks available in a broadcast. use serde::{Deserialize, Serialize}; -use crate::catalog::{Audio, AudioConfig, Chat, User, Video, VideoConfig}; use crate::Result; -use moq_lite::Produce; +use crate::{Audio, Chat, User, Video}; /// A catalog track, created by a broadcaster to describe the tracks available in a broadcast. #[serde_with::serde_as] @@ -19,15 +16,15 @@ pub struct Catalog { /// /// Contains a map of video track renditions that the viewer can choose from /// based on their preferences (resolution, bitrate, codec, etc). - #[serde(default)] - pub video: Option