Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 18 additions & 3 deletions crates/core/common/src/dataset_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use datafusion::{
TableProvider,
},
error::DataFusionError,
logical_expr::ScalarUDF,
logical_expr::{ScalarUDF, async_udf::AsyncScalarUDF},
};
use datasets_common::{dataset::Dataset, table_name::TableName};
use datasets_derived::{dataset::Dataset as DerivedDataset, func_name::ETH_CALL_FUNCTION_NAME};
use js_runtime::isolate_pool::IsolatePool;
use js_runtime::{isolate_pool::IsolatePool, js_udf::JsUdf};
use parking_lot::RwLock;

use crate::{
Expand Down Expand Up @@ -177,7 +177,22 @@ impl FuncSchemaProvider for DatasetSchemaProvider {

// Try to get UDF from derived dataset
let udf = self.dataset.downcast_ref::<DerivedDataset>().and_then(|d| {
d.function_by_name(self.schema_name.clone(), name, self.isolate_pool.clone())
d.function_by_name(name).map(|function| {
AsyncScalarUDF::new(Arc::new(JsUdf::new(
self.isolate_pool.clone(),
self.schema_name.clone(),
function.source.source.clone(),
function.source.filename.clone(),
Arc::from(name),
function
.input_types
.iter()
.map(|dt| dt.clone().into_arrow())
.collect(),
function.output_type.clone().into_arrow(),
)))
.into_scalar_udf()
})
});

if let Some(udf) = udf {
Expand Down
25 changes: 12 additions & 13 deletions crates/core/common/src/self_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use datafusion::{
logical_expr::{ScalarUDF, async_udf::AsyncScalarUDF},
};
use datasets_common::table_name::TableName;
use datasets_derived::{deps::SELF_REF_KEYWORD, func_name::FuncName, manifest::Function};
use datasets_derived::{deps::SELF_REF_KEYWORD, func_name::FuncName, function::Function};
use js_runtime::{isolate_pool::IsolatePool, js_udf::JsUdf};
use parking_lot::RwLock;

Expand Down Expand Up @@ -57,35 +57,34 @@ impl SelfSchemaProvider {
&self.udfs
}

/// Creates a provider from manifest function definitions (no tables).
/// Creates a provider from manifest functions (no tables).
///
/// Builds UDFs from all manifest functions.
/// Functions are already validated at deserialization time.
pub fn from_manifest_udfs(
schema_name: String,
isolate_pool: IsolatePool,
manifest_udfs: &BTreeMap<FuncName, Function>,
functions: &BTreeMap<FuncName, Function>,
) -> Self {
let udfs: Vec<ScalarUDF> = manifest_udfs
let scalar_udfs: Vec<ScalarUDF> = functions
.iter()
.map(|(func_name, func_def)| {
.map(|(name, function)| {
AsyncScalarUDF::new(Arc::new(JsUdf::new(
isolate_pool.clone(),
Some(SELF_REF_KEYWORD.to_string()),
func_def.source.source.clone(),
func_def.source.filename.clone().into(),
Arc::from(func_name.as_str()),
func_def
function.source.source.clone(),
function.source.filename.clone(),
Arc::from(name.as_str()),
function
.input_types
.iter()
.map(|dt| dt.clone().into_arrow())
.collect(),
func_def.output_type.clone().into_arrow(),
function.output_type.clone().into_arrow(),
)))
.into_scalar_udf()
})
.collect();

Self::new(schema_name, vec![], udfs)
Self::new(SELF_REF_KEYWORD.to_string(), vec![], scalar_udfs)
}
}

Expand Down
26 changes: 0 additions & 26 deletions crates/core/datasets-common/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,29 +173,3 @@ pub struct Field {
/// Whether the field can contain null values
pub nullable: bool,
}

/// User-defined function specification.
///
/// Defines a custom function with input/output types and implementation source.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[serde(rename_all = "camelCase")]
pub struct Function {
// TODO: Support SQL type names, see https://datafusion.apache.org/user-guide/sql/data_types.html
/// Arrow data types for function input parameters
pub input_types: Vec<DataType>,
/// Arrow data type for function return value
pub output_type: DataType,
/// Function implementation source code and metadata
pub source: FunctionSource,
}

/// Source code and metadata for a user-defined function.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct FunctionSource {
/// Function implementation source code
pub source: Arc<str>,
/// Filename where the function is defined
pub filename: String,
}
4 changes: 3 additions & 1 deletion crates/core/datasets-derived/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ schemars = ["dep:schemars", "datasets-common/schemars", "dep:serde_json"]
[dependencies]
datafusion.workspace = true
datasets-common = { path = "../datasets-common" }
js-runtime = { path = "../js-runtime" }
schemars = { workspace = true, optional = true }
serde.workspace = true
serde_json = {workspace = true, optional = true}
thiserror.workspace = true

[dev-dependencies]
serde_json.workspace = true

59 changes: 11 additions & 48 deletions crates/core/datasets-derived/src/dataset.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
use std::{collections::BTreeMap, sync::Arc};
use std::collections::BTreeMap;

use datafusion::{
logical_expr::{ScalarUDF, async_udf::AsyncScalarUDF},
sql::parser,
};
use datafusion::sql::parser;
use datasets_common::{
block_num::BlockNum, dataset::Table, dataset_kind_str::DatasetKindStr,
hash_reference::HashReference, table_name::TableName,
};
use js_runtime::{isolate_pool::IsolatePool, js_udf::JsUdf};

use crate::{
DerivedDatasetKind, Manifest,
deps::{DepAlias, DepReference},
function::{Function, FunctionSource},
func_name::FuncName,
function::Function,
manifest::TableInput,
sql::{ResolveTableReferencesError, TableReference, resolve_table_references},
};
Expand Down Expand Up @@ -46,28 +43,13 @@ pub fn dataset(reference: HashReference, manifest: Manifest) -> Result<Dataset,
let tables = sort_tables_by_dependencies(unsorted_tables, &queries)
.map_err(DatasetError::SortTableDependencies)?;

// Convert manifest functions into logical functions
let functions = manifest
.functions
.into_iter()
.map(|(name, f)| Function {
name: name.into_inner(),
input_types: f.input_types.into_iter().map(|dt| dt.0).collect(),
output_type: f.output_type.0,
source: FunctionSource {
source: f.source.source,
filename: f.source.filename,
},
})
.collect();

Ok(Dataset::new(
reference,
manifest.dependencies,
DerivedDatasetKind,
false,
tables,
functions,
manifest.functions,
))
}

Expand All @@ -80,7 +62,7 @@ pub struct Dataset {
kind: DerivedDatasetKind,
dependencies: BTreeMap<DepAlias, DepReference>,
tables: Vec<Table>,
functions: Vec<Function>,
functions: BTreeMap<FuncName, Function>,
finalized_blocks_only: bool,
}

Expand All @@ -92,7 +74,7 @@ impl Dataset {
kind: DerivedDatasetKind,
finalized_blocks_only: bool,
tables: Vec<Table>,
functions: Vec<Function>,
functions: BTreeMap<FuncName, Function>,
) -> Self {
Self {
reference,
Expand All @@ -112,30 +94,11 @@ impl Dataset {
&self.dependencies
}

/// Looks up a user-defined function by name.
///
/// Returns the [`ScalarUDF`] for the function if found. This is used
/// for derived datasets that define custom JavaScript functions.
/// Looks up a function by name.
///
/// Returns `None` if the function name is not found.
pub fn function_by_name(
&self,
schema: String,
name: &str,
isolate_pool: IsolatePool,
) -> Option<ScalarUDF> {
self.functions.iter().find(|f| f.name == name).map(|f| {
AsyncScalarUDF::new(Arc::new(JsUdf::new(
isolate_pool,
schema,
f.source.source.clone(),
f.source.filename.clone().into(),
f.name.clone().into(),
f.input_types.clone(),
f.output_type.clone(),
)))
.into_scalar_udf()
})
/// Returns the [`Function`] definition if found.
pub fn function_by_name(&self, name: &str) -> Option<&Function> {
self.functions.get(name)
}
}

Expand Down
6 changes: 6 additions & 0 deletions crates/core/datasets-derived/src/func_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ impl AsRef<str> for FuncName {
}
}

impl std::borrow::Borrow<str> for FuncName {
fn borrow(&self) -> &str {
&self.0
}
}

impl std::ops::Deref for FuncName {
type Target = str;

Expand Down
Loading
Loading