Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions include/arrow/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,19 @@ arrow::Result<std::shared_ptr<arrow::Table>> create_table_from_nodes(
arrow::Result<std::shared_ptr<arrow::Table>> create_empty_table(
const std::shared_ptr<arrow::Schema>& schema);

/**
* @brief Appends a single ValueRef to an Arrow ArrayBuilder.
*
* Dispatches by ValueType to the correct typed builder (Int32, Int64, etc.).
* Appends null when data is nullptr.
*
* @param value The value to append.
* @param builder The Arrow builder to append into.
* @return OK on success, or an error for unsupported types.
*/
arrow::Status append_value_to_builder(const ValueRef& value,
arrow::ArrayBuilder* builder);

/**
* @brief Appends an ArrayRef's contents to an Arrow ListBuilder.
*
Expand Down
6 changes: 0 additions & 6 deletions include/common/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ namespace tundradb {
/// renames safe.
namespace field_names {
inline constexpr std::string_view kId = "id";
inline constexpr std::string_view kEdgeId = "_edge_id";
inline constexpr std::string_view kSourceId = "source_id";
inline constexpr std::string_view kTargetId = "target_id";
inline constexpr std::string_view kCreatedTs = "created_ts";
Expand All @@ -22,11 +21,6 @@ namespace arena_flags {
inline constexpr uint32_t kMarkedForDeletion = 0x1;
} // namespace arena_flags

/// Prefix for synthetic shadow schemas that wrap edge types.
namespace schema {
inline constexpr std::string_view kEdgeShadowPrefix = "__edge__";
} // namespace schema

/// FNV-1a 32-bit hash parameters (used by compute_tag).
namespace hash {
inline constexpr uint32_t kFnv1aOffsetBasis = 2166136261u;
Expand Down
83 changes: 5 additions & 78 deletions include/common/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,85 +254,12 @@ static arrow::Result<std::shared_ptr<arrow::Table>> create_table(

for (int i = 0; i < schema->num_fields(); i++) {
const auto& field = schema->field(i);
auto field_result = view.get_value_ptr(field); // ✅ Use NodeView!
if (!field_result.ok()) {
ARROW_RETURN_NOT_OK(builders[i]->AppendNull());
} else {
const auto value_ptr = field_result.ValueOrDie();
if (value_ptr == nullptr) {
ARROW_RETURN_NOT_OK(builders[i]->AppendNull());
} else {
switch (field->type()) {
case ValueType::INT32: {
ARROW_RETURN_NOT_OK(
dynamic_cast<arrow::Int32Builder*>(builders[i].get())
->Append(*reinterpret_cast<const int32_t*>(value_ptr)));
break;
}
case ValueType::INT64: {
ARROW_RETURN_NOT_OK(
dynamic_cast<arrow::Int64Builder*>(builders[i].get())
->Append(*reinterpret_cast<const int64_t*>(value_ptr)));
break;
}
case ValueType::FLOAT: {
// return Value{*reinterpret_cast<const double*>(ptr)};
ARROW_RETURN_NOT_OK(
dynamic_cast<arrow::FloatBuilder*>(builders[i].get())
->Append(*reinterpret_cast<const float*>(value_ptr)));
break;
}
case ValueType::DOUBLE: {
ARROW_RETURN_NOT_OK(
dynamic_cast<arrow::DoubleBuilder*>(builders[i].get())
->Append(*reinterpret_cast<const double*>(value_ptr)));
break;
}
case ValueType::BOOL: {
ARROW_RETURN_NOT_OK(
dynamic_cast<arrow::BooleanBuilder*>(builders[i].get())
->Append(*reinterpret_cast<const bool*>(value_ptr)));
break;
}
case ValueType::STRING: {
auto str_ref = *reinterpret_cast<const StringRef*>(value_ptr);

ARROW_RETURN_NOT_OK(
dynamic_cast<arrow::StringBuilder*>(builders[i].get())
->Append(str_ref.to_string()));
break;
}
case ValueType::ARRAY: {
const auto& arr_ref =
*reinterpret_cast<const ArrayRef*>(value_ptr);
auto* list_builder =
dynamic_cast<arrow::ListBuilder*>(builders[i].get());
if (!list_builder) {
return arrow::Status::Invalid(
"Expected ListBuilder for array field: ", field->name());
}
ARROW_RETURN_NOT_OK(
append_array_to_list_builder(arr_ref, list_builder));
break;
}
case ValueType::MAP: {
const auto& map_ref = *reinterpret_cast<const MapRef*>(value_ptr);
auto* map_builder =
dynamic_cast<arrow::MapBuilder*>(builders[i].get());
if (!map_builder) {
return arrow::Status::Invalid(
"Expected MapBuilder for MAP field: ", field->name());
}
ARROW_RETURN_NOT_OK(
append_map_to_map_builder(map_ref, map_builder));
break;
}
default:
return arrow::Status::NotImplemented("Unsupported type: ",
to_string(field->type()));
}
}
auto field_result = view.get_value_ptr(field);
ValueRef vr;
if (field_result.ok() && field_result.ValueOrDie() != nullptr) {
vr = ValueRef(field_result.ValueOrDie(), field->type());
}
ARROW_RETURN_NOT_OK(append_value_to_builder(vr, builders[i].get()));
}

nodes_in_current_chunk++;
Expand Down
2 changes: 0 additions & 2 deletions include/main/database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,6 @@ class Database {
* are skipped. Returns an error if the same alias is bound to two different
* schemas.
*/
static arrow::Result<std::unordered_map<std::string, std::string>>
resolve_alias_map(const Query &query);
};

} // namespace tundradb
146 changes: 82 additions & 64 deletions include/query/execution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,44 +92,77 @@ class ConnectionPool {
size_t size() const { return next_index_; }
};

/// Distinguishes node aliases from edge aliases in the unified alias map.
enum class AliasKind { Node, Edge };

/// A registered alias entry: the concrete schema/edge-type name and its kind.
struct AliasEntry {
std::string schema_name;
AliasKind kind;
};

/**
* @brief Manages schema resolution and aliases for query execution
*
* Responsibilities:
* - Map aliases (e.g., "u") to schema names (e.g., "User")
* - Map aliases (e.g., "u") to schema names (e.g., "User") or edge types
* - Resolve SchemaRef objects to concrete schema names
* - Validate schema references
* - Validate schema references and detect alias conflicts
*/
class SchemaContext {
private:
std::unordered_map<std::string, std::string> aliases_;
std::shared_ptr<SchemaRegistry> schema_registry_;
std::unordered_map<std::string, AliasEntry> aliases_;

public:
explicit SchemaContext(std::shared_ptr<SchemaRegistry> registry)
: schema_registry_(std::move(registry)) {}
SchemaContext() = default;

/**
* @brief Registers a node or edge alias.
*
* Idempotent when the alias already maps to the same name+kind; returns
* an error if the alias is already bound to a different schema or kind.
*
* @param alias The alias string (e.g. "u", "e").
* @param schema_name The concrete schema name or edge type (e.g. "User",
* "WORKS_AT").
* @param kind Whether this is a Node or Edge alias.
* @return The schema_name on success, or an error on conflict.
*/
arrow::Result<std::string> register_alias(const std::string& alias,
const std::string& schema_name,
AliasKind kind);

/**
* @brief Registers a schema alias (e.g. "u" -> "User").
* @brief Registers a schema alias from a SchemaRef.
*
* @param schema_ref The schema reference containing alias and schema name.
* @return The resolved concrete schema name, or an error if unknown.
* For declarations (alias:Schema): calls register_alias with Node kind.
* For bare references (alias only): delegates to resolve().
*/
arrow::Result<std::string> register_schema(const SchemaRef& schema_ref);

/**
* @brief Resolves a schema reference to its concrete schema name.
* @brief Resolves a schema reference to its entry.
*
* @param schema_ref The reference to resolve.
* @return The concrete schema name, or an error if not registered.
* @return The AliasEntry, or an error if not registered.
*/
arrow::Result<std::string> resolve(const SchemaRef& schema_ref) const;
arrow::Result<AliasEntry> resolve(const SchemaRef& schema_ref) const;

/** @brief Returns the underlying SchemaRegistry. */
std::shared_ptr<SchemaRegistry> registry() const { return schema_registry_; }
/**
* @brief Resolves an alias string to its entry.
*
* @param alias The alias to resolve.
* @return The AliasEntry, or an error if not registered.
*/
arrow::Result<AliasEntry> resolve(const std::string& alias) const;

/** @brief Returns all registered alias->schema mappings. */
const std::unordered_map<std::string, std::string>& get_aliases() const {
/** @brief Returns true if the alias is registered. */
bool contains(const std::string& alias) const {
return aliases_.contains(alias);
}

/** @brief Returns all registered alias entries. */
const std::unordered_map<std::string, AliasEntry>& get_aliases() const {
return aliases_;
}
};
Expand Down Expand Up @@ -319,13 +352,11 @@ class FieldIndexer {
* schema.
*
* @param schema_ref The schema reference (alias used as prefix).
* @param resolved_schema The concrete schema name.
* @param registry The schema registry for field lookup.
* @param schema The resolved Schema whose fields will be indexed.
* @return True if names were computed (false if already done).
*/
arrow::Result<bool> compute_fq_names(const SchemaRef& schema_ref,
const std::string& resolved_schema,
SchemaRegistry* registry);
const Schema& schema);

arrow::Result<bool> compute_fq_names_from_fields(
const std::string& alias,
Expand Down Expand Up @@ -410,7 +441,7 @@ class FieldIndexer {
* - Tables: Arrow table storage
*/
struct QueryState {
SchemaContext schemas; ///< Schema resolution and aliases.
SchemaContext schemas; ///< Alias resolution (unified node + edge).
GraphState graph; ///< Graph topology (IDs, connections).
FieldIndexer fields; ///< Field indexing for row operations.

Expand All @@ -419,53 +450,41 @@ struct QueryState {

SchemaRef from; ///< Source schema from the FROM clause.
std::vector<Traverse> traversals; ///< Traverse clauses in query order.
std::unordered_map<std::string, std::string>
edge_aliases; ///< edge alias -> edge type

std::shared_ptr<NodeManager> node_manager; ///< Node storage.
std::shared_ptr<EdgeStore> edge_store; ///< Edge storage.
std::shared_ptr<SchemaRegistry> schema_registry; ///< Node schema registry.
std::shared_ptr<NodeManager> node_manager; ///< Node storage.
std::shared_ptr<EdgeStore> edge_store; ///< Edge storage.

std::unique_ptr<TemporalContext>
temporal_context; ///< Temporal snapshot (nullptr = current).

/** @brief Constructs a QueryState bound to the given schema registry. */
explicit QueryState(std::shared_ptr<SchemaRegistry> registry);

/** @brief Registers a schema alias. @see SchemaContext::register_schema. */
/** @brief Registers a node alias. @see SchemaContext::register_schema. */
arrow::Result<std::string> register_schema(const SchemaRef& ref) {
return schemas.register_schema(ref);
}

/**
* Records a mapping from an edge alias to its raw edge type name.
*
* This is separate from the schema alias (which maps the alias to the
* shadow schema name like "__edge__WORKS_AT"). The raw edge type is
* needed later by the traversal engine to query EdgeStore by type.
*
* @param alias The edge variable (e.g. "e").
* @param edge_type The raw edge type (e.g. "WORKS_AT").
* @return true, or Invalid if the alias is empty or already bound to a
* different type.
*/
arrow::Result<bool> register_edge_alias(const std::string& alias,
const std::string& edge_type) {
if (alias.empty()) {
return arrow::Status::Invalid("Edge alias cannot be empty");
}
if (auto [it, inserted] = edge_aliases.emplace(alias, edge_type);
!inserted && it->second != edge_type) {
return arrow::Status::Invalid("Edge alias '", alias,
"' is already bound to edge type '",
it->second, "'");
}
return true;
/** @brief Registers an edge alias (alias -> edge_type). */
arrow::Result<std::string> register_edge_alias(const std::string& alias,
const std::string& edge_type) {
return schemas.register_alias(alias, edge_type, AliasKind::Edge);
}

/** @brief Resolves a schema alias to its concrete name. */
/** @brief Resolves an alias to its schema name. */
arrow::Result<std::string> resolve_schema(const SchemaRef& ref) const {
return schemas.resolve(ref);
ARROW_ASSIGN_OR_RAISE(auto entry, schemas.resolve(ref));
return entry.schema_name;
}

/**
* @brief Resolves an alias and fetches the corresponding Schema object.
* Dispatches to SchemaRegistry for nodes, EdgeStore for edges.
*/
arrow::Result<std::shared_ptr<Schema>> get_schema_for_alias(
const std::string& alias) const;

/** @brief Returns the mutable ID set for the given schema. */
llvm::DenseSet<int64_t>& get_ids(const SchemaRef& schema_ref) {
return graph.ids(schema_ref.value());
Expand All @@ -481,28 +500,27 @@ struct QueryState {
return graph.has_outgoing(ref, node_id);
}

/** @brief Computes FQ field names for the given schema reference. */
arrow::Result<bool> compute_fully_qualified_names(
const SchemaRef& ref, const std::string& resolved_schema) {
return fields.compute_fq_names(ref, resolved_schema,
schemas.registry().get());
/** @brief Computes FQ field names for the given alias. */
arrow::Result<bool> compute_fully_qualified_names(const SchemaRef& ref,
const Schema& schema) {
return fields.compute_fq_names(ref, schema);
}

/** @overload Resolves the schema name automatically. */
/** @overload Resolves the schema automatically via alias kind. */
arrow::Result<bool> compute_fully_qualified_names(const SchemaRef& ref);

/** @brief Removes a node from the graph. */
void remove_node(int64_t node_id, const SchemaRef& ref) {
graph.remove_node(node_id, ref);
}

/** @brief Returns the schema registry. */
std::shared_ptr<SchemaRegistry> schema_registry() const {
return schemas.registry();
/** @brief Returns true if the alias is registered. */
bool has_alias(const std::string& alias) const {
return schemas.contains(alias);
}

/** @brief Returns all alias->schema mappings. */
const std::unordered_map<std::string, std::string>& aliases() const {
/** @brief Returns all registered alias entries. */
const std::unordered_map<std::string, AliasEntry>& aliases() const {
return schemas.get_aliases();
}

Expand Down Expand Up @@ -685,7 +703,7 @@ arrow::Result<std::shared_ptr<arrow::Table>> inline_where(
* @param query_state The execution state to populate.
* @return OK on success, or an error.
*/
arrow::Status prepare_query(Query& query, QueryState& query_state);
arrow::Status prepare_query(const Query& query, QueryState& query_state);

} // namespace tundradb

Expand Down
Loading
Loading