diff --git a/Cargo.lock b/Cargo.lock index ff1de4c43..b96a9c054 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,7 +45,6 @@ dependencies = [ "evm-rpc-datasets", "firehose-datasets", "futures", - "js-runtime", "metadata-db", "monitoring", "serde", diff --git a/crates/core/common/src/amp_catalog_provider.rs b/crates/core/common/src/amp_catalog_provider.rs index 1f5198d8e..f8a690777 100644 --- a/crates/core/common/src/amp_catalog_provider.rs +++ b/crates/core/common/src/amp_catalog_provider.rs @@ -18,7 +18,6 @@ use datasets_common::{ hash_reference::HashReference, partial_reference::PartialReference, reference::Reference, }; use datasets_derived::deps::SELF_REF_KEYWORD; -use js_runtime::isolate_pool::IsolatePool; use crate::{ dataset_schema_provider::DatasetSchemaProvider, @@ -53,7 +52,6 @@ pub const AMP_CATALOG_NAME: &str = "amp"; pub struct AmpCatalogProvider { datasets_cache: DatasetsCache, ethcall_udfs_cache: EthCallUdfsCache, - isolate_pool: IsolatePool, /// Optional dependency alias overrides. When set, bare names matching /// a key are resolved directly to the corresponding [`HashReference`] /// instead of going through `PartialReference` → `Reference` → `resolve_revision`. @@ -65,15 +63,10 @@ pub struct AmpCatalogProvider { impl AmpCatalogProvider { /// Creates a new catalog provider. - pub fn new( - datasets_cache: DatasetsCache, - ethcall_udfs_cache: EthCallUdfsCache, - isolate_pool: IsolatePool, - ) -> Self { + pub fn new(datasets_cache: DatasetsCache, ethcall_udfs_cache: EthCallUdfsCache) -> Self { Self { datasets_cache, ethcall_udfs_cache, - isolate_pool, dep_aliases: Default::default(), self_schema: None, } @@ -127,7 +120,6 @@ impl AmpCatalogProvider { name.to_string(), dataset, self.ethcall_udfs_cache.clone(), - self.isolate_pool.clone(), )); return Ok(Some(provider)); } @@ -158,7 +150,6 @@ impl AmpCatalogProvider { name.to_string(), dataset, self.ethcall_udfs_cache.clone(), - self.isolate_pool.clone(), )); Ok(Some(provider)) } diff --git a/crates/core/common/src/context/exec.rs b/crates/core/common/src/context/exec.rs index 299acf7e2..2154dac95 100644 --- a/crates/core/common/src/context/exec.rs +++ b/crates/core/common/src/context/exec.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, BTreeSet}, + collections::{BTreeMap, BTreeSet, HashMap}, pin::Pin, sync::{Arc, LazyLock}, task::{Context, Poll}, @@ -11,7 +11,7 @@ use datafusion::{ self, arrow::array::RecordBatch, catalog::{AsyncCatalogProvider as TableAsyncCatalogProvider, MemorySchemaProvider}, - common::tree_node::Transformed, + common::tree_node::{Transformed, TreeNode}, datasource::{DefaultTableSource, TableType}, error::DataFusionError, execution::{ @@ -22,9 +22,10 @@ use datafusion::{ memory_pool::{MemoryPool, human_readable_size}, object_store::ObjectStoreRegistry, }, - logical_expr::{LogicalPlan, TableScan}, + logical_expr::{LogicalPlan, ScalarUDF, TableScan, expr::ScalarFunction}, physical_optimizer::PhysicalOptimizerRule, physical_plan::{ExecutionPlan, displayable, execute_stream, stream::RecordBatchStreamAdapter}, + prelude::Expr, sql::parser, }; use datafusion_tracing::{ @@ -58,15 +59,25 @@ use crate::{ forbid_underscore_prefixed_aliases, }, sql::{TableReference, TableReferenceConversionError}, + udfs::plan::PlanJsUdf, }; /// A context for executing queries against a catalog. +/// +/// Holds everything needed to plan and execute a single query: the catalog +/// snapshot, JavaScript UDF runtime, memory budget, and the DataFusion session +/// that ties them together. #[derive(Clone)] pub struct ExecContext { + /// Shared execution environment (network, block range, data store, etc.). pub env: ExecEnv, + /// Pool of V8 isolates used to run JavaScript UDFs during execution. + isolate_pool: IsolatePool, + /// Point-in-time snapshot of the physical catalog (segments and tables). physical_table: CatalogSnapshot, + /// Queryable views derived from `physical_table`, one per registered table. query_snapshots: Vec>, - /// Per-query memory pool (if per-query limits are enabled) + /// Per-query memory pool (if per-query limits are enabled). tiered_memory_pool: Arc, /// Custom session context that owns the runtime environment and optimizer /// rules. All session creation goes through `session_ctx.state()` @@ -75,13 +86,29 @@ pub struct ExecContext { } impl ExecContext { - /// Attaches a detached logical plan to this query context by replacing - /// `PlanTable` table sources in `TableScan` nodes with actual - /// `QueryableSnapshot` providers from the catalog. + /// Returns the isolate pool for JavaScript UDF execution. + pub fn isolate_pool(&self) -> &IsolatePool { + &self.isolate_pool + } + + /// Attaches a detached logical plan to this query context in a single + /// traversal that, for each plan node: + /// + /// 1. Replaces `PlanTable` table sources in `TableScan` nodes with actual + /// `QueryableSnapshot` providers from the catalog. + /// 2. Rewrites `PlanJsUdf`-backed scalar functions inside the node's + /// expression tree into execution-ready UDFs by attaching the + /// `IsolatePool`. #[tracing::instrument(skip_all, err)] pub(crate) fn attach(&self, plan: LogicalPlan) -> Result { - plan.transform_with_subqueries(|node| attach_table_node(node, self)) - .map(|t| t.data) + let mut cache: HashMap> = HashMap::new(); + let pool = self.isolate_pool.clone(); + + plan.transform_with_subqueries(|node| { + let node = attach_table_node(node, self)?; + node.transform_data(|n| attach_js_udf_exprs(n, &pool, &mut cache)) + }) + .map(|t| t.data) } /// Returns the physical catalog snapshot backing this query context. @@ -306,7 +333,7 @@ pub struct ExecContextBuilder { store: DataStore, datasets_cache: DatasetsCache, ethcall_udfs_cache: EthCallUdfsCache, - isolate_pool: IsolatePool, + isolate_pool: Option, global_memory_pool: Arc, query_max_mem_mb: usize, disk_manager: Arc, @@ -329,7 +356,7 @@ impl ExecContextBuilder { store: env.store, datasets_cache: env.datasets_cache, ethcall_udfs_cache: env.ethcall_udfs_cache, - isolate_pool: env.isolate_pool, + isolate_pool: None, global_memory_pool: env.global_memory_pool, query_max_mem_mb: env.query_max_mem_mb, disk_manager: env.disk_manager, @@ -373,6 +400,15 @@ impl ExecContextBuilder { self } + /// Sets the isolate pool for JavaScript UDF execution. + /// + /// Required before calling [`for_catalog`](Self::for_catalog). Panics at + /// context construction time if not set. + pub fn with_isolate_pool(mut self, pool: IsolatePool) -> Self { + self.isolate_pool = Some(pool); + self + } + /// Creates an [`ExecContext`] backed by a physical catalog. /// /// This is the async construction step that cannot be performed in the @@ -414,13 +450,16 @@ impl ExecContextBuilder { }) .collect::, _>>()?; + let isolate_pool = self + .isolate_pool + .expect("IsolatePool is required — call .with_isolate_pool() before .for_catalog()"); + let env = ExecEnv { session_config: self.session_config.clone(), global_memory_pool: self.global_memory_pool.clone(), disk_manager: self.disk_manager.clone(), cache_manager: self.cache_manager.clone(), object_store_registry: self.object_store_registry.clone(), - isolate_pool: self.isolate_pool, query_max_mem_mb: self.query_max_mem_mb, store: self.store, datasets_cache: self.datasets_cache, @@ -448,6 +487,7 @@ impl ExecContextBuilder { Ok(ExecContext { env, + isolate_pool, physical_table, query_snapshots, tiered_memory_pool, @@ -687,8 +727,19 @@ fn register_catalog( .map_err(RegisterTableError::RegisterTable)?; } - // Register catalog UDFs + // Register catalog UDFs, skipping planning-only JS UDFs. + // + // Catalog UDFs may include `PlanJsUdf`-backed entries that are + // non-executable (they panic on invoke). The execution-boundary JS UDF + // attach (`ExecContext::attach`) rewrites inline plan references to + // `ExecJsUdf`, so session-level registration of planning JS UDFs is + // unnecessary and would re-introduce panic-guarded objects into the + // execution session. for udf in catalog.udfs() { + let is_plan_js_udf = udf.inner().as_any().downcast_ref::().is_some(); + if is_plan_js_udf { + continue; + } state .register_udf(Arc::new(udf.clone())) .map_err(RegisterTableError::RegisterUdf)?; @@ -746,6 +797,50 @@ fn attach_table_node( } } +/// Rewrites all [`PlanJsUdf`]-backed UDF references in a single plan node's +/// expressions to execution-ready UDF references. +/// +/// Each unique `PlanJsUdf` (keyed by `name()`) is attached exactly once; +/// subsequent occurrences reuse the same `Arc`. +fn attach_js_udf_exprs( + node: LogicalPlan, + pool: &IsolatePool, + cache: &mut HashMap>, +) -> Result, DataFusionError> { + node.map_expressions(|expr| expr.transform(|e| rewrite_single_expr(e, pool, cache))) +} + +/// Rewrites a single `Expr` node if it is a `PlanJsUdf`-backed scalar function. +fn rewrite_single_expr( + expr: Expr, + pool: &IsolatePool, + cache: &mut HashMap>, +) -> Result, DataFusionError> { + let Expr::ScalarFunction(ref sf) = expr else { + return Ok(Transformed::no(expr)); + }; + + let Some(plan_udf) = sf.func.inner().as_any().downcast_ref::() else { + return Ok(Transformed::no(expr)); + }; + + let name = sf.func.name().to_string(); + + let exec_udf = if let Some(cached) = cache.get(&name) { + cached.clone() + } else { + let exec = plan_udf.attach(pool.clone()); + let udf = Arc::new(exec.into_scalar_udf()); + cache.insert(name, udf.clone()); + udf + }; + + Ok(Transformed::yes(Expr::ScalarFunction(ScalarFunction { + func: exec_udf, + args: sf.args.clone(), + }))) +} + /// `logical_optimize` controls whether logical optimizations should be applied to `plan`. #[tracing::instrument(skip_all, err)] async fn execute_plan( @@ -921,6 +1016,13 @@ mod tests { array::{Array, Int64Array, StringArray}, datatypes::{DataType, Field, Schema}, }; + use datafusion::{ + common::tree_node::{TreeNode, TreeNodeRecursion}, + logical_expr::{LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, expr::ScalarFunction}, + prelude::Expr, + }; + use datasets_derived::function::Function; + use js_runtime::isolate_pool::IsolatePool; use super::*; @@ -1178,4 +1280,249 @@ mod tests { assert_eq!(result, "", "empty string should remain empty"); } } + + #[test] + fn attach_js_udfs_single_plan_udf_rewrites_to_exec() { + //* Given + let (_, expr) = plan_udf_expr("my_func"); + let plan = LogicalPlanBuilder::empty(false) + .project(vec![expr]) + .expect("project should succeed") + .build() + .expect("build should succeed"); + + //* When + let result = attach_js_udfs(plan, IsolatePool::new()).expect("attach should succeed"); + + //* Then + assert_no_plan_js_udfs(&result); + } + + #[test] + fn attach_js_udfs_duplicate_udf_refs_deduplicates() { + //* Given + let (udf, _) = plan_udf_expr("my_func"); + let plan = LogicalPlanBuilder::empty(false) + .project(vec![ + Expr::ScalarFunction(ScalarFunction::new_udf( + udf.clone(), + vec![datafusion::prelude::lit("a")], + )), + Expr::ScalarFunction(ScalarFunction::new_udf( + udf, + vec![datafusion::prelude::lit("b")], + )), + ]) + .expect("project should succeed") + .build() + .expect("build should succeed"); + + //* When + let result = attach_js_udfs(plan, IsolatePool::new()).expect("attach should succeed"); + + //* Then + assert_no_plan_js_udfs(&result); + } + + #[test] + fn attach_js_udfs_no_js_udfs_passes_through() { + //* Given + let plan = LogicalPlanBuilder::empty(false) + .project(vec![datafusion::prelude::lit(1)]) + .expect("project should succeed") + .build() + .expect("build should succeed"); + + //* When + let result = attach_js_udfs(plan, IsolatePool::new()); + + //* Then + assert!( + result.is_ok(), + "attach should succeed for plan without JS UDFs" + ); + } + + #[test] + fn attach_js_udfs_schema_qualified_udf_preserves_name() { + //* Given + let function = sample_function(); + let plan_udf = PlanJsUdf::from_function("my_func", &function, Some("ns/dataset@1.0")); + let expected_name = plan_udf.name().to_string(); + let udf = Arc::new(ScalarUDF::new_from_impl(plan_udf)); + let expr = Expr::ScalarFunction(ScalarFunction::new_udf( + udf, + vec![datafusion::prelude::lit("test")], + )); + let plan = LogicalPlanBuilder::empty(false) + .project(vec![expr]) + .expect("project should succeed") + .build() + .expect("build should succeed"); + + //* When + let result = attach_js_udfs(plan, IsolatePool::new()).expect("attach should succeed"); + + //* Then + assert_no_plan_js_udfs(&result); + // Verify the rewritten UDF preserves the schema-qualified name. + result + .apply(|node| { + for expr in node.expressions() { + expr.apply(|e| { + if let Expr::ScalarFunction(sf) = e { + assert_eq!( + sf.func.name(), + expected_name, + "attached UDF should preserve schema-qualified name" + ); + } + Ok(TreeNodeRecursion::Continue) + }) + .expect("expression traversal should succeed"); + } + Ok(TreeNodeRecursion::Continue) + }) + .expect("plan traversal should succeed"); + } + + #[test] + fn attach_js_udfs_multiple_distinct_udfs_rewrites_all() { + //* Given + let (_, expr_a) = plan_udf_expr("func_a"); + let (_, expr_b) = plan_udf_expr("func_b"); + let plan = LogicalPlanBuilder::empty(false) + .project(vec![expr_a, expr_b]) + .expect("project should succeed") + .build() + .expect("build should succeed"); + + //* When + let result = attach_js_udfs(plan, IsolatePool::new()).expect("attach should succeed"); + + //* Then + assert_no_plan_js_udfs(&result); + } + + #[test] + fn attach_js_udfs_nested_case_expression_rewrites_inner_udf() { + //* Given + let (_, udf_expr) = plan_udf_expr("my_func"); + // Wrap the JS UDF call inside a CASE expression. + let nested = Expr::Case(datafusion::logical_expr::expr::Case { + expr: None, + when_then_expr: vec![(Box::new(udf_expr), Box::new(datafusion::prelude::lit(1)))], + else_expr: Some(Box::new(datafusion::prelude::lit(0))), + }); + let plan = LogicalPlanBuilder::empty(false) + .project(vec![nested]) + .expect("project should succeed") + .build() + .expect("build should succeed"); + + //* When + let result = attach_js_udfs(plan, IsolatePool::new()).expect("attach should succeed"); + + //* Then + assert_no_plan_js_udfs(&result); + } + + #[test] + fn attach_js_udfs_mixed_js_and_non_js_udfs_rewrites_only_js() { + //* Given + let (_, js_expr) = plan_udf_expr("js_func"); + // A non-JS built-in UDF expression (abs). + let non_js_expr = datafusion::prelude::abs(datafusion::prelude::lit(42)); + let plan = LogicalPlanBuilder::empty(false) + .project(vec![js_expr, non_js_expr]) + .expect("project should succeed") + .build() + .expect("build should succeed"); + + //* When + let result = attach_js_udfs(plan, IsolatePool::new()).expect("attach should succeed"); + + //* Then + assert_no_plan_js_udfs(&result); + // Verify the non-JS UDF (abs) is still present and unchanged. + let mut found_abs = false; + result + .apply(|node| { + for expr in node.expressions() { + expr.apply(|e| { + if let Expr::ScalarFunction(sf) = e + && sf.func.name() == "abs" + { + found_abs = true; + } + Ok(TreeNodeRecursion::Continue) + }) + .expect("expression traversal should succeed"); + } + Ok(TreeNodeRecursion::Continue) + }) + .expect("plan traversal should succeed"); + assert!(found_abs, "non-JS UDF (abs) should be preserved in plan"); + } + + /// Test helper: wraps the node-level `attach_js_udf_exprs` in a full plan + /// traversal, mirroring the plan-level attach in `ExecContext::attach`. + fn attach_js_udfs( + plan: LogicalPlan, + pool: IsolatePool, + ) -> Result { + let mut cache: HashMap> = HashMap::new(); + plan.transform_with_subqueries(|node| attach_js_udf_exprs(node, &pool, &mut cache)) + .map(|t| t.data) + } + + /// Builds a minimal [`Function`] with a single `Utf8 -> Boolean` signature. + fn sample_function() -> Function { + serde_json::from_value(serde_json::json!({ + "inputTypes": ["Utf8"], + "outputType": "Boolean", + "source": { + "source": "function f(a) { return true; }", + "filename": "test.js" + } + })) + .expect("test function should deserialize") + } + + /// Creates a [`PlanJsUdf`]-backed `ScalarUDF` and a matching `Expr` for + /// use in test plans. + fn plan_udf_expr(name: &str) -> (Arc, Expr) { + let function = sample_function(); + let plan_udf = PlanJsUdf::from_function(name, &function, None); + let udf = Arc::new(ScalarUDF::new_from_impl(plan_udf)); + let expr = Expr::ScalarFunction(ScalarFunction::new_udf( + udf.clone(), + vec![datafusion::prelude::lit("test")], + )); + (udf, expr) + } + + /// Asserts no `PlanJsUdf` references remain anywhere in the plan. + fn assert_no_plan_js_udfs(plan: &LogicalPlan) { + plan.apply(|node| { + node.expressions().iter().for_each(|expr| { + expr.apply(|e| { + if let Expr::ScalarFunction(sf) = e { + assert!( + sf.func + .inner() + .as_any() + .downcast_ref::() + .is_none(), + "found PlanJsUdf in rewritten plan" + ); + } + Ok(TreeNodeRecursion::Continue) + }) + .expect("expression traversal should succeed"); + }); + Ok(TreeNodeRecursion::Continue) + }) + .expect("plan traversal should succeed"); + } } diff --git a/crates/core/common/src/dataset_schema_provider.rs b/crates/core/common/src/dataset_schema_provider.rs index ec1b1a646..32ef4f479 100644 --- a/crates/core/common/src/dataset_schema_provider.rs +++ b/crates/core/common/src/dataset_schema_provider.rs @@ -13,11 +13,10 @@ use datafusion::{ TableProvider, }, error::DataFusionError, - logical_expr::{ScalarUDF, async_udf::AsyncScalarUDF}, + logical_expr::ScalarUDF, }; use datasets_common::{dataset::Dataset, table_name::TableName}; use datasets_derived::{dataset::Dataset as DerivedDataset, func_name::ETH_CALL_FUNCTION_NAME}; -use js_runtime::{isolate_pool::IsolatePool, js_udf::JsUdf}; use parking_lot::RwLock; use crate::{ @@ -29,34 +28,33 @@ use crate::{ }, }, plan_table::PlanTable, + udfs::PlanJsUdf, }; /// Schema provider for a dataset. /// /// Resolves tables as [`PlanTable`] instances (schema-only, no data access) -/// and functions using the provided isolate pool. +/// and functions as planning-phase [`PlanJsUdf`] representations that carry +/// no runtime resources. pub struct DatasetSchemaProvider { schema_name: String, dataset: Arc, ethcall_udfs_cache: EthCallUdfsCache, - isolate_pool: IsolatePool, tables: RwLock>>, functions: RwLock>>, } impl DatasetSchemaProvider { - /// Creates a new provider for the given dataset, schema name, and isolate pool. + /// Creates a new provider for the given dataset and schema name. pub(crate) fn new( schema_name: String, dataset: Arc, ethcall_udfs_cache: EthCallUdfsCache, - isolate_pool: IsolatePool, ) -> Self { Self { schema_name, dataset, ethcall_udfs_cache, - isolate_pool, tables: RwLock::new(Default::default()), functions: RwLock::new(Default::default()), } @@ -165,25 +163,18 @@ impl FuncSchemaProvider for DatasetSchemaProvider { } } - // Try to get UDF from derived dataset - let udf = self.dataset.downcast_ref::().and_then(|d| { - d.function_by_name(name).map(|function| { - AsyncScalarUDF::new(Arc::new(JsUdf::new( - self.isolate_pool.clone(), - self.schema_name.clone(), - function.source.source.clone(), - function.source.filename.clone(), - Arc::from(name), - function - .input_types - .iter() - .map(|dt| dt.clone().into_arrow()) - .collect(), - function.output_type.clone().into_arrow(), - ))) - .into_scalar_udf() - }) - }); + // Try to get UDF from derived dataset and build a planning-only UDF. + let udf: Option = self + .dataset + .downcast_ref::() + .and_then(|d| d.function_by_name(name)) + .map(|function| { + ScalarUDF::new_from_impl(PlanJsUdf::from_function( + name, + function, + Some(&self.schema_name), + )) + }); if let Some(udf) = udf { let udf = Arc::new(udf); diff --git a/crates/core/common/src/exec_env.rs b/crates/core/common/src/exec_env.rs index c17c89209..f9c2ed367 100644 --- a/crates/core/common/src/exec_env.rs +++ b/crates/core/common/src/exec_env.rs @@ -12,7 +12,6 @@ use datafusion::{ runtime_env::RuntimeEnvBuilder, }, }; -use js_runtime::isolate_pool::IsolatePool; use crate::{ amp_catalog_provider::AMP_CATALOG_NAME, @@ -71,9 +70,6 @@ pub struct ExecEnv { pub cache_manager: Arc, pub object_store_registry: Arc, - // Existing fields - pub isolate_pool: IsolatePool, - // Per-query memory limit configuration pub query_max_mem_mb: usize, @@ -98,7 +94,6 @@ pub fn create( store: DataStore, datasets_cache: DatasetsCache, ethcall_udfs_cache: EthCallUdfsCache, - isolate_pool: IsolatePool, ) -> Result { let spill_allowed = !spill_location.is_empty(); let disk_manager_mode = if spill_allowed { @@ -130,7 +125,6 @@ pub fn create( disk_manager: runtime_env.disk_manager, cache_manager: runtime_env.cache_manager, object_store_registry: runtime_env.object_store_registry, - isolate_pool, query_max_mem_mb, store, datasets_cache, diff --git a/crates/core/common/src/self_schema_provider.rs b/crates/core/common/src/self_schema_provider.rs index 75ffe7a47..df0391390 100644 --- a/crates/core/common/src/self_schema_provider.rs +++ b/crates/core/common/src/self_schema_provider.rs @@ -12,11 +12,10 @@ use datafusion::{ TableProvider, }, error::DataFusionError, - logical_expr::{ScalarUDF, async_udf::AsyncScalarUDF}, + logical_expr::ScalarUDF, }; use datasets_common::table_name::TableName; use datasets_derived::{deps::SELF_REF_KEYWORD, func_name::FuncName, function::Function}; -use js_runtime::{isolate_pool::IsolatePool, js_udf::JsUdf}; use parking_lot::RwLock; use crate::{ @@ -28,6 +27,7 @@ use crate::{ }, }, plan_table::PlanTable, + udfs::PlanJsUdf, }; /// Schema provider for virtual schemas (e.g., `"self"`) that resolve tables @@ -59,28 +59,17 @@ impl SelfSchemaProvider { /// Creates a provider from manifest functions (no tables). /// - /// Functions are already validated at deserialization time. - pub fn from_manifest_udfs( - isolate_pool: IsolatePool, - functions: &BTreeMap, - ) -> Self { + /// Functions are already validated at deserialization time, so this is + /// infallible — no runtime type checks needed. + pub fn from_manifest_udfs(functions: &BTreeMap) -> Self { let scalar_udfs: Vec = functions .iter() .map(|(name, function)| { - AsyncScalarUDF::new(Arc::new(JsUdf::new( - isolate_pool.clone(), - Some(SELF_REF_KEYWORD.to_string()), - function.source.source.clone(), - function.source.filename.clone(), - Arc::from(name.as_str()), - function - .input_types - .iter() - .map(|dt| dt.clone().into_arrow()) - .collect(), - function.output_type.clone().into_arrow(), - ))) - .into_scalar_udf() + ScalarUDF::new_from_impl(PlanJsUdf::from_function( + name.as_str(), + function, + Some(SELF_REF_KEYWORD), + )) }) .collect(); diff --git a/crates/core/common/src/streaming_query.rs b/crates/core/common/src/streaming_query.rs index 5a9dff815..925e2f956 100644 --- a/crates/core/common/src/streaming_query.rs +++ b/crates/core/common/src/streaming_query.rs @@ -19,6 +19,7 @@ use datasets_common::{ use datasets_derived::{dataset::Dataset as DerivedDataset, deps::SELF_REF_KEYWORD}; use datasets_raw::dataset::Dataset as RawDataset; use futures::stream::{self, BoxStream, StreamExt}; +use js_runtime::isolate_pool::IsolatePool; use message_stream_with_block_complete::MessageStreamWithBlockComplete; use metadata_db::{NotificationMultiplexerHandle, physical_table_revision::LocationId}; use tokio::{ @@ -311,6 +312,7 @@ impl StreamingQueryHandle { /// stream. pub struct StreamingQuery { exec_env: ExecEnv, + isolate_pool: IsolatePool, catalog: Catalog, plan: DetachedLogicalPlan, start_block: BlockNum, @@ -338,6 +340,7 @@ impl StreamingQuery { #[expect(clippy::too_many_arguments)] pub async fn spawn( exec_env: ExecEnv, + isolate_pool: IsolatePool, catalog: Catalog, plan: DetachedLogicalPlan, start_block: BlockNum, @@ -386,7 +389,6 @@ impl StreamingQuery { AmpCatalogProvider::new( exec_env.datasets_cache.clone(), exec_env.ethcall_udfs_cache.clone(), - exec_env.isolate_pool.clone(), ) .with_dep_aliases(dep_alias_map) .with_self_schema(self_schema), @@ -426,6 +428,7 @@ impl StreamingQuery { .map_err(SpawnError::ConvertCursor)?; let streaming_query = Self { exec_env, + isolate_pool, catalog, plan, tx, @@ -462,6 +465,7 @@ impl StreamingQuery { // The table snapshots to execute the microbatch against. let ctx = ExecContextBuilder::new(self.exec_env.clone()) + .with_isolate_pool(self.isolate_pool.clone()) .for_catalog(self.catalog.clone(), false) .await .map_err(StreamingQueryExecutionError::CreateExecContext)?; @@ -579,6 +583,7 @@ impl StreamingQuery { ) }; ExecContextBuilder::new(self.exec_env.clone()) + .with_isolate_pool(self.isolate_pool.clone()) .for_catalog(catalog, false) .await .map_err(NextMicrobatchRangeError::CreateExecContext)? @@ -750,6 +755,7 @@ impl StreamingQuery { Default::default(), ); ExecContextBuilder::new(ctx.env.clone()) + .with_isolate_pool(ctx.isolate_pool().clone()) .for_catalog(catalog, true) .await .map_err(ReorgBaseError::CreateExecContext)? diff --git a/crates/core/common/src/udfs.rs b/crates/core/common/src/udfs.rs new file mode 100644 index 000000000..437b73691 --- /dev/null +++ b/crates/core/common/src/udfs.rs @@ -0,0 +1,17 @@ +//! JS UDF lifecycle types for planning and execution phases. +//! +//! This module implements the plan-vs-exec split for JavaScript user-defined +//! functions, mirroring the existing `PlanTable` / `QueryableSnapshot` pattern +//! used for table lifecycle management. +//! +//! - [`plan::PlanJsUdf`]: planning-phase UDF — holds signature and source but +//! no runtime resources. Panics if DataFusion attempts to invoke it. +//! - [`exec::ExecJsUdf`]: execution-phase UDF — produced by attaching an +//! `IsolatePool` to a `PlanJsUdf`. + +pub mod block_num; +pub mod exec; +pub mod plan; + +pub use exec::ExecJsUdf; +pub use plan::PlanJsUdf; diff --git a/crates/core/common/src/udfs/exec.rs b/crates/core/common/src/udfs/exec.rs new file mode 100644 index 000000000..07b9afd3d --- /dev/null +++ b/crates/core/common/src/udfs/exec.rs @@ -0,0 +1,146 @@ +//! Execution-phase JS UDF representation. +//! +//! [`ExecJsUdf`] is produced by attaching runtime resources (`IsolatePool`) +//! to a [`PlanJsUdf`](super::plan::PlanJsUdf). It wraps an executable +//! `ScalarUDF` that DataFusion can invoke during query execution. + +use std::sync::Arc; + +use datafusion::logical_expr::{ScalarUDF, ScalarUDFImpl, async_udf::AsyncScalarUDF}; +use js_runtime::{isolate_pool::IsolatePool, js_udf::JsUdf}; + +use super::plan::PlanJsUdf; + +/// Execution-ready JavaScript user-defined function. +/// +/// Created exclusively via [`PlanJsUdf::attach`]. Holds runtime resources +/// (`IsolatePool`) and produces an executable [`ScalarUDF`] for DataFusion. +#[derive(Debug)] +pub struct ExecJsUdf { + scalar_udf: ScalarUDF, +} + +impl ExecJsUdf { + /// Build an `ExecJsUdf` by attaching runtime resources to a planning UDF. + /// + /// This is the only constructor — enforces the plan→exec lifecycle. + pub(super) fn from_plan(plan: &PlanJsUdf, pool: IsolatePool) -> Self { + let sig = plan.signature(); + let input_types = match &sig.type_signature { + datafusion::logical_expr::TypeSignature::Exact(types) => types.clone(), + _ => unreachable!("PlanJsUdf always uses TypeSignature::Exact"), + }; + + let js_udf = JsUdf::new( + pool, + plan.schema_name().map(String::from), + plan.source_code().clone(), + Arc::clone(plan.filename()), + Arc::clone(plan.function_name()), + input_types, + plan.output_type().clone(), + ); + + let scalar_udf = AsyncScalarUDF::new(Arc::new(js_udf)).into_scalar_udf(); + + Self { scalar_udf } + } + + /// The executable `ScalarUDF` for registration in DataFusion catalogs. + pub fn scalar_udf(&self) -> &ScalarUDF { + &self.scalar_udf + } + + /// Consume self and return the inner `ScalarUDF`. + pub fn into_scalar_udf(self) -> ScalarUDF { + self.scalar_udf + } +} + +#[cfg(test)] +mod tests { + use datafusion::{arrow::datatypes::DataType, logical_expr::TypeSignature}; + use datasets_derived::function::Function; + use js_runtime::isolate_pool::IsolatePool; + + use super::*; + + #[test] + fn attach_with_valid_function_produces_executable_udf() { + //* Given + let function = sample_function(); + let plan = PlanJsUdf::from_function("my_func", &function, None); + + //* When + let exec = plan.attach(IsolatePool::new()); + + //* Then + let udf = exec.scalar_udf(); + assert_eq!(udf.name(), "my_func"); + let inner_sig = udf.inner().signature(); + assert!( + matches!(&inner_sig.type_signature, TypeSignature::Exact(types) if types == &[DataType::Utf8, DataType::Int64]), + ); + } + + #[test] + fn attach_with_schema_qualified_function_preserves_name() { + //* Given + let function = sample_function(); + let plan = PlanJsUdf::from_function("my_func", &function, Some("ns/ds@1.0")); + + //* When + let exec = plan.attach(IsolatePool::new()); + + //* Then + assert_eq!(exec.scalar_udf().name(), plan.name()); + assert_eq!(exec.scalar_udf().name(), "\"ns/ds@1.0\".my_func"); + } + + #[test] + fn attach_with_distinct_schemas_produces_distinct_names() { + //* Given + let function = sample_function(); + let plan_a = PlanJsUdf::from_function("func", &function, Some("schema_a")); + let plan_b = PlanJsUdf::from_function("func", &function, Some("schema_b")); + + //* When + let exec_a = plan_a.attach(IsolatePool::new()); + let exec_b = plan_b.attach(IsolatePool::new()); + + //* Then + assert_ne!( + exec_a.scalar_udf().name(), + exec_b.scalar_udf().name(), + "different schemas must produce different runtime UDF names" + ); + assert_eq!(exec_a.scalar_udf().name(), plan_a.name()); + assert_eq!(exec_b.scalar_udf().name(), plan_b.name()); + } + + #[test] + fn into_scalar_udf_with_attached_exec_returns_udf() { + //* Given + let function = sample_function(); + let exec = PlanJsUdf::from_function("my_func", &function, None).attach(IsolatePool::new()); + + //* When + let udf = exec.into_scalar_udf(); + + //* Then + assert_eq!(udf.name(), "my_func"); + } + + /// Builds a minimal [`Function`] with a `(Utf8, Int64) -> Boolean` signature. + fn sample_function() -> Function { + serde_json::from_value(serde_json::json!({ + "inputTypes": ["Utf8", "Int64"], + "outputType": "Boolean", + "source": { + "source": "function my_func(a, b) { return true; }", + "filename": "test.js" + } + })) + .expect("test function should deserialize") + } +} diff --git a/crates/core/common/src/udfs/mod.rs b/crates/core/common/src/udfs/mod.rs deleted file mode 100644 index 679d11803..000000000 --- a/crates/core/common/src/udfs/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod block_num; diff --git a/crates/core/common/src/udfs/plan.rs b/crates/core/common/src/udfs/plan.rs new file mode 100644 index 000000000..a68ea0d6b --- /dev/null +++ b/crates/core/common/src/udfs/plan.rs @@ -0,0 +1,284 @@ +//! Planning-phase JS UDF representation. +//! +//! [`PlanJsUdf`] implements [`ScalarUDFImpl`] so it can participate in +//! DataFusion logical planning and type-checking, but it carries **no** +//! runtime resources (`IsolatePool`). Any attempt by DataFusion to invoke +//! the UDF before it has been attached to an execution context will panic. + +use std::{any::Any, sync::Arc}; + +use datafusion::{ + arrow::datatypes::DataType, + common::utils::quote_identifier, + error::DataFusionError, + logical_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, + }, +}; +use datasets_derived::function::Function; +use js_runtime::isolate_pool::IsolatePool; + +use super::exec::ExecJsUdf; + +/// Planning-phase representation of a JavaScript user-defined function. +/// +/// This type holds the UDF signature and source code needed for logical +/// planning and type-checking. It implements [`ScalarUDFImpl`] so DataFusion +/// can resolve the function during planning, but its `invoke_with_args` +/// implementation panics because execution requires runtime resources +/// attached via [`ExecJsUdf`]. +/// +/// Construct via [`PlanJsUdf::from_function`]. +#[derive(Debug)] +pub struct PlanJsUdf { + /// Schema-qualified UDF name (e.g. `"ns/dataset@0.0.0".func`). + udf_name: String, + /// DataFusion signature built from the function's Arrow input types. + signature: Signature, + /// Arrow return type. + return_type: DataType, + /// JS source code, retained for the attach phase. + source_code: Arc, + /// Filename where the function is defined, retained for the attach phase. + filename: Arc, + /// Bare function name (unqualified), retained for the attach phase. + function_name: Arc, + /// Schema name (if any), retained for the attach phase so the runtime + /// UDF preserves the same qualified identity as the planning UDF. + schema_name: Option, +} + +impl PlanJsUdf { + /// Create a planning JS UDF from a [`Function`] definition. + /// + /// The `schema_name` is used to build a schema-qualified UDF name that + /// matches how DataFusion resolves qualified function references + /// (e.g. `"namespace/dataset@0.0.0".my_func`). + pub fn from_function(name: &str, function: &Function, schema_name: Option<&str>) -> Self { + let schema_owned = match schema_name { + Some(s) if !s.is_empty() => Some(s.to_string()), + _ => None, + }; + + let udf_name = match &schema_owned { + Some(schema) => { + format!("{}.{}", quote_identifier(schema), name) + } + None => name.to_string(), + }; + + let input_types: Vec = function + .input_types + .iter() + .map(|dt| dt.clone().into_arrow()) + .collect(); + let signature = Signature::new(TypeSignature::Exact(input_types), Volatility::Immutable); + + Self { + udf_name, + signature, + return_type: function.output_type.clone().into_arrow(), + source_code: function.source.source.clone(), + filename: function.source.filename.clone(), + function_name: Arc::from(name), + schema_name: schema_owned, + } + } + + /// The JS source code, needed at attach time to construct the runtime UDF. + pub fn source_code(&self) -> &Arc { + &self.source_code + } + + /// The filename where the function is defined, needed at attach time. + pub fn filename(&self) -> &Arc { + &self.filename + } + + /// The bare (unqualified) function name. + pub fn function_name(&self) -> &Arc { + &self.function_name + } + + /// The Arrow output type, needed at attach time to construct the runtime UDF. + pub fn output_type(&self) -> &DataType { + &self.return_type + } + + /// The schema name (if any), needed at attach time to preserve qualified + /// UDF identity in the runtime. + pub fn schema_name(&self) -> Option<&str> { + self.schema_name.as_deref() + } + + /// Attach runtime resources to produce an executable JS UDF. + /// + /// This is the **only** way to obtain an [`ExecJsUdf`]. The returned + /// value wraps an executable `ScalarUDF` backed by the given + /// `IsolatePool`. + pub fn attach(&self, pool: IsolatePool) -> ExecJsUdf { + ExecJsUdf::from_plan(self, pool) + } +} + +impl PartialEq for PlanJsUdf { + fn eq(&self, other: &Self) -> bool { + self.udf_name == other.udf_name + && self.return_type == other.return_type + && self.signature == other.signature + && self.function_name == other.function_name + && self.schema_name == other.schema_name + } +} + +impl Eq for PlanJsUdf {} + +impl std::hash::Hash for PlanJsUdf { + fn hash(&self, state: &mut H) { + self.udf_name.hash(state); + self.return_type.hash(state); + self.signature.hash(state); + self.function_name.hash(state); + self.schema_name.hash(state); + } +} + +impl ScalarUDFImpl for PlanJsUdf { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + &self.udf_name + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(self.return_type.clone()) + } + + fn invoke_with_args( + &self, + _args: ScalarFunctionArgs, + ) -> Result { + unreachable!( + "PlanJsUdf '{}' must be attached to an execution context before invocation", + self.udf_name, + ) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion::{ + arrow::datatypes::DataType, + logical_expr::{ScalarUDF, ScalarUDFImpl, TypeSignature}, + }; + use datasets_derived::function::Function; + + use super::*; + + #[test] + fn from_function_without_schema_uses_bare_name() { + //* Given + let function = sample_function(); + + //* When + let plan = PlanJsUdf::from_function("my_func", &function, None); + + //* Then + assert_eq!(plan.name(), "my_func"); + assert_eq!( + plan.return_type(&[]).expect("return_type should succeed"), + DataType::Boolean + ); + assert_eq!(plan.function_name().as_ref(), "my_func"); + let sig = plan.signature(); + assert!( + matches!(&sig.type_signature, TypeSignature::Exact(types) if types == &[DataType::Utf8, DataType::Int64]), + "unexpected signature: {:?}", + sig.type_signature + ); + } + + #[test] + fn from_function_with_schema_produces_qualified_name() { + //* Given + let function = sample_function(); + + //* When + let plan = PlanJsUdf::from_function("my_func", &function, Some("ns/dataset@0.0.0")); + + //* Then + assert_eq!(plan.name(), "\"ns/dataset@0.0.0\".my_func"); + } + + #[test] + fn from_function_with_empty_schema_uses_bare_name() { + //* Given + let function = sample_function(); + + //* When + let plan = PlanJsUdf::from_function("my_func", &function, Some("")); + + //* Then + assert_eq!(plan.name(), "my_func"); + } + + #[test] + #[should_panic(expected = "must be attached")] + fn invoke_with_args_before_attach_panics() { + //* Given + use datafusion::{arrow::datatypes::Field, config::ConfigOptions}; + + let function = sample_function(); + let plan = PlanJsUdf::from_function("my_func", &function, None); + let udf = ScalarUDF::new_from_impl(plan); + let args = ScalarFunctionArgs { + args: vec![], + arg_fields: vec![], + number_rows: 0, + return_field: Arc::new(Field::new("result", DataType::Boolean, true)), + config_options: Arc::new(ConfigOptions::default()), + }; + + //* When + let _ = udf.inner().invoke_with_args(args); + + //* Then — should_panic verifies the panic message + } + + #[test] + fn source_with_valid_function_retains_original_source() { + //* Given + let function = sample_function(); + + //* When + let plan = PlanJsUdf::from_function("my_func", &function, None); + + //* Then + assert_eq!(plan.filename().as_ref(), "test.js"); + assert_eq!( + plan.source_code().as_ref(), + "function f(a, b) { return true; }" + ); + } + + /// Builds a minimal [`Function`] with a `(Utf8, Int64) -> Boolean` signature. + fn sample_function() -> Function { + serde_json::from_value(serde_json::json!({ + "inputTypes": ["Utf8", "Int64"], + "outputType": "Boolean", + "source": { + "source": "function f(a, b) { return true; }", + "filename": "test.js" + } + })) + .expect("test function should deserialize") + } +} diff --git a/crates/core/common/tests/it_session_async_resolution.rs b/crates/core/common/tests/it_session_async_resolution.rs index 9f50b09db..104b52508 100644 --- a/crates/core/common/tests/it_session_async_resolution.rs +++ b/crates/core/common/tests/it_session_async_resolution.rs @@ -405,7 +405,6 @@ async fn exec_statement_to_plan_with_qualified_function_uses_async_pre_resolutio disk_manager: runtime_env.disk_manager.clone(), cache_manager: runtime_env.cache_manager.clone(), object_store_registry: runtime_env.object_store_registry.clone(), - isolate_pool: IsolatePool::new(), query_max_mem_mb: 64, store: data_store, datasets_cache, @@ -420,6 +419,7 @@ async fn exec_statement_to_plan_with_qualified_function_uses_async_pre_resolutio let catalog = Catalog::default(); let query_ctx = ExecContextBuilder::new(exec_env) + .with_isolate_pool(IsolatePool::new()) .with_table_catalog( "amp", amp_table_catalog as Arc, @@ -506,7 +506,6 @@ async fn exec_statement_to_plan_with_overlapping_async_and_physical_tables_succe disk_manager: runtime_env.disk_manager.clone(), cache_manager: runtime_env.cache_manager.clone(), object_store_registry: runtime_env.object_store_registry.clone(), - isolate_pool: IsolatePool::new(), query_max_mem_mb: 64, store: data_store.clone(), datasets_cache, @@ -562,6 +561,7 @@ async fn exec_statement_to_plan_with_overlapping_async_and_physical_tables_succe // Use ignore_canonical_segments=true so that the empty revision // produces an empty (but valid) TableSnapshot. let query_ctx = ExecContextBuilder::new(exec_env) + .with_isolate_pool(IsolatePool::new()) .with_table_catalog( "amp", amp_table_catalog as Arc, diff --git a/crates/core/worker-datasets-derived/src/job_impl.rs b/crates/core/worker-datasets-derived/src/job_impl.rs index 82375a896..a86764a3e 100644 --- a/crates/core/worker-datasets-derived/src/job_impl.rs +++ b/crates/core/worker-datasets-derived/src/job_impl.rs @@ -219,7 +219,6 @@ pub async fn execute( ctx.data_store.clone(), ctx.datasets_cache.clone(), ctx.ethcall_udfs_cache.clone(), - ctx.isolate_pool.clone(), ) .map_err(Error::CreateQueryEnv)?; for (table, compactor) in &tables { diff --git a/crates/core/worker-datasets-derived/src/job_impl/query.rs b/crates/core/worker-datasets-derived/src/job_impl/query.rs index cb8a3a005..595c25abd 100644 --- a/crates/core/worker-datasets-derived/src/job_impl/query.rs +++ b/crates/core/worker-datasets-derived/src/job_impl/query.rs @@ -29,6 +29,7 @@ use common::{ }; use datafusion::parquet::errors::ParquetError; use futures::StreamExt as _; +use js_runtime::isolate_pool::IsolatePool; use tracing::instrument; use crate::job_ctx::Context; @@ -38,6 +39,7 @@ use crate::job_ctx::Context; pub async fn materialize_sql_query( ctx: &Context, env: &ExecEnv, + isolate_pool: &IsolatePool, catalog: &Catalog, query: DetachedLogicalPlan, start: BlockNum, @@ -57,6 +59,7 @@ pub async fn materialize_sql_query( let mut stream = { StreamingQuery::spawn( env.clone(), + isolate_pool.clone(), catalog.clone(), query, start, diff --git a/crates/core/worker-datasets-derived/src/job_impl/table.rs b/crates/core/worker-datasets-derived/src/job_impl/table.rs index 27ba67640..9ec3c8bc5 100644 --- a/crates/core/worker-datasets-derived/src/job_impl/table.rs +++ b/crates/core/worker-datasets-derived/src/job_impl/table.rs @@ -91,8 +91,7 @@ pub async fn materialize_table( let mut join_set = tasks::FailFastJoinSet::>::new(); - let self_schema_provider = - SelfSchemaProvider::from_manifest_udfs(env.isolate_pool.clone(), &manifest.functions); + let self_schema_provider = SelfSchemaProvider::from_manifest_udfs(&manifest.functions); let catalog = { let table_refs = resolve_table_references::(&query) @@ -114,13 +113,9 @@ pub async fn materialize_table( let self_schema: Arc = Arc::new(self_schema_provider); let amp_catalog = Arc::new( - AmpCatalogProvider::new( - ctx.datasets_cache.clone(), - ctx.ethcall_udfs_cache.clone(), - env.isolate_pool.clone(), - ) - .with_dep_aliases(dep_alias_map) - .with_self_schema(self_schema), + AmpCatalogProvider::new(ctx.datasets_cache.clone(), ctx.ethcall_udfs_cache.clone()) + .with_dep_aliases(dep_alias_map) + .with_self_schema(self_schema), ); let planning_ctx = PlanContextBuilder::new(env.session_config.clone()) .with_table_catalog(AMP_CATALOG_NAME, amp_catalog.clone()) @@ -155,6 +150,7 @@ pub async fn materialize_table( let resolved = resolve_end_block(&end, start, async { let query_ctx = ExecContextBuilder::new(env.clone()) + .with_isolate_pool(ctx.isolate_pool.clone()) .for_catalog(catalog.clone(), false) .await?; let max_end_blocks = query_ctx @@ -200,6 +196,7 @@ pub async fn materialize_table( let materialize_result = materialize_sql_query( &ctx, &env, + &ctx.isolate_pool, &catalog, plan.clone(), start, diff --git a/crates/services/admin-api/Cargo.toml b/crates/services/admin-api/Cargo.toml index 7c76b63d9..6529cdf6d 100644 --- a/crates/services/admin-api/Cargo.toml +++ b/crates/services/admin-api/Cargo.toml @@ -22,7 +22,6 @@ datasets-derived = { path = "../../core/datasets-derived" } evm-rpc-datasets = { path = "../../extractors/evm-rpc" } firehose-datasets = { path = "../../extractors/firehose" } futures.workspace = true -js-runtime = { version = "0.1.0", path = "../../core/js-runtime" } metadata-db = { path = "../../core/metadata-db" } monitoring = { path = "../../core/monitoring" } serde.workspace = true diff --git a/crates/services/admin-api/src/handlers/common.rs b/crates/services/admin-api/src/handlers/common.rs index 8e7f7b26f..02161192a 100644 --- a/crates/services/admin-api/src/handlers/common.rs +++ b/crates/services/admin-api/src/handlers/common.rs @@ -25,7 +25,6 @@ use datasets_derived::{ manifest::{TableInput, View}, }; use futures::{StreamExt as _, stream}; -use js_runtime::isolate_pool::IsolatePool; /// Map of table names to their SQL references (table refs and function refs) using dependency aliases or self-references. type TableReferencesMap = BTreeMap< @@ -356,17 +355,12 @@ pub async fn validate_derived_manifest( .iter() .map(|(alias, hash_ref)| (alias.to_string(), hash_ref.clone())) .collect(); - let self_schema: Arc = Arc::new( - SelfSchemaProvider::from_manifest_udfs(IsolatePool::dummy(), &manifest.functions), - ); + let self_schema: Arc = + Arc::new(SelfSchemaProvider::from_manifest_udfs(&manifest.functions)); let amp_catalog = Arc::new( - AmpCatalogProvider::new( - datasets_cache.clone(), - ethcall_udfs_cache.clone(), - IsolatePool::dummy(), - ) - .with_dep_aliases(dep_aliases) - .with_self_schema(self_schema), + AmpCatalogProvider::new(datasets_cache.clone(), ethcall_udfs_cache.clone()) + .with_dep_aliases(dep_aliases) + .with_self_schema(self_schema), ); let planning_ctx = PlanContextBuilder::new(session_config) .with_table_catalog(AMP_CATALOG_NAME, amp_catalog.clone()) diff --git a/crates/services/admin-api/src/handlers/schema.rs b/crates/services/admin-api/src/handlers/schema.rs index b6e706122..ed6fa9555 100644 --- a/crates/services/admin-api/src/handlers/schema.rs +++ b/crates/services/admin-api/src/handlers/schema.rs @@ -25,7 +25,6 @@ use datasets_derived::{ function::Function, manifest::TableSchema, }; -use js_runtime::isolate_pool::IsolatePool; use tracing::instrument; use crate::{ @@ -215,17 +214,12 @@ pub async fn handler( // Create planning context with self-schema provider let session_config = default_session_config().map_err(Error::SessionConfig)?; - let self_schema: Arc = Arc::new( - SelfSchemaProvider::from_manifest_udfs(IsolatePool::dummy(), &functions), - ); + let self_schema: Arc = + Arc::new(SelfSchemaProvider::from_manifest_udfs(&functions)); let amp_catalog = Arc::new( - AmpCatalogProvider::new( - ctx.datasets_cache.clone(), - ctx.ethcall_udfs_cache.clone(), - IsolatePool::dummy(), - ) - .with_dep_aliases(dep_aliases) - .with_self_schema(self_schema), + AmpCatalogProvider::new(ctx.datasets_cache.clone(), ctx.ethcall_udfs_cache.clone()) + .with_dep_aliases(dep_aliases) + .with_self_schema(self_schema), ); let planning_ctx = PlanContextBuilder::new(session_config) .with_table_catalog(AMP_CATALOG_NAME, amp_catalog.clone()) diff --git a/crates/services/server/src/flight.rs b/crates/services/server/src/flight.rs index 272ea5497..4390e9fd6 100644 --- a/crates/services/server/src/flight.rs +++ b/crates/services/server/src/flight.rs @@ -81,6 +81,7 @@ type TonicStream = Pin> + Send + 'sta pub struct Service { config: Arc, env: ExecEnv, + isolate_pool: IsolatePool, notification_multiplexer: Arc, metrics: Option>, } @@ -102,7 +103,6 @@ impl Service { data_store, datasets_cache, ethcall_udfs_cache, - isolate_pool, ) .map_err(InitError::ExecEnv)?; let notification_multiplexer = @@ -112,6 +112,7 @@ impl Service { Ok(Self { config, env, + isolate_pool, notification_multiplexer, metrics, }) @@ -137,7 +138,6 @@ impl Service { let amp_catalog = Arc::new(AmpCatalogProvider::new( self.env.datasets_cache.clone(), self.env.ethcall_udfs_cache.clone(), - self.env.isolate_pool.clone(), )); let ctx = PlanContextBuilder::new(self.env.session_config.clone()) .with_table_catalog(AMP_CATALOG_NAME, amp_catalog.clone()) @@ -184,6 +184,7 @@ impl Service { // If not streaming or metadata db is not available, execute once if !is_streaming { let ctx = ExecContextBuilder::new(self.env.clone()) + .with_isolate_pool(self.isolate_pool.clone()) .for_catalog(catalog, false) .await .map_err(Error::CreateExecContext)?; @@ -257,6 +258,7 @@ impl Service { let query = StreamingQuery::spawn( self.env.clone(), + self.isolate_pool.clone(), catalog, plan, earliest_block, @@ -326,7 +328,6 @@ impl Service { let amp_catalog = Arc::new(AmpCatalogProvider::new( self.env.datasets_cache.clone(), self.env.ethcall_udfs_cache.clone(), - self.env.isolate_pool.clone(), )); PlanContextBuilder::new(self.env.session_config.clone()) .with_table_catalog(AMP_CATALOG_NAME, amp_catalog.clone()) diff --git a/tests/src/main.rs b/tests/src/main.rs index 0f56978bd..717fb7955 100644 --- a/tests/src/main.rs +++ b/tests/src/main.rs @@ -221,6 +221,7 @@ async fn main() { .expect("Failed to start controller for dependency restoration"); // Start a worker to handle dump jobs + let isolate_pool = IsolatePool::new(); let _worker = DaemonWorker::new( build_info, config.clone(), @@ -228,7 +229,7 @@ async fn main() { data_store.clone(), datasets_cache.clone(), ethcall_udfs_cache, - IsolatePool::new(), + isolate_pool, None, "bless".parse().expect("valid worker node id"), ) diff --git a/tests/src/testlib/ctx.rs b/tests/src/testlib/ctx.rs index 6cb7511a9..c5dc61886 100644 --- a/tests/src/testlib/ctx.rs +++ b/tests/src/testlib/ctx.rs @@ -456,6 +456,7 @@ impl TestCtxBuilder { let worker_meter = self.meter.clone(); let controller_meter = self.meter.clone(); + // Create isolate pool shared by server and worker fixtures let isolate_pool = IsolatePool::new(); // Start query server diff --git a/tests/src/testlib/fixtures/daemon_worker.rs b/tests/src/testlib/fixtures/daemon_worker.rs index 070cb617b..92e6a2634 100644 --- a/tests/src/testlib/fixtures/daemon_worker.rs +++ b/tests/src/testlib/fixtures/daemon_worker.rs @@ -88,6 +88,7 @@ impl DaemonWorker { event_emitter: Option>, ) -> Result { let worker_config = worker_config_from_common(&config); + let worker_fut = worker::service::new( worker_config.clone(), build_info,