diff --git a/include/arrow/utils.hpp b/include/arrow/utils.hpp index 684a75b..c09fa22 100644 --- a/include/arrow/utils.hpp +++ b/include/arrow/utils.hpp @@ -97,6 +97,19 @@ arrow::Result> create_table_from_nodes( arrow::Result> create_empty_table( const std::shared_ptr& schema); +/** + * @brief Appends a single ValueRef to an Arrow ArrayBuilder. + * + * Dispatches by ValueType to the correct typed builder (Int32, Int64, etc.). + * Appends null when data is nullptr. + * + * @param value The value to append. + * @param builder The Arrow builder to append into. + * @return OK on success, or an error for unsupported types. + */ +arrow::Status append_value_to_builder(const ValueRef& value, + arrow::ArrayBuilder* builder); + /** * @brief Appends an ArrayRef's contents to an Arrow ListBuilder. * diff --git a/include/common/constants.hpp b/include/common/constants.hpp index c4cb235..58f1c63 100644 --- a/include/common/constants.hpp +++ b/include/common/constants.hpp @@ -11,7 +11,6 @@ namespace tundradb { /// renames safe. namespace field_names { inline constexpr std::string_view kId = "id"; -inline constexpr std::string_view kEdgeId = "_edge_id"; inline constexpr std::string_view kSourceId = "source_id"; inline constexpr std::string_view kTargetId = "target_id"; inline constexpr std::string_view kCreatedTs = "created_ts"; @@ -22,11 +21,6 @@ namespace arena_flags { inline constexpr uint32_t kMarkedForDeletion = 0x1; } // namespace arena_flags -/// Prefix for synthetic shadow schemas that wrap edge types. -namespace schema { -inline constexpr std::string_view kEdgeShadowPrefix = "__edge__"; -} // namespace schema - /// FNV-1a 32-bit hash parameters (used by compute_tag). namespace hash { inline constexpr uint32_t kFnv1aOffsetBasis = 2166136261u; diff --git a/include/common/utils.hpp b/include/common/utils.hpp index 46ec2b4..c329139 100644 --- a/include/common/utils.hpp +++ b/include/common/utils.hpp @@ -254,85 +254,12 @@ static arrow::Result> create_table( for (int i = 0; i < schema->num_fields(); i++) { const auto& field = schema->field(i); - auto field_result = view.get_value_ptr(field); // ✅ Use NodeView! - if (!field_result.ok()) { - ARROW_RETURN_NOT_OK(builders[i]->AppendNull()); - } else { - const auto value_ptr = field_result.ValueOrDie(); - if (value_ptr == nullptr) { - ARROW_RETURN_NOT_OK(builders[i]->AppendNull()); - } else { - switch (field->type()) { - case ValueType::INT32: { - ARROW_RETURN_NOT_OK( - dynamic_cast(builders[i].get()) - ->Append(*reinterpret_cast(value_ptr))); - break; - } - case ValueType::INT64: { - ARROW_RETURN_NOT_OK( - dynamic_cast(builders[i].get()) - ->Append(*reinterpret_cast(value_ptr))); - break; - } - case ValueType::FLOAT: { - // return Value{*reinterpret_cast(ptr)}; - ARROW_RETURN_NOT_OK( - dynamic_cast(builders[i].get()) - ->Append(*reinterpret_cast(value_ptr))); - break; - } - case ValueType::DOUBLE: { - ARROW_RETURN_NOT_OK( - dynamic_cast(builders[i].get()) - ->Append(*reinterpret_cast(value_ptr))); - break; - } - case ValueType::BOOL: { - ARROW_RETURN_NOT_OK( - dynamic_cast(builders[i].get()) - ->Append(*reinterpret_cast(value_ptr))); - break; - } - case ValueType::STRING: { - auto str_ref = *reinterpret_cast(value_ptr); - - ARROW_RETURN_NOT_OK( - dynamic_cast(builders[i].get()) - ->Append(str_ref.to_string())); - break; - } - case ValueType::ARRAY: { - const auto& arr_ref = - *reinterpret_cast(value_ptr); - auto* list_builder = - dynamic_cast(builders[i].get()); - if (!list_builder) { - return arrow::Status::Invalid( - "Expected ListBuilder for array field: ", field->name()); - } - ARROW_RETURN_NOT_OK( - append_array_to_list_builder(arr_ref, list_builder)); - break; - } - case ValueType::MAP: { - const auto& map_ref = *reinterpret_cast(value_ptr); - auto* map_builder = - dynamic_cast(builders[i].get()); - if (!map_builder) { - return arrow::Status::Invalid( - "Expected MapBuilder for MAP field: ", field->name()); - } - ARROW_RETURN_NOT_OK( - append_map_to_map_builder(map_ref, map_builder)); - break; - } - default: - return arrow::Status::NotImplemented("Unsupported type: ", - to_string(field->type())); - } - } + auto field_result = view.get_value_ptr(field); + ValueRef vr; + if (field_result.ok() && field_result.ValueOrDie() != nullptr) { + vr = ValueRef(field_result.ValueOrDie(), field->type()); } + ARROW_RETURN_NOT_OK(append_value_to_builder(vr, builders[i].get())); } nodes_in_current_chunk++; diff --git a/include/main/database.hpp b/include/main/database.hpp index cfa024c..8db834a 100644 --- a/include/main/database.hpp +++ b/include/main/database.hpp @@ -183,8 +183,6 @@ class Database { * are skipped. Returns an error if the same alias is bound to two different * schemas. */ - static arrow::Result> - resolve_alias_map(const Query &query); }; } // namespace tundradb diff --git a/include/query/execution.hpp b/include/query/execution.hpp index eeeaa54..ef432e0 100644 --- a/include/query/execution.hpp +++ b/include/query/execution.hpp @@ -92,44 +92,77 @@ class ConnectionPool { size_t size() const { return next_index_; } }; +/// Distinguishes node aliases from edge aliases in the unified alias map. +enum class AliasKind { Node, Edge }; + +/// A registered alias entry: the concrete schema/edge-type name and its kind. +struct AliasEntry { + std::string schema_name; + AliasKind kind; +}; + /** * @brief Manages schema resolution and aliases for query execution * * Responsibilities: - * - Map aliases (e.g., "u") to schema names (e.g., "User") + * - Map aliases (e.g., "u") to schema names (e.g., "User") or edge types * - Resolve SchemaRef objects to concrete schema names - * - Validate schema references + * - Validate schema references and detect alias conflicts */ class SchemaContext { private: - std::unordered_map aliases_; - std::shared_ptr schema_registry_; + std::unordered_map aliases_; public: - explicit SchemaContext(std::shared_ptr registry) - : schema_registry_(std::move(registry)) {} + SchemaContext() = default; + + /** + * @brief Registers a node or edge alias. + * + * Idempotent when the alias already maps to the same name+kind; returns + * an error if the alias is already bound to a different schema or kind. + * + * @param alias The alias string (e.g. "u", "e"). + * @param schema_name The concrete schema name or edge type (e.g. "User", + * "WORKS_AT"). + * @param kind Whether this is a Node or Edge alias. + * @return The schema_name on success, or an error on conflict. + */ + arrow::Result register_alias(const std::string& alias, + const std::string& schema_name, + AliasKind kind); /** - * @brief Registers a schema alias (e.g. "u" -> "User"). + * @brief Registers a schema alias from a SchemaRef. * - * @param schema_ref The schema reference containing alias and schema name. - * @return The resolved concrete schema name, or an error if unknown. + * For declarations (alias:Schema): calls register_alias with Node kind. + * For bare references (alias only): delegates to resolve(). */ arrow::Result register_schema(const SchemaRef& schema_ref); /** - * @brief Resolves a schema reference to its concrete schema name. + * @brief Resolves a schema reference to its entry. * * @param schema_ref The reference to resolve. - * @return The concrete schema name, or an error if not registered. + * @return The AliasEntry, or an error if not registered. */ - arrow::Result resolve(const SchemaRef& schema_ref) const; + arrow::Result resolve(const SchemaRef& schema_ref) const; - /** @brief Returns the underlying SchemaRegistry. */ - std::shared_ptr registry() const { return schema_registry_; } + /** + * @brief Resolves an alias string to its entry. + * + * @param alias The alias to resolve. + * @return The AliasEntry, or an error if not registered. + */ + arrow::Result resolve(const std::string& alias) const; - /** @brief Returns all registered alias->schema mappings. */ - const std::unordered_map& get_aliases() const { + /** @brief Returns true if the alias is registered. */ + bool contains(const std::string& alias) const { + return aliases_.contains(alias); + } + + /** @brief Returns all registered alias entries. */ + const std::unordered_map& get_aliases() const { return aliases_; } }; @@ -319,13 +352,11 @@ class FieldIndexer { * schema. * * @param schema_ref The schema reference (alias used as prefix). - * @param resolved_schema The concrete schema name. - * @param registry The schema registry for field lookup. + * @param schema The resolved Schema whose fields will be indexed. * @return True if names were computed (false if already done). */ arrow::Result compute_fq_names(const SchemaRef& schema_ref, - const std::string& resolved_schema, - SchemaRegistry* registry); + const Schema& schema); arrow::Result compute_fq_names_from_fields( const std::string& alias, @@ -410,7 +441,7 @@ class FieldIndexer { * - Tables: Arrow table storage */ struct QueryState { - SchemaContext schemas; ///< Schema resolution and aliases. + SchemaContext schemas; ///< Alias resolution (unified node + edge). GraphState graph; ///< Graph topology (IDs, connections). FieldIndexer fields; ///< Field indexing for row operations. @@ -419,53 +450,41 @@ struct QueryState { SchemaRef from; ///< Source schema from the FROM clause. std::vector traversals; ///< Traverse clauses in query order. - std::unordered_map - edge_aliases; ///< edge alias -> edge type - std::shared_ptr node_manager; ///< Node storage. - std::shared_ptr edge_store; ///< Edge storage. + std::shared_ptr schema_registry; ///< Node schema registry. + std::shared_ptr node_manager; ///< Node storage. + std::shared_ptr edge_store; ///< Edge storage. + std::unique_ptr temporal_context; ///< Temporal snapshot (nullptr = current). /** @brief Constructs a QueryState bound to the given schema registry. */ explicit QueryState(std::shared_ptr registry); - /** @brief Registers a schema alias. @see SchemaContext::register_schema. */ + /** @brief Registers a node alias. @see SchemaContext::register_schema. */ arrow::Result register_schema(const SchemaRef& ref) { return schemas.register_schema(ref); } - /** - * Records a mapping from an edge alias to its raw edge type name. - * - * This is separate from the schema alias (which maps the alias to the - * shadow schema name like "__edge__WORKS_AT"). The raw edge type is - * needed later by the traversal engine to query EdgeStore by type. - * - * @param alias The edge variable (e.g. "e"). - * @param edge_type The raw edge type (e.g. "WORKS_AT"). - * @return true, or Invalid if the alias is empty or already bound to a - * different type. - */ - arrow::Result register_edge_alias(const std::string& alias, - const std::string& edge_type) { - if (alias.empty()) { - return arrow::Status::Invalid("Edge alias cannot be empty"); - } - if (auto [it, inserted] = edge_aliases.emplace(alias, edge_type); - !inserted && it->second != edge_type) { - return arrow::Status::Invalid("Edge alias '", alias, - "' is already bound to edge type '", - it->second, "'"); - } - return true; + /** @brief Registers an edge alias (alias -> edge_type). */ + arrow::Result register_edge_alias(const std::string& alias, + const std::string& edge_type) { + return schemas.register_alias(alias, edge_type, AliasKind::Edge); } - /** @brief Resolves a schema alias to its concrete name. */ + /** @brief Resolves an alias to its schema name. */ arrow::Result resolve_schema(const SchemaRef& ref) const { - return schemas.resolve(ref); + ARROW_ASSIGN_OR_RAISE(auto entry, schemas.resolve(ref)); + return entry.schema_name; } + /** + * @brief Resolves an alias and fetches the corresponding Schema object. + * Dispatches to SchemaRegistry for nodes, EdgeStore for edges. + */ + arrow::Result> get_schema_for_alias( + const std::string& alias) const; + /** @brief Returns the mutable ID set for the given schema. */ llvm::DenseSet& get_ids(const SchemaRef& schema_ref) { return graph.ids(schema_ref.value()); @@ -481,14 +500,13 @@ struct QueryState { return graph.has_outgoing(ref, node_id); } - /** @brief Computes FQ field names for the given schema reference. */ - arrow::Result compute_fully_qualified_names( - const SchemaRef& ref, const std::string& resolved_schema) { - return fields.compute_fq_names(ref, resolved_schema, - schemas.registry().get()); + /** @brief Computes FQ field names for the given alias. */ + arrow::Result compute_fully_qualified_names(const SchemaRef& ref, + const Schema& schema) { + return fields.compute_fq_names(ref, schema); } - /** @overload Resolves the schema name automatically. */ + /** @overload Resolves the schema automatically via alias kind. */ arrow::Result compute_fully_qualified_names(const SchemaRef& ref); /** @brief Removes a node from the graph. */ @@ -496,13 +514,13 @@ struct QueryState { graph.remove_node(node_id, ref); } - /** @brief Returns the schema registry. */ - std::shared_ptr schema_registry() const { - return schemas.registry(); + /** @brief Returns true if the alias is registered. */ + bool has_alias(const std::string& alias) const { + return schemas.contains(alias); } - /** @brief Returns all alias->schema mappings. */ - const std::unordered_map& aliases() const { + /** @brief Returns all registered alias entries. */ + const std::unordered_map& aliases() const { return schemas.get_aliases(); } @@ -685,7 +703,7 @@ arrow::Result> inline_where( * @param query_state The execution state to populate. * @return OK on success, or an error. */ -arrow::Status prepare_query(Query& query, QueryState& query_state); +arrow::Status prepare_query(const Query& query, QueryState& query_state); } // namespace tundradb diff --git a/include/query/query.hpp b/include/query/query.hpp index f93b7fa..db7bcae 100644 --- a/include/query/query.hpp +++ b/include/query/query.hpp @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -96,9 +97,9 @@ enum class CompareOp { * field_=Field{name="since", type=INT64, index=4} * * Resolution is performed by `ComparisonExpr::resolve_field_ref`, which - * maps the variable through the alias table to a schema name, then looks - * up the field by name. For edge aliases the schema is the shadow schema - * (e.g. "__edge__WORKS_AT"). + * maps the variable through the alias table to an AliasEntry (name + kind), + * then looks up the field in the appropriate registry (SchemaRegistry for + * nodes, EdgeStore for edges). * * Nested paths (e.g. "u.props.score") are stored in `nested_path_` for * MAP sub-key access during query evaluation. @@ -201,20 +202,15 @@ class WhereExpr { * * Called by `prepare_query` (Phase 3) after all aliases have been * registered. Each `FieldRef` inside this expression tree is resolved - * by looking up its variable in @p aliases to find the schema name, - * then fetching the Field from @p schema_registry. + * by looking up its variable via @p schema_resolver, which dispatches + * to the correct registry (nodes or edges) based on alias kind. * - * For edge aliases the schema name is the shadow schema - * (e.g. "__edge__WORKS_AT"), not the raw edge type. - * - * @param aliases Variable -> schema-name map (node aliases and - * edge shadow aliases). - * @param schema_registry Registry holding all node and shadow schemas. + * @param schema_resolver Maps a variable name (alias) to its Schema. * @return true when all references are resolved, or an error status. */ virtual arrow::Result resolve_field_ref( - const std::unordered_map& aliases, - const SchemaRegistry* schema_registry) = 0; + const std::function>( + const std::string&)>& schema_resolver) = 0; /** @brief Evaluates this expression against a node. */ virtual arrow::Result matches( const std::shared_ptr& node) const = 0; @@ -368,8 +364,8 @@ class ComparisonExpr : public Clause, public WhereExpr { std::set get_all_variables() const override; arrow::Result resolve_field_ref( - const std::unordered_map& aliases, - const SchemaRegistry* schema_registry) override; + const std::function>( + const std::string&)>& schema_resolver) override; }; /** @@ -392,8 +388,8 @@ class LogicalExpr : public Clause, public WhereExpr { void set_inlined(bool inlined) override; arrow::Result resolve_field_ref( - const std::unordered_map& aliases, - const SchemaRegistry* schema_registry) override; + const std::function>( + const std::string&)>& schema_resolver) override; static std::shared_ptr and_expr( std::shared_ptr left, std::shared_ptr right); @@ -510,6 +506,18 @@ class Query { return temporal_snapshot_; } + /** @brief Finds the Traverse clause whose edge alias matches, or nullptr. */ + [[nodiscard]] std::shared_ptr find_traverse( + const std::string& alias) const { + for (const auto& clause : clauses_) { + if (clause->type() != Clause::Type::TRAVERSE) continue; + auto t = std::static_pointer_cast(clause); + if (t->edge_alias().has_value() && t->edge_alias().value() == alias) + return t; + } + return nullptr; + } + static Builder from(const std::string& schema) { return Builder(schema); } /** @brief Fluent builder for constructing Query objects. */ diff --git a/src/arrow/utils.cpp b/src/arrow/utils.cpp index faf698e..ac500af 100644 --- a/src/arrow/utils.cpp +++ b/src/arrow/utils.cpp @@ -233,6 +233,50 @@ arrow::Result array_element_to_value( } } +arrow::Status append_value_to_builder(const ValueRef& value, + arrow::ArrayBuilder* builder) { + if (value.data == nullptr) { + return builder->AppendNull(); + } + switch (value.type) { + case ValueType::INT32: + return static_cast(builder)->Append( + value.as_int32()); + case ValueType::INT64: + return static_cast(builder)->Append( + value.as_int64()); + case ValueType::FLOAT: + return static_cast(builder)->Append( + value.as_float()); + case ValueType::DOUBLE: + return static_cast(builder)->Append( + value.as_double()); + case ValueType::BOOL: + return static_cast(builder)->Append( + value.as_bool()); + case ValueType::STRING: + case ValueType::FIXED_STRING16: + case ValueType::FIXED_STRING32: + case ValueType::FIXED_STRING64: { + const auto& s = value.as_string_ref(); + return static_cast(builder)->Append(s.data(), + s.length()); + } + case ValueType::ARRAY: { + auto* lb = dynamic_cast(builder); + if (!lb) return arrow::Status::Invalid("Expected ListBuilder for ARRAY"); + return append_array_to_list_builder(value.as_array_ref(), lb); + } + case ValueType::MAP: { + auto* mb = dynamic_cast(builder); + if (!mb) return arrow::Status::Invalid("Expected MapBuilder for MAP"); + return append_map_to_map_builder(value.as_map_ref(), mb); + } + default: + return builder->AppendNull(); + } +} + arrow::Status append_array_to_list_builder(const ArrayRef& arr_ref, arrow::ListBuilder* list_builder) { if (arr_ref.is_null()) { @@ -418,74 +462,14 @@ arrow::Result> create_table_from_nodes( } for (const auto& node : nodes) { auto view = node->view(nullptr); - for (int i = 0; i < schema->num_fields(); i++) { auto field = schema->field(i); - const auto& field_name = field->name(); - auto res = view.get_value_ptr(field); - if (res.ok()) { - auto value = res.ValueOrDie(); - if (value) { - if (field->type() == ValueType::ARRAY) { - const auto& arr_ref = *reinterpret_cast(value); - auto* list_builder = - dynamic_cast(builders[i].get()); - if (!list_builder) { - return arrow::Status::Invalid( - "Expected ListBuilder for array field: ", field_name); - } - ARROW_RETURN_NOT_OK( - append_array_to_list_builder(arr_ref, list_builder)); - } else if (field->type() == ValueType::MAP) { - const auto& map_ref = *reinterpret_cast(value); - auto* map_builder = - dynamic_cast(builders[i].get()); - if (!map_builder) { - return arrow::Status::Invalid( - "Expected MapBuilder for MAP field: ", field_name); - } - ARROW_RETURN_NOT_OK( - append_map_to_map_builder(map_ref, map_builder)); - } else { - auto scalar_result = - value_ptr_to_arrow_scalar(value, field->type()); - if (!scalar_result.ok()) { - log_error("Failed to convert value to scalar for field '{}': {}", - field_name, scalar_result.status().ToString()); - return scalar_result.status(); - } - - const auto& scalar = scalar_result.ValueOrDie(); - auto status = builders[i]->AppendScalar(*scalar); - if (!status.ok()) { - log_error("Failed to append scalar for field '{}': {}", - field_name, status.ToString()); - return status; - } - } - } else { - IF_DEBUG_ENABLED { - log_debug("Null value for field '{}', appending null", field_name); - } - auto status = builders[i]->AppendNull(); - if (!status.ok()) { - log_error("Failed to append null for field '{}': {}", field_name, - status.ToString()); - return status; - } - } - } else { - IF_DEBUG_ENABLED { - log_debug("Field '{}' not found in node, appending null", field_name); - } - auto status = builders[i]->AppendNull(); - if (!status.ok()) { - log_error("Failed to append null for field '{}': {}", field_name, - status.ToString()); - return status; - } + ValueRef vr; + if (res.ok() && res.ValueOrDie() != nullptr) { + vr = ValueRef(res.ValueOrDie(), field->type()); } + ARROW_RETURN_NOT_OK(append_value_to_builder(vr, builders[i].get())); } } diff --git a/src/core/edge.cpp b/src/core/edge.cpp index 9e1c19f..d417d3c 100644 --- a/src/core/edge.cpp +++ b/src/core/edge.cpp @@ -8,8 +8,7 @@ namespace tundradb { arrow::Result Edge::get_value( const std::shared_ptr& field) const { - if (field && (field->name() == field_names::kId || - field->name() == field_names::kEdgeId)) { + if (field && field->name() == field_names::kId) { return Value{id_}; } if (field && field->name() == field_names::kSourceId) @@ -30,8 +29,7 @@ arrow::Result Edge::get_value_ptr( if (!field) { return arrow::Status::Invalid("Field is null"); } - if (field->name() == field_names::kId || - field->name() == field_names::kEdgeId) { + if (field->name() == field_names::kId) { return reinterpret_cast(&id_); } if (field->name() == field_names::kSourceId) @@ -90,7 +88,6 @@ arrow::Result EdgeView::get_value_ptr( return edge_->get_value_ptr(field); } if (field && (field->name() == field_names::kId || - field->name() == field_names::kEdgeId || field->name() == field_names::kSourceId || field->name() == field_names::kTargetId || field->name() == field_names::kCreatedTs)) { diff --git a/src/main/database.cpp b/src/main/database.cpp index cecf68c..70ffa9b 100644 --- a/src/main/database.cpp +++ b/src/main/database.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -234,11 +235,8 @@ populate_rows_bfs(int64_t node_id, const SchemaRef& start_schema, return arrow::Status::OK(); } - ARROW_ASSIGN_OR_RAISE( - const auto edge_schema_name, - query_state.resolve_schema(SchemaRef::parse(edge_alias))); ARROW_ASSIGN_OR_RAISE(const auto edge_schema, - query_state.schema_registry()->get(edge_schema_name)); + query_state.get_schema_for_alias(edge_alias)); ARROW_ASSIGN_OR_RAISE(const auto edge_obj, query_state.edge_store->get(conn.edge_id)); row.set_cell_from_edge(idx_it->second, edge_obj, edge_schema->fields(), @@ -582,102 +580,25 @@ arrow::Result> create_table_from_rows( // Create array builders for each field std::vector> builders; - std::vector - field_names; // Cache field names to avoid repeated lookups - for (const auto& field : output_schema->fields()) { ARROW_ASSIGN_OR_RAISE(auto builder, arrow::MakeBuilder(field->type())); builders.push_back(std::move(builder)); - field_names.push_back(field->name()); } - // Pre-allocate builders for better performance + const size_t num_fields = builders.size(); const size_t num_rows = rows->size(); for (auto& builder : builders) { ARROW_RETURN_NOT_OK(builder->Reserve(num_rows)); } - // Populate the builders from each row for (const auto& row : *rows) { - for (size_t i = 0; i < field_names.size(); i++) { - const auto& field_name = field_names[i]; // Use cached field name - - // Optimization: try indexed access first, fallback to string lookup + for (size_t i = 0; i < num_fields; i++) { ValueRef value_ref; - bool has_value = false; - if (i < row->cells.size() && row->cells[i].data != nullptr) { value_ref = row->cells[i]; - has_value = true; - } - - if (has_value) { - // We have a value for this field - append directly without creating - // scalars - arrow::Status append_status; - - switch (value_ref.type) { - case ValueType::INT32: - append_status = static_cast(builders[i].get()) - ->Append(value_ref.as_int32()); - break; - case ValueType::INT64: - append_status = static_cast(builders[i].get()) - ->Append(value_ref.as_int64()); - break; - case ValueType::DOUBLE: - append_status = - static_cast(builders[i].get()) - ->Append(value_ref.as_double()); - break; - case ValueType::STRING: { - const auto& str_ref = value_ref.as_string_ref(); - append_status = - static_cast(builders[i].get()) - ->Append(str_ref.data(), str_ref.length()); - break; - } - case ValueType::BOOL: - append_status = - static_cast(builders[i].get()) - ->Append(value_ref.as_bool()); - break; - case ValueType::ARRAY: { - const auto& arr_ref = value_ref.as_array_ref(); - auto* list_builder = - dynamic_cast(builders[i].get()); - if (!list_builder) { - append_status = arrow::Status::Invalid( - "Expected ListBuilder for field: ", field_name); - break; - } - append_status = append_array_to_list_builder(arr_ref, list_builder); - break; - } - case ValueType::MAP: { - const auto& map_ref = value_ref.as_map_ref(); - auto* map_builder = - dynamic_cast(builders[i].get()); - if (!map_builder) { - append_status = arrow::Status::Invalid( - "Expected MapBuilder for field: ", field_name); - break; - } - append_status = append_map_to_map_builder(map_ref, map_builder); - break; - } - default: - append_status = builders[i]->AppendNull(); - break; - } - - if (append_status.ok()) { - continue; - } } - - // Fall back to NULL if we couldn't append the value - ARROW_RETURN_NOT_OK(builders[i]->AppendNull()); + ARROW_RETURN_NOT_OK( + append_value_to_builder(value_ref, builders[i].get())); } } @@ -720,13 +641,11 @@ arrow::Result> Database::query( } query_state.node_manager = this->node_manager_; query_state.edge_store = this->edge_store_; - query_state.from = query.from(); { IF_DEBUG_ENABLED { log_debug("processing 'from' {}", query.from().toString()); } - // Precompute tag for FROM schema (alias-based hash) query_state.from = query.from(); query_state.from.set_tag(compute_tag(query_state.from)); ARROW_ASSIGN_OR_RAISE(auto source_schema, @@ -739,8 +658,7 @@ arrow::Result> Database::query( auto source_table, this->get_table(source_schema, query_state.temporal_context.get())); ARROW_RETURN_NOT_OK(query_state.update_table(source_table, query.from())); - if (auto res = query_state.compute_fully_qualified_names(query.from(), - source_schema); + if (auto res = query_state.compute_fully_qualified_names(query.from()); !res.ok()) { return res.status(); } @@ -754,8 +672,7 @@ arrow::Result> Database::query( "Preparing query: populating aliases, traversals, and resolving " "field references"); } - auto preparation_result = - prepare_query(const_cast(query), query_state); + auto preparation_result = prepare_query(query, query_state); if (!preparation_result.ok()) { log_error("Failed to prepare query: {}", preparation_result.ToString()); return preparation_result; @@ -784,8 +701,7 @@ arrow::Result> Database::query( // Also precompute fully-qualified field names per alias used in the query std::vector> post_where; for (auto i = 0; i < query.clauses().size(); ++i) { - auto clause = query.clauses()[i]; - switch (clause->type()) { + switch (auto clause = query.clauses()[i]; clause->type()) { case Clause::Type::WHERE: { auto where = std::dynamic_pointer_cast(clause); if (where->inlined()) { @@ -809,11 +725,7 @@ arrow::Result> Database::query( std::unordered_map> new_front_ids; std::string variable = *variables.begin(); if (!query_state.tables.contains(variable)) { - const bool known_node_alias = - query_state.aliases().contains(variable); - const bool known_edge_alias = - query_state.edge_aliases.contains(variable); - if (!known_node_alias && !known_edge_alias) { + if (!query_state.aliases().contains(variable)) { return arrow::Status::Invalid("Unknown variable '{}'", variable); } // Alias is valid but not materialized as a table at this point @@ -896,10 +808,10 @@ arrow::Result> Database::query( query_state.resolve_schema(traverse->target())); // Fully-qualified field names should also be precomputed during // preparation - ARROW_RETURN_NOT_OK(query_state.compute_fully_qualified_names( - traverse->source(), source_schema)); - ARROW_RETURN_NOT_OK(query_state.compute_fully_qualified_names( - traverse->target(), target_schema)); + ARROW_RETURN_NOT_OK( + query_state.compute_fully_qualified_names(traverse->source())); + ARROW_RETURN_NOT_OK( + query_state.compute_fully_qualified_names(traverse->target())); if (traverse->edge_alias().has_value()) { ARROW_RETURN_NOT_OK(query_state.compute_fully_qualified_names( SchemaRef::parse(traverse->edge_alias().value()))); @@ -967,8 +879,8 @@ arrow::Result> Database::query( auto node_result = node_manager_->get_node(target_schema, target_id); if (node_result.ok()) { - const auto target_node = node_result.ValueOrDie(); - if (target_node->schema_name == target_schema) { + if (const auto target_node = node_result.ValueOrDie(); + target_node->schema_name == target_schema) { // Then apply all WHERE clauses with AND logic bool passes_all_filters = true; // Multiple conditions - could optimize by creating a @@ -1251,25 +1163,21 @@ arrow::Result Database::update_by_match(const UpdateQuery& uq) { UpdateResult result; const auto& match_query = uq.match_query().value(); - // 1. Resolve alias -> schema mapping (declarations only, with validation) - ARROW_ASSIGN_OR_RAISE(auto alias_to_schema, resolve_alias_map(match_query)); - std::unordered_map edge_alias_to_type; + // 1. Build alias -> schema from node declarations + std::unordered_map alias_to_schema; + if (match_query.from().is_declaration()) + alias_to_schema[match_query.from().value()] = match_query.from().schema(); for (const auto& clause : match_query.clauses()) { if (clause->type() != Clause::Type::TRAVERSE) continue; - const auto t = std::static_pointer_cast(clause); - if (t->edge_alias().has_value()) { - edge_alias_to_type.emplace(t->edge_alias().value(), t->edge_type()); - } + auto t = std::static_pointer_cast(clause); + if (t->source().is_declaration()) + alias_to_schema[t->source().value()] = t->source().schema(); + if (t->target().is_declaration()) + alias_to_schema[t->target().value()] = t->target().schema(); } - // 2. Group SET assignments by alias: { alias -> (schema, [(Field,Value)]) } - struct AliasUpdate { - std::string schema_name; - std::vector fields; - }; - std::unordered_map grouped; - std::unordered_map grouped_edge; - + // 2. Group SET assignments by alias: { alias -> [FieldUpdate] } + std::unordered_map> updates_by_alias; for (const auto& a : uq.assignments()) { const auto parsed = FieldRef::from_string(a.field_name); if (parsed.variable().empty()) { @@ -1277,63 +1185,52 @@ arrow::Result Database::update_by_match(const UpdateQuery& uq) { "SET field '", a.field_name, "' must be alias-qualified (e.g. u.age) in a MATCH-based update"); } - const std::string alias = parsed.variable(); - const std::string bare_field = parsed.field_name(); + const std::string& alias = parsed.variable(); + const std::string& bare_field = parsed.field_name(); - if (const auto edge_it = edge_alias_to_type.find(alias); - edge_it != edge_alias_to_type.end()) { - const auto edge_schema = edge_store_->get_edge_schema(edge_it->second); + std::shared_ptr field; + if (auto trav = match_query.find_traverse(alias); trav != nullptr) { + auto edge_schema = edge_store_->get_edge_schema(trav->edge_type()); if (!edge_schema) { - return arrow::Status::KeyError("Edge schema '", edge_it->second, + return arrow::Status::KeyError("Edge schema '", trav->edge_type(), "' not found"); } - auto field = edge_schema->get_field(bare_field); + field = edge_schema->get_field(bare_field); if (!field) { return arrow::Status::Invalid("Field '", bare_field, "' not found in edge schema '", - edge_it->second, "'"); + trav->edge_type(), "'"); + } + } else { + auto it = alias_to_schema.find(alias); + if (it == alias_to_schema.end()) { + return arrow::Status::Invalid("Alias '", alias, + "' not found in MATCH query"); + } + ARROW_ASSIGN_OR_RAISE(auto schema, schema_registry_->get(it->second)); + field = schema->get_field(bare_field); + if (!field) { + return arrow::Status::Invalid( + "Field '", bare_field, "' not found in schema '", it->second, "'"); } - auto& entry = grouped_edge[alias]; - if (entry.schema_name.empty()) entry.schema_name = edge_it->second; - entry.fields.push_back( - FieldUpdate{field, a.value, uq.update_type(), parsed.nested_path()}); - continue; - } - - auto it = alias_to_schema.find(alias); - if (it == alias_to_schema.end()) { - return arrow::Status::Invalid("Alias '", alias, - "' not found in MATCH query"); - } - - auto schema_result = schema_registry_->get(it->second); - if (!schema_result.ok()) { - return arrow::Status::KeyError("Schema '", it->second, "' not found"); - } - const auto& schema = schema_result.ValueOrDie(); - auto field = schema->get_field(bare_field); - if (!field) { - return arrow::Status::Invalid("Field '", bare_field, - "' not found in schema '", it->second, "'"); } - - auto& entry = grouped[alias]; - if (entry.schema_name.empty()) entry.schema_name = it->second; - entry.fields.push_back( + updates_by_alias[alias].push_back( FieldUpdate{field, a.value, uq.update_type(), parsed.nested_path()}); } - // 3. Build ID-only SELECT: we only need "u.id", "c.id", etc. - std::vector id_columns; - id_columns.reserve(grouped.size() + grouped_edge.size()); - for (const auto& alias : grouped | std::views::keys) { - id_columns.push_back(alias + ".id"); - } - for (const auto& alias : grouped_edge | std::views::keys) { - id_columns.push_back(alias + "._edge_id"); + // 3. Build SELECT with node IDs needed for updates and edge lookups. + std::set id_column_set; + for (const auto& [alias, _] : updates_by_alias) { + if (auto trav = match_query.find_traverse(alias)) { + id_column_set.insert(trav->source().value() + ".id"); + id_column_set.insert(trav->target().value() + ".id"); + } else { + id_column_set.insert(alias + ".id"); + } } Query id_query(match_query.from(), match_query.clauses(), - std::make_shared(std::vector( + id_column_set.begin(), id_column_set.end())), match_query.inline_where(), match_query.execution_config(), match_query.temporal_snapshot()); @@ -1344,42 +1241,46 @@ arrow::Result Database::update_by_match(const UpdateQuery& uq) { return result; } - // 5. Apply updates per alias group - for (const auto& [alias, info] : grouped) { - auto id_column = table->GetColumnByName(alias + ".id"); - if (!id_column) { - return arrow::Status::Invalid("Could not find '", alias, - ".id' column in query results"); - } - apply_updates(info.schema_name, id_column, info.fields, uq.update_type(), - result); - } - for (const auto& [alias, info] : grouped_edge) { - auto id_column = table->GetColumnByName(alias + "._edge_id"); - if (!id_column) { - return arrow::Status::Invalid("Could not find '", alias, - "._edge_id' column in query results"); - } - for (int ci = 0; ci < id_column->num_chunks(); ci++) { - const auto chunk = - std::static_pointer_cast(id_column->chunk(ci)); - for (int64_t i = 0; i < chunk->length(); i++) { - if (chunk->IsNull(i)) continue; - const int64_t edge_id = chunk->Value(i); - auto edge_res = edge_store_->get(edge_id); - if (!edge_res.ok()) { - result.failed_count++; - result.errors.push_back("edge(" + std::to_string(edge_id) + - "): " + edge_res.status().ToString()); - continue; - } - if (auto upd = edge_res.ValueOrDie()->update_fields(info.fields); - !upd.ok()) { - result.failed_count++; - result.errors.push_back("edge(" + std::to_string(edge_id) + - "): " + upd.status().ToString()); - } else { - result.updated_count++; + // 5. Apply updates per alias + for (const auto& [alias, fields] : updates_by_alias) { + if (auto trav = match_query.find_traverse(alias); !trav) { + auto id_column = table->GetColumnByName(alias + ".id"); + if (!id_column) { + return arrow::Status::Invalid("Could not find '", alias, + ".id' column in query results"); + } + apply_updates(alias_to_schema.at(alias), id_column, fields, + uq.update_type(), result); + } else { + auto src_col = table->GetColumnByName(trav->source().value() + ".id"); + auto tgt_col = table->GetColumnByName(trav->target().value() + ".id"); + if (!src_col || !tgt_col) { + return arrow::Status::Invalid( + "Could not find source/target ID columns for edge alias '", alias, + "'"); + } + llvm::DenseSet updated_edge_ids; + for (int ci = 0; ci < src_col->num_chunks(); ci++) { + const auto src_chunk = + std::static_pointer_cast(src_col->chunk(ci)); + const auto tgt_chunk = + std::static_pointer_cast(tgt_col->chunk(ci)); + for (int64_t i = 0; i < src_chunk->length(); i++) { + if (src_chunk->IsNull(i) || tgt_chunk->IsNull(i)) continue; + auto edges_res = edge_store_->get_outgoing_edges(src_chunk->Value(i), + trav->edge_type()); + if (!edges_res.ok()) continue; + for (const auto& edge : edges_res.ValueOrDie()) { + if (edge->get_target_id() != tgt_chunk->Value(i)) continue; + if (!updated_edge_ids.insert(edge->get_id()).second) continue; + if (auto upd = edge->update_fields(fields); !upd.ok()) { + result.failed_count++; + result.errors.push_back("edge(" + std::to_string(edge->get_id()) + + "): " + upd.status().ToString()); + } else { + result.updated_count++; + } + } } } } @@ -1416,37 +1317,4 @@ void Database::apply_updates( } } -// --------------------------------------------------------------------------- -// resolve_alias_map - build alias->schema from declarations, reject conflicts -// --------------------------------------------------------------------------- -arrow::Result> -Database::resolve_alias_map(const Query& query) { - std::unordered_map map; - - auto register_ref = [&](const SchemaRef& ref) -> arrow::Status { - if (!ref.is_declaration()) return arrow::Status::OK(); - const auto& alias = ref.value(); - const auto& schema = ref.schema(); - if (auto [it, inserted] = map.emplace(alias, schema); - !inserted && it->second != schema) { - return arrow::Status::Invalid("Alias '", alias, "' bound to '", - it->second, "' cannot be re-bound to '", - schema, "'"); - } - return arrow::Status::OK(); - }; - - ARROW_RETURN_NOT_OK(register_ref(query.from())); - - for (const auto& clause : query.clauses()) { - if (clause->type() == Clause::Type::TRAVERSE) { - const auto t = std::static_pointer_cast(clause); - ARROW_RETURN_NOT_OK(register_ref(t->source())); - ARROW_RETURN_NOT_OK(register_ref(t->target())); - } - } - - return map; -} - } // namespace tundradb diff --git a/src/query/execution.cpp b/src/query/execution.cpp index f1829d1..c735a1a 100644 --- a/src/query/execution.cpp +++ b/src/query/execution.cpp @@ -5,7 +5,6 @@ #include "arrow/map_union_types.hpp" #include "arrow/utils.hpp" -#include "common/constants.hpp" #include "common/logger.hpp" #include "common/utils.hpp" #include "core/edge_store.hpp" @@ -14,75 +13,6 @@ namespace tundradb { namespace { -/** - * Returns the canonical shadow-schema name for an edge type. - * - * Edge types like "WORKS_AT" are not node schemas, so they cannot be - * stored directly in the SchemaRegistry. Instead we create a synthetic - * "shadow" schema (prefixed with "__edge__") that merges the structural - * edge columns (_edge_id, source_id, target_id, created_ts) with the - * user-defined edge properties. This name is the key under which that - * shadow schema is registered. - */ -std::string edge_shadow_schema_name(const std::string& edge_type) { - return std::string(schema::kEdgeShadowPrefix) + edge_type; -} - -/** - * Lazily creates and registers a shadow Arrow schema for the given edge type. - * - * On first call for a given edge_type, this function: - * 1. Looks up the edge schema from `EdgeStore` (user-defined fields like - * "since", "role"). - * 2. Prepends the four structural edge columns (_edge_id, source_id, - * target_id, created_ts). - * 3. Registers the combined Arrow schema in the SchemaRegistry under the - * shadow name (e.g. "__edge__WORKS_AT"). - * - * Subsequent calls for the same edge_type return the cached shadow name. - * - * The shadow schema allows the query engine to resolve edge field - * references (e.g. `e.since`) through the same SchemaRegistry / - * FieldRef resolution pipeline used for node fields. - * - * @param query_state Mutable query state (provides schema registry and edge - * store). - * @param edge_type The edge type name (e.g. "WORKS_AT"). - * @return The shadow schema name, or an error if the edge schema is missing. - */ -arrow::Result ensure_edge_shadow_schema( - QueryState& query_state, const std::string& edge_type) { - const std::string shadow = edge_shadow_schema_name(edge_type); - auto registry = query_state.schema_registry(); - if (registry->get(shadow).ok()) { - return shadow; - } - if (!query_state.edge_store) { - return arrow::Status::Invalid("Edge store is not available in query state"); - } - auto edge_schema = query_state.edge_store->get_edge_schema(edge_type); - if (!edge_schema) { - return arrow::Status::KeyError("Edge schema '", edge_type, "' not found"); - } - - std::vector> fields{ - arrow::field(std::string(field_names::kEdgeId), arrow::int64()), - arrow::field(std::string(field_names::kSourceId), arrow::int64()), - arrow::field(std::string(field_names::kTargetId), arrow::int64()), - arrow::field(std::string(field_names::kCreatedTs), arrow::int64())}; - std::unordered_set seen{std::string(field_names::kEdgeId), - std::string(field_names::kSourceId), - std::string(field_names::kTargetId), - std::string(field_names::kCreatedTs)}; - for (const auto& f : edge_schema->arrow()->fields()) { - if (seen.insert(f->name()).second) { - fields.push_back(f); - } - } - ARROW_RETURN_NOT_OK(registry->create(shadow, arrow::schema(fields))); - return shadow; -} - /** * @brief Project one key from a MAP column into a flat Arrow array. * @@ -279,60 +209,61 @@ arrow::Result> enrich_nested_select_fields( // SchemaContext implementation -arrow::Result SchemaContext::register_schema( - const SchemaRef& schema_ref) { - if (aliases_.contains(schema_ref.value()) && schema_ref.is_declaration()) { - IF_DEBUG_ENABLED { - log_debug("Schema alias '{}' already assigned to '{}'", - schema_ref.value(), aliases_.at(schema_ref.value())); +arrow::Result SchemaContext::register_alias( + const std::string& alias, const std::string& schema_name, AliasKind kind) { + auto [it, inserted] = aliases_.emplace(alias, AliasEntry{schema_name, kind}); + if (!inserted) { + if (it->second.schema_name != schema_name || it->second.kind != kind) { + return arrow::Status::Invalid("Alias '", alias, "' already bound to '", + it->second.schema_name, + "', cannot re-bind to '", schema_name, "'"); } - return aliases_[schema_ref.value()]; } + return schema_name; +} - if (schema_ref.is_declaration()) { - aliases_[schema_ref.value()] = schema_ref.schema(); - return schema_ref.schema(); +arrow::Result SchemaContext::register_schema( + const SchemaRef& schema_ref) { + if (!schema_ref.is_declaration()) { + ARROW_ASSIGN_OR_RAISE(auto entry, resolve(schema_ref)); + return entry.schema_name; } - - return aliases_[schema_ref.value()]; + return register_alias(schema_ref.value(), schema_ref.schema(), + AliasKind::Node); } -arrow::Result SchemaContext::resolve( +arrow::Result SchemaContext::resolve( const SchemaRef& schema_ref) const { if (schema_ref.is_declaration()) { - return schema_ref.schema(); + return AliasEntry{schema_ref.schema(), AliasKind::Node}; } + return resolve(schema_ref.value()); +} - auto it = aliases_.find(schema_ref.value()); +arrow::Result SchemaContext::resolve( + const std::string& alias) const { + auto it = aliases_.find(alias); if (it == aliases_.end()) { - return arrow::Status::KeyError("No alias for '{}'", schema_ref.value()); + return arrow::Status::KeyError("No alias for '{}'", alias); } - return it->second; } // FieldIndexer implementation -arrow::Result FieldIndexer::compute_fq_names( - const SchemaRef& schema_ref, const std::string& resolved_schema, - SchemaRegistry* registry) { +arrow::Result FieldIndexer::compute_fq_names(const SchemaRef& schema_ref, + const Schema& schema) { const std::string& alias = schema_ref.value(); if (fq_field_names_.contains(alias)) { return false; // Already computed } - auto schema_res = registry->get(resolved_schema); - if (!schema_res.ok()) { - return schema_res.status(); - } - - const auto& schema = schema_res.ValueOrDie(); std::vector names; std::vector indices; - names.reserve(schema->num_fields()); - indices.reserve(schema->num_fields()); + names.reserve(schema.num_fields()); + indices.reserve(schema.num_fields()); - for (const auto& field : schema->fields()) { + for (const auto& field : schema.fields()) { std::string fq_name = alias + "." + field->name(); int field_id = next_field_id_.fetch_add(1); @@ -352,7 +283,24 @@ arrow::Result FieldIndexer::compute_fq_names( // QueryState implementation QueryState::QueryState(std::shared_ptr registry) - : schemas(std::move(registry)) {} + : schema_registry(std::move(registry)) {} + +arrow::Result> QueryState::get_schema_for_alias( + const std::string& alias) const { + ARROW_ASSIGN_OR_RAISE(auto entry, schemas.resolve(alias)); + if (entry.kind == AliasKind::Edge) { + if (!edge_store) { + return arrow::Status::Invalid("Edge store not available"); + } + auto schema = edge_store->get_edge_schema(entry.schema_name); + if (!schema) { + return arrow::Status::KeyError("Edge schema '", entry.schema_name, + "' not found"); + } + return schema; + } + return schema_registry->get(entry.schema_name); +} void QueryState::reserve_capacity(const Query& query) { // Estimate schema count from FROM + TRAVERSE clauses @@ -369,13 +317,8 @@ void QueryState::reserve_capacity(const Query& query) { arrow::Result QueryState::compute_fully_qualified_names( const SchemaRef& schema_ref) { - const auto& aliases_map = schemas.get_aliases(); - const auto it = aliases_map.find(schema_ref.value()); - if (it == aliases_map.end()) { - return arrow::Status::KeyError("keyset does not contain alias '{}'", - schema_ref.value()); - } - return compute_fully_qualified_names(schema_ref, it->second); + ARROW_ASSIGN_OR_RAISE(auto schema, get_schema_for_alias(schema_ref.value())); + return fields.compute_fq_names(schema_ref, *schema); } arrow::Result QueryState::update_table( @@ -406,8 +349,9 @@ std::string QueryState::ToString() const { } ss << " Aliases (" << schemas.get_aliases().size() << "):\n"; - for (const auto& [alias, schema_name] : schemas.get_aliases()) { - ss << " - " << alias << " -> " << schema_name << "\n"; + for (const auto& [alias, entry] : schemas.get_aliases()) { + ss << " - " << alias << " -> " << entry.schema_name + << (entry.kind == AliasKind::Edge ? " [edge]" : " [node]") << "\n"; } ss << " Connections (Outgoing) (" << graph.outgoing().size() @@ -550,8 +494,7 @@ arrow::Result> build_denormalized_schema( log_debug("Adding fields from FROM schema '{}'", from_schema); } - auto schema_result = - query_state.schema_registry()->get(query_state.aliases().at(from_schema)); + auto schema_result = query_state.get_schema_for_alias(from_schema); if (!schema_result.ok()) { return schema_result.status(); } @@ -584,8 +527,7 @@ arrow::Result> build_denormalized_schema( log_debug("Adding fields from schema '{}'", schema_ref.value()); } - schema_result = query_state.schema_registry()->get( - query_state.aliases().at(schema_ref.value())); + schema_result = query_state.get_schema_for_alias(schema_ref.value()); if (!schema_result.ok()) { return schema_result.status(); } @@ -769,22 +711,15 @@ arrow::Result> inline_where( * Phase 1 — FROM: registers the root schema alias (e.g. "u" -> "User"). * * Phase 2 — TRAVERSE: for each traversal clause: - * - Registers source / target node aliases. - * - If an edge alias is present (e.g. `[e:WORKS_AT]`), creates a - * shadow schema via `ensure_edge_shadow_schema` and registers the - * alias (e.g. "e" -> "__edge__WORKS_AT") so that edge fields can - * be resolved through the standard schema registry. - * - Records the edge alias -> raw edge type mapping separately in - * `query_state.edge_aliases` (used later by the traversal engine - * to look up edges in EdgeStore). + * - Registers source / target node aliases (AliasKind::Node). + * - If an edge alias is present (e.g. `[e:WORKS_AT]`), registers + * it with AliasKind::Edge (e.g. "e" -> "WORKS_AT"). * - Computes BFS tags for source/target. * * Phase 3 — WHERE: resolves every `FieldRef` inside `ComparisonExpr` - * nodes. Each symbolic reference like "e.since" is looked up in the - * alias map ("e" -> "__edge__WORKS_AT"), then the field "since" is - * resolved from that schema's `Field` object and bound to the - * `FieldRef`. After this phase, every `FieldRef::is_resolved()` - * returns true. + * nodes via get_schema_for_alias(), which dispatches to SchemaRegistry + * (nodes) or EdgeStore (edges) based on alias kind. After this phase + * every `FieldRef::is_resolved()` returns true. * * @param query The query to prepare (may be mutated: tags set on * traversal sources/targets). @@ -792,7 +727,7 @@ arrow::Result> inline_where( * and the schema registry. * @return OK on success, or a KeyError / Invalid status if resolution fails. */ -arrow::Status prepare_query(Query& query, QueryState& query_state) { +arrow::Status prepare_query(const Query& query, QueryState& query_state) { // Phase 1: Process FROM clause to populate aliases { ARROW_ASSIGN_OR_RAISE(auto from_schema, @@ -818,15 +753,10 @@ arrow::Status prepare_query(Query& query, QueryState& query_state) { } if (traverse->edge_alias().has_value()) { ARROW_ASSIGN_OR_RAISE( - auto edge_shadow_schema, - ensure_edge_shadow_schema(query_state, traverse->edge_type())); - ARROW_ASSIGN_OR_RAISE( - auto _edge_schema_name, - query_state.register_schema(SchemaRef::parse( - traverse->edge_alias().value() + ":" + edge_shadow_schema))); - (void)_edge_schema_name; - ARROW_RETURN_NOT_OK(query_state.register_edge_alias( - traverse->edge_alias().value(), traverse->edge_type())); + auto _edge_name, + query_state.register_edge_alias(traverse->edge_alias().value(), + traverse->edge_type())); + (void)_edge_name; } traverse->mutable_source().set_tag(compute_tag(traverse->source())); @@ -837,15 +767,16 @@ arrow::Status prepare_query(Query& query, QueryState& query_state) { } // Phase 3: Resolve all ComparisonExpr field references. - // query_state.aliases() already maps edge aliases to their shadow schema - // names (e.g. "e" -> "__edge__WORKS_AT") from the register_schema call in - // Phase 2, so no override is needed here. - const auto& where_aliases = query_state.aliases(); + // Use get_schema_for_alias which dispatches to SchemaRegistry (nodes) + // or EdgeStore (edges) based on AliasKind. + auto schema_resolver = [&query_state](const std::string& variable) + -> arrow::Result> { + return query_state.get_schema_for_alias(variable); + }; for (const auto& clause : query.clauses()) { if (clause->type() == Clause::Type::WHERE) { auto where_expr = std::dynamic_pointer_cast(clause); - auto res = where_expr->resolve_field_ref( - where_aliases, query_state.schema_registry().get()); + auto res = where_expr->resolve_field_ref(schema_resolver); if (!res.ok()) { return res.status(); } diff --git a/src/query/query.cpp b/src/query/query.cpp index 8a52eaf..9f3e07a 100644 --- a/src/query/query.cpp +++ b/src/query/query.cpp @@ -384,15 +384,13 @@ std::set ComparisonExpr::get_all_variables() const { * * This method is idempotent: calling it again after resolution is a no-op. * - * @param aliases Variable -> schema-name map (includes both node - * and edge shadow schemas). - * @param schema_registry Registry to look up schemas by name. + * @param schema_resolver Maps a variable name to its Schema object. * @return true on success, or a KeyError if the variable, schema, or field * cannot be found. */ arrow::Result ComparisonExpr::resolve_field_ref( - const std::unordered_map& aliases, - const SchemaRegistry* schema_registry) { + const std::function>( + const std::string&)>& schema_resolver) { if (field_ref_.is_resolved()) { return true; } @@ -400,25 +398,13 @@ arrow::Result ComparisonExpr::resolve_field_ref( const std::string& variable = field_ref_.variable(); const std::string& field_name = field_ref_.field_name(); - auto it = aliases.find(variable); - if (it == aliases.end()) { - return arrow::Status::KeyError("Unknown variable '", variable, - "' in field '", field_ref_.to_string(), "'"); - } - - const std::string& schema_name = it->second; + ARROW_ASSIGN_OR_RAISE(auto schema, schema_resolver(variable)); - auto schema_result = schema_registry->get(schema_name); - if (!schema_result.ok()) { - return arrow::Status::KeyError("Schema '", schema_name, - "' not found for variable '", variable, "'"); - } - - auto schema = schema_result.ValueOrDie(); auto field = schema->get_field(field_name); if (!field) { return arrow::Status::KeyError("Field '", field_name, - "' not found in schema '", schema_name, "'"); + "' not found in schema for variable '", + variable, "'"); } field_ref_.resolve(field); @@ -434,16 +420,15 @@ void LogicalExpr::set_inlined(bool inlined) { } arrow::Result LogicalExpr::resolve_field_ref( - const std::unordered_map& aliases, - const SchemaRegistry* schema_registry) { + const std::function>( + const std::string&)>& schema_resolver) { if (left_) { - if (const auto res = left_->resolve_field_ref(aliases, schema_registry); - !res.ok()) { + if (const auto res = left_->resolve_field_ref(schema_resolver); !res.ok()) { return res.status(); } } if (right_) { - if (const auto res = right_->resolve_field_ref(aliases, schema_registry); + if (const auto res = right_->resolve_field_ref(schema_resolver); !res.ok()) { return res.status(); } diff --git a/src/query/row.cpp b/src/query/row.cpp index 4fedff3..d18da0a 100644 --- a/src/query/row.cpp +++ b/src/query/row.cpp @@ -293,9 +293,8 @@ void Row::set_cell_from_edge( if (!field) continue; const auto& name = field->name(); const bool structural = - (name == field_names::kId || name == field_names::kEdgeId || - name == field_names::kSourceId || name == field_names::kTargetId || - name == field_names::kCreatedTs); + (name == field_names::kId || name == field_names::kSourceId || + name == field_names::kTargetId || name == field_names::kCreatedTs); if (!structural && edge_schema) { auto real_field = edge_schema->get_field(name); if (!real_field) continue; diff --git a/tests/update_query_join_test.cpp b/tests/update_query_join_test.cpp index a86291e..933c6c9 100644 --- a/tests/update_query_join_test.cpp +++ b/tests/update_query_join_test.cpp @@ -244,7 +244,7 @@ TEST_F(UpdateJoinCrossSchemaTest, UpdateEdgeFieldByMatchAlias) { EXPECT_EQ(vals[0], 2025); } -TEST_F(UpdateJoinCrossSchemaTest, SelectEdgeAliasExpandsAllEdgeFields) { +TEST_F(UpdateJoinCrossSchemaTest, SelectEdgeAliasReturnsOnlyUserDefinedFields) { auto query = Query::from("u:User") .traverse("u", "WORKS_AT", "c:Company", TraverseType::Inner, std::optional{"e"}) @@ -254,31 +254,27 @@ TEST_F(UpdateJoinCrossSchemaTest, SelectEdgeAliasExpandsAllEdgeFields) { ASSERT_TRUE(result.ok()) << result.status().ToString(); auto table = result.ValueOrDie()->table(); - ASSERT_NE(table->GetColumnByName("e._edge_id"), nullptr); - ASSERT_NE(table->GetColumnByName("e.source_id"), nullptr); - ASSERT_NE(table->GetColumnByName("e.target_id"), nullptr); - ASSERT_NE(table->GetColumnByName("e.created_ts"), nullptr); + // Only user-defined edge fields should be present ASSERT_NE(table->GetColumnByName("e.since"), nullptr); ASSERT_NE(table->GetColumnByName("e.role"), nullptr); - auto edge_ids = get_column_values(table, "e._edge_id").ValueOrDie(); - auto src_ids = get_column_values(table, "e.source_id").ValueOrDie(); - auto dst_ids = get_column_values(table, "e.target_id").ValueOrDie(); + // System fields must NOT appear in query output + EXPECT_EQ(table->GetColumnByName("e._edge_id"), nullptr); + EXPECT_EQ(table->GetColumnByName("e._source_id"), nullptr); + EXPECT_EQ(table->GetColumnByName("e._target_id"), nullptr); + EXPECT_EQ(table->GetColumnByName("e._created_ts"), nullptr); + EXPECT_EQ(table->GetColumnByName("e.source_id"), nullptr); + EXPECT_EQ(table->GetColumnByName("e.target_id"), nullptr); + EXPECT_EQ(table->GetColumnByName("e.created_ts"), nullptr); + auto since = get_column_values(table, "e.since").ValueOrDie(); auto role = get_column_values(table, "e.role").ValueOrDie(); - ASSERT_EQ(edge_ids.size(), 2u); - ASSERT_EQ(src_ids.size(), 2u); - ASSERT_EQ(dst_ids.size(), 2u); ASSERT_EQ(since.size(), 2u); ASSERT_EQ(role.size(), 2u); - std::set src_set(src_ids.begin(), src_ids.end()); - std::set dst_set(dst_ids.begin(), dst_ids.end()); std::set since_set(since.begin(), since.end()); std::set role_set(role.begin(), role.end()); - EXPECT_EQ(src_set, (std::set{0, 1})); - EXPECT_EQ(dst_set, (std::set{0})); EXPECT_EQ(since_set, (std::set{2020, 2021})); EXPECT_EQ(role_set, (std::set{"eng", "pm"})); } @@ -303,6 +299,18 @@ TEST_F(UpdateJoinCrossSchemaTest, TraversalWithNoMatchUpdatesNothing) { EXPECT_EQ(get_field("Company", 0, "size"), 0); } +TEST_F(UpdateJoinCrossSchemaTest, DuplicateAliasForNodeAndEdgeFails) { + // "u" is already used as a node alias (u:User); reusing it as an edge alias + // must fail during query preparation. + auto query = Query::from("u:User") + .traverse("u", "WORKS_AT", "c:Company", TraverseType::Inner, + std::optional{"u"}) + .build(); + auto result = db_->query(query); + EXPECT_FALSE(result.ok()); + EXPECT_TRUE(result.status().IsInvalid()) << result.status().ToString(); +} + // ========================================================================= // Fixture: same-schema graph (User --FRIEND--> User) // =========================================================================