Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions crates/perry-runtime/src/node_stream_constructors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1718,6 +1718,21 @@ fn fallback_node_writable_to_web(node_stream: f64) -> f64 {
])
}

pub(crate) fn js_node_stream_readable_to_web_method_value(node_stream: f64) -> f64 {
fallback_node_readable_to_web(node_stream)
}

pub(crate) fn js_node_stream_writable_to_web_method_value(node_stream: f64) -> f64 {
fallback_node_writable_to_web(node_stream)
}

pub(crate) fn js_node_stream_duplex_to_web_method_value(node_stream: f64) -> f64 {
web_pair_object(
fallback_node_readable_to_web(node_stream),
fallback_node_writable_to_web(node_stream),
)
}

fn web_pair_object(readable: f64, writable: f64) -> f64 {
build_enumerable_object(&[(b"readable", readable), (b"writable", writable)])
}
Expand Down
172 changes: 172 additions & 0 deletions crates/perry-runtime/src/object/native_module_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ thread_local! {
static STREAM_EVENT_EMITTER_PROTOTYPES: RefCell<Vec<u64>> = const { RefCell::new(Vec::new()) };
}

const STREAM_STATIC_READABLE_FROM: f64 = 1.0;
const STREAM_STATIC_DUPLEX_FROM: f64 = 2.0;
const STREAM_STATIC_READABLE_TO_WEB: f64 = 3.0;
const STREAM_STATIC_WRITABLE_TO_WEB: f64 = 4.0;
const STREAM_STATIC_DUPLEX_TO_WEB: f64 = 5.0;
const STREAM_STATIC_READABLE_FROM_WEB: f64 = 6.0;
const STREAM_STATIC_WRITABLE_FROM_WEB: f64 = 7.0;
const STREAM_STATIC_DUPLEX_FROM_WEB: f64 = 8.0;
const STREAM_STATIC_IS_DISTURBED: f64 = 9.0;
const STREAM_STATIC_IS_ERRORED: f64 = 10.0;

pub(crate) fn scan_stream_event_emitter_prototype_roots_mut(
visitor: &mut crate::gc::RuntimeRootVisitor<'_>,
) {
Expand Down Expand Up @@ -80,6 +91,7 @@ pub(crate) fn attach_stream_constructor_prototype(constructor_value: f64, name:
"prototype",
proto_value,
);
attach_stream_constructor_statics(constructor_value, name);
}

pub(crate) fn is_stream_event_emitter_prototype_value(value: f64) -> bool {
Expand All @@ -91,6 +103,80 @@ pub(crate) fn is_stream_event_emitter_prototype_value(value: f64) -> bool {
STREAM_EVENT_EMITTER_PROTOTYPES.with(|protos| protos.borrow().contains(&bits))
}

extern "C" fn stream_static_method_thunk(
closure: *const crate::closure::ClosureHeader,
arg0: f64,
arg1: f64,
) -> f64 {
let kind = crate::closure::js_closure_get_capture_f64(closure, 0);
if kind == STREAM_STATIC_READABLE_FROM {
crate::node_stream::js_node_stream_readable_from_options(arg0, arg1)
} else if kind == STREAM_STATIC_DUPLEX_FROM {
crate::node_stream::js_node_stream_duplex_from_options(arg0, arg1)
} else if kind == STREAM_STATIC_READABLE_TO_WEB {
crate::node_stream::js_node_stream_readable_to_web_method_value(arg0)
} else if kind == STREAM_STATIC_WRITABLE_TO_WEB {
crate::node_stream::js_node_stream_writable_to_web_method_value(arg0)
} else if kind == STREAM_STATIC_DUPLEX_TO_WEB {
crate::node_stream::js_node_stream_duplex_to_web_method_value(arg0)
} else if kind == STREAM_STATIC_READABLE_FROM_WEB {
crate::node_stream::js_node_stream_readable_from_web(arg0, arg1)
} else if kind == STREAM_STATIC_WRITABLE_FROM_WEB {
crate::node_stream::js_node_stream_writable_from_web(arg0, arg1)
} else if kind == STREAM_STATIC_DUPLEX_FROM_WEB {
crate::node_stream::js_node_stream_duplex_from_web(arg0, arg1)
} else if kind == STREAM_STATIC_IS_DISTURBED {
crate::node_stream::js_node_stream_is_disturbed(arg0)
} else if kind == STREAM_STATIC_IS_ERRORED {
crate::node_stream::js_node_stream_is_errored(arg0)
} else {
f64::from_bits(crate::value::TAG_UNDEFINED)
}
}

fn stream_static_method_value(method: &str, kind: f64, exposed_length: u32) -> f64 {
let func_ptr = stream_static_method_thunk as *const u8;
crate::closure::js_register_closure_arity(func_ptr, 2);
let closure = crate::closure::js_closure_alloc(func_ptr, 1);
crate::closure::js_closure_set_capture_f64(closure, 0, kind);
set_bound_native_closure_name(closure, method);
set_builtin_closure_length(closure as usize, exposed_length);
crate::value::js_nanbox_pointer(closure as i64)
}

fn attach_stream_static(closure: usize, method: &str, kind: f64, exposed_length: u32) {
let value = stream_static_method_value(method, kind, exposed_length);
crate::closure::closure_set_dynamic_prop(closure, method, value);
}

fn attach_stream_constructor_statics(constructor_value: f64, name: &str) {
let closure = (constructor_value.to_bits() & crate::value::POINTER_MASK) as usize;
if closure == 0 {
return;
}

match name {
"Readable" => {
attach_stream_static(closure, "from", STREAM_STATIC_READABLE_FROM, 2);
attach_stream_static(closure, "fromWeb", STREAM_STATIC_READABLE_FROM_WEB, 2);
attach_stream_static(closure, "toWeb", STREAM_STATIC_READABLE_TO_WEB, 2);
}
"Writable" => {
attach_stream_static(closure, "fromWeb", STREAM_STATIC_WRITABLE_FROM_WEB, 2);
attach_stream_static(closure, "toWeb", STREAM_STATIC_WRITABLE_TO_WEB, 1);
}
"Duplex" | "Transform" | "PassThrough" => {
attach_stream_static(closure, "from", STREAM_STATIC_DUPLEX_FROM, 1);
attach_stream_static(closure, "fromWeb", STREAM_STATIC_DUPLEX_FROM_WEB, 2);
attach_stream_static(closure, "toWeb", STREAM_STATIC_DUPLEX_TO_WEB, 2);
}
_ => {}
}

attach_stream_static(closure, "isDisturbed", STREAM_STATIC_IS_DISTURBED, 1);
attach_stream_static(closure, "isErrored", STREAM_STATIC_IS_ERRORED, 1);
}

pub(crate) unsafe fn dispatch_stream_native_module_method(
method_name: &str,
args_ptr: *const f64,
Expand Down Expand Up @@ -142,6 +228,34 @@ pub(crate) unsafe fn dispatch_stream_native_module_method(
mod tests {
use super::*;

fn closure_addr(value: f64) -> usize {
(value.to_bits() & crate::value::POINTER_MASK) as usize
}

fn assert_static_method(constructor: f64, name: &str, length: u32) {
let ctor_ptr = closure_addr(constructor);
let method = crate::closure::closure_get_dynamic_prop(ctor_ptr, name);
let method_ptr = closure_addr(method);
assert_ne!(method.to_bits(), crate::value::TAG_UNDEFINED);
assert_ne!(method_ptr, 0);
assert_eq!(builtin_closure_length(method_ptr), Some(length));
}

fn static_method_value(constructor: f64, name: &str) -> f64 {
let ctor_ptr = closure_addr(constructor);
crate::closure::closure_get_dynamic_prop(ctor_ptr, name)
}

unsafe fn property(value: f64, name: &[u8]) -> f64 {
crate::value::js_get_property(value, name.as_ptr() as i64, name.len() as i64)
}

fn assert_object_property_function(value: f64, name: &[u8]) {
let prop = unsafe { property(value, name) };
assert_ne!(prop.to_bits(), crate::value::TAG_UNDEFINED);
assert_ne!(closure_addr(prop), 0);
}

#[test]
fn legacy_stream_prototype_is_event_emitter_instanceof_candidate() {
let stream_ctor = bound_native_callable_export_value("stream", "Stream");
Expand All @@ -159,4 +273,62 @@ mod tests {
crate::value::TAG_TRUE,
);
}

#[test]
fn stream_constructors_expose_static_method_values() {
let readable = bound_native_callable_export_value("stream", "Readable");
let writable = bound_native_callable_export_value("stream", "Writable");
let duplex = bound_native_callable_export_value("stream", "Duplex");
let transform = bound_native_callable_export_value("stream", "Transform");
let passthrough = bound_native_callable_export_value("stream", "PassThrough");

assert_static_method(readable, "from", 2);
assert_static_method(readable, "fromWeb", 2);
assert_static_method(readable, "toWeb", 2);
assert_static_method(readable, "isDisturbed", 1);
assert_static_method(readable, "isErrored", 1);

assert_static_method(writable, "fromWeb", 2);
assert_static_method(writable, "toWeb", 1);
assert_static_method(writable, "isDisturbed", 1);
assert_static_method(writable, "isErrored", 1);

for constructor in [duplex, transform, passthrough] {
assert_static_method(constructor, "from", 1);
assert_static_method(constructor, "fromWeb", 2);
assert_static_method(constructor, "toWeb", 2);
assert_static_method(constructor, "isDisturbed", 1);
assert_static_method(constructor, "isErrored", 1);
}

let node_readable = crate::node_stream::js_node_stream_readable_new(f64::from_bits(
crate::value::TAG_UNDEFINED,
));
let readable_to_web = static_method_value(readable, "toWeb");
let web_readable = unsafe {
crate::closure::js_native_call_value(readable_to_web, [node_readable].as_ptr(), 1)
};
assert_object_property_function(web_readable, b"getReader");

let node_writable = crate::node_stream::js_node_stream_writable_new(f64::from_bits(
crate::value::TAG_UNDEFINED,
));
let writable_to_web = static_method_value(writable, "toWeb");
let web_writable = unsafe {
crate::closure::js_native_call_value(writable_to_web, [node_writable].as_ptr(), 1)
};
assert_object_property_function(web_writable, b"getWriter");

let node_duplex = crate::node_stream::js_node_stream_duplex_new(f64::from_bits(
crate::value::TAG_UNDEFINED,
));
let duplex_to_web = static_method_value(duplex, "toWeb");
let web_pair = unsafe {
crate::closure::js_native_call_value(duplex_to_web, [node_duplex].as_ptr(), 1)
};
let web_pair_readable = unsafe { property(web_pair, b"readable") };
let web_pair_writable = unsafe { property(web_pair, b"writable") };
assert_object_property_function(web_pair_readable, b"getReader");
assert_object_property_function(web_pair_writable, b"getWriter");
}
}
81 changes: 81 additions & 0 deletions test-parity/node-suite/stream/static/class-static-exports.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import stream, { Duplex, PassThrough, Readable, Transform, Writable } from "node:stream";
import { ReadableStream, WritableStream } from "node:stream/web";

const checks = [
["Readable.from", Readable.from, 2],
["Readable.fromWeb", Readable.fromWeb, 2],
["Readable.toWeb", Readable.toWeb, 2],
["Readable.isDisturbed", Readable.isDisturbed, 1],
["Readable.isErrored", Readable.isErrored, 1],
["Writable.fromWeb", Writable.fromWeb, 2],
["Writable.toWeb", Writable.toWeb, 1],
["Writable.isDisturbed", Writable.isDisturbed, 1],
["Writable.isErrored", Writable.isErrored, 1],
["Duplex.from", Duplex.from, 1],
["Duplex.fromWeb", Duplex.fromWeb, 2],
["Duplex.toWeb", Duplex.toWeb, 2],
["Transform.from", Transform.from, 1],
["Transform.fromWeb", Transform.fromWeb, 2],
["Transform.toWeb", Transform.toWeb, 2],
["PassThrough.from", PassThrough.from, 1],
["PassThrough.fromWeb", PassThrough.fromWeb, 2],
["PassThrough.toWeb", PassThrough.toWeb, 2],
] as const;

for (const [name, fn, length] of checks) {
console.log("static:", name, typeof fn, fn.length === length);
}

console.log("namespace identity:", stream.Readable.from === Readable.from);

const readableFrom = Readable.from;
const readable = readableFrom(["a", "b"]);
console.log("detached Readable.from:", typeof readable, readable.readable, readable.destroyed);
console.log("detached Readable.from values:", (await readable.toArray()).join(","));

const duplexFrom = Duplex.from;
const duplex = duplexFrom(Readable.from(["x"]));
console.log("detached Duplex.from:", typeof duplex, duplex.readable, duplex.writable);
duplex.on("error", () => {});
duplex.destroy();

const readableToWeb = Readable.toWeb;
const readableWeb = readableToWeb(Readable.from(["r"]));
console.log("detached Readable.toWeb:", typeof readableWeb.getReader);

const writableToWeb = Writable.toWeb;
const writableWeb = writableToWeb(new Writable({ write(_chunk, _enc, cb) { cb(); } }));
console.log("detached Writable.toWeb:", typeof writableWeb.getWriter);

const duplexToWeb = Duplex.toWeb;
const pair = duplexToWeb(Duplex.from(Readable.from(["d"])));
console.log(
"detached Duplex.toWeb:",
typeof pair.readable.getReader,
typeof pair.writable.getWriter,
);

const fromWebReadable = Readable.fromWeb;
const readableFromWeb = fromWebReadable(new ReadableStream({
start(controller) {
controller.enqueue("w");
controller.close();
},
}));
console.log("detached Readable.fromWeb:", typeof readableFromWeb, readableFromWeb.readable);
readableFromWeb.on("error", () => {});
readableFromWeb.destroy();

const fromWebWritable = Writable.fromWeb;
const writableFromWeb = fromWebWritable(new WritableStream({
write() {},
}));
console.log("detached Writable.fromWeb:", typeof writableFromWeb, writableFromWeb.writable);
writableFromWeb.on("error", () => {});
writableFromWeb.destroy();

console.log(
"static state helpers:",
Readable.isDisturbed(Readable.from([])),
Readable.isErrored(Readable.from([])),
);