diff --git a/robosystems_client/extensions/__init__.py b/robosystems_client/extensions/__init__.py index edf3281..c5ff74d 100644 --- a/robosystems_client/extensions/__init__.py +++ b/robosystems_client/extensions/__init__.py @@ -28,12 +28,22 @@ OperationProgress, OperationResult, ) -from .table_ingest_client import ( - TableIngestClient, - UploadOptions, - IngestOptions, - UploadResult, +from .file_client import ( + FileClient, + FileUploadOptions, + FileUploadResult, + FileInfo, +) +from .materialization_client import ( + MaterializationClient, + MaterializationOptions, + MaterializationResult, + MaterializationStatus, +) +from .table_client import ( + TableClient, TableInfo, + QueryResult as TableQueryResult, ) from .graph_client import ( GraphClient, @@ -177,12 +187,20 @@ "OperationStatus", "OperationProgress", "OperationResult", - # Table Ingest Client - "TableIngestClient", - "UploadOptions", - "IngestOptions", - "UploadResult", + # File Client + "FileClient", + "FileUploadOptions", + "FileUploadResult", + "FileInfo", + # Materialization Client + "MaterializationClient", + "MaterializationOptions", + "MaterializationResult", + "MaterializationStatus", + # Table Client + "TableClient", "TableInfo", + "TableQueryResult", # Graph Client "GraphClient", "GraphMetadata", diff --git a/robosystems_client/extensions/extensions.py b/robosystems_client/extensions/extensions.py index a048af0..f1d031c 100644 --- a/robosystems_client/extensions/extensions.py +++ b/robosystems_client/extensions/extensions.py @@ -9,7 +9,9 @@ from .query_client import QueryClient from .agent_client import AgentClient from .operation_client import OperationClient -from .table_ingest_client import TableIngestClient +from .file_client import FileClient +from .materialization_client import MaterializationClient +from .table_client import TableClient from .graph_client import GraphClient from .sse_client import SSEClient @@ -61,7 +63,9 @@ def __init__(self, config: RoboSystemsExtensionConfig = None): self.query = QueryClient(self.config) self.agent = AgentClient(self.config) self.operations = OperationClient(self.config) - self.tables = TableIngestClient(self.config) + self.files = FileClient(self.config) + self.materialization = MaterializationClient(self.config) + self.tables = TableClient(self.config) self.graphs = GraphClient(self.config) def monitor_operation( @@ -92,7 +96,12 @@ def close(self): self.query.close() self.agent.close() self.operations.close_all() - self.tables.close() + if hasattr(self.files, "close"): + self.files.close() + if hasattr(self.materialization, "close"): + self.materialization.close() + if hasattr(self.tables, "close"): + self.tables.close() self.graphs.close() # Convenience methods that delegate to the appropriate clients diff --git a/robosystems_client/extensions/file_client.py b/robosystems_client/extensions/file_client.py new file mode 100644 index 0000000..6c01105 --- /dev/null +++ b/robosystems_client/extensions/file_client.py @@ -0,0 +1,380 @@ +"""File Client for RoboSystems API + +Manages file operations as first-class resources with multi-layer status tracking. +Files are independent entities with their own lifecycle (S3 → DuckDB → Graph). 
+""" + +from dataclasses import dataclass +from io import BytesIO +from pathlib import Path +from typing import Dict, Any, Optional, Callable, Union, BinaryIO +import logging +import httpx + +from ..api.files.create_file_upload import ( + sync_detailed as create_file_upload, +) +from ..api.files.update_file import ( + sync_detailed as update_file, +) +from ..api.files.list_files import ( + sync_detailed as list_files, +) +from ..api.files.get_file import ( + sync_detailed as get_file, +) +from ..api.files.delete_file import ( + sync_detailed as delete_file, +) +from ..models.file_upload_request import FileUploadRequest +from ..models.file_status_update import FileStatusUpdate + +logger = logging.getLogger(__name__) + + +@dataclass +class FileUploadOptions: + """Options for file upload operations""" + + on_progress: Optional[Callable[[str], None]] = None + fix_localstack_url: bool = True + ingest_to_graph: bool = False + + +@dataclass +class FileUploadResult: + """Result from file upload operation""" + + file_id: str + file_size: int + row_count: int + table_name: str + file_name: str + success: bool = True + error: Optional[str] = None + + +@dataclass +class FileInfo: + """Information about a file""" + + file_id: str + file_name: str + file_format: str + size_bytes: int + row_count: Optional[int] + upload_status: str + table_name: str + created_at: Optional[str] + uploaded_at: Optional[str] + layers: Optional[Dict[str, Any]] = None + + +class FileClient: + """Client for managing files as first-class resources""" + + def __init__(self, config: Dict[str, Any]): + self.config = config + self.base_url = config["base_url"] + self.headers = config.get("headers", {}) + self.token = config.get("token") + self._http_client = httpx.Client(timeout=120.0) + + def upload( + self, + graph_id: str, + table_name: str, + file_or_buffer: Union[Path, str, BytesIO, BinaryIO], + options: Optional[FileUploadOptions] = None, + ) -> FileUploadResult: + """ + Upload a file to a table. + + This handles the complete 3-step upload process: + 1. Get presigned upload URL + 2. Upload file to S3 + 3. Mark file as 'uploaded' (triggers DuckDB staging) + + Args: + graph_id: Graph database identifier + table_name: Table to associate file with + file_or_buffer: File path, Path object, BytesIO, or file-like object + options: Upload options (progress callback, LocalStack URL fix, auto-ingest) + + Returns: + FileUploadResult with file metadata and status + """ + options = options or FileUploadOptions() + + try: + # Determine file name and read content + if isinstance(file_or_buffer, (str, Path)): + file_path = Path(file_or_buffer) + file_name = file_path.name + with open(file_path, "rb") as f: + file_content = f.read() + elif isinstance(file_or_buffer, BytesIO): + file_name = "data.parquet" + file_content = file_or_buffer.getvalue() + elif hasattr(file_or_buffer, "read"): + file_name = getattr(file_or_buffer, "name", "data.parquet") + file_content = file_or_buffer.read() + else: + raise ValueError(f"Unsupported file type: {type(file_or_buffer)}") + + # Step 1: Get presigned upload URL + if options.on_progress: + options.on_progress( + f"Getting upload URL for {file_name} → table '{table_name}'..." 
+ ) + + upload_request = FileUploadRequest( + file_name=file_name, + content_type="application/x-parquet", + table_name=table_name, + ) + + kwargs = { + "graph_id": graph_id, + "client": self.config.get("client"), + "body": upload_request, + } + + response = create_file_upload(**kwargs) + + if response.status_code != 200 or not response.parsed: + error_msg = f"Failed to get upload URL: {response.status_code}" + return FileUploadResult( + file_id="", + file_size=0, + row_count=0, + table_name=table_name, + file_name=file_name, + success=False, + error=error_msg, + ) + + upload_data = response.parsed + upload_url = upload_data.upload_url + file_id = upload_data.file_id + + # Fix LocalStack URL if needed + if options.fix_localstack_url and "localstack:4566" in upload_url: + upload_url = upload_url.replace("localstack:4566", "localhost:4566") + + # Step 2: Upload file to S3 + if options.on_progress: + options.on_progress(f"Uploading {file_name} to S3...") + + s3_response = self._http_client.put( + upload_url, + content=file_content, + headers={"Content-Type": "application/x-parquet"}, + ) + + if s3_response.status_code not in [200, 204]: + return FileUploadResult( + file_id=file_id, + file_size=len(file_content), + row_count=0, + table_name=table_name, + file_name=file_name, + success=False, + error=f"S3 upload failed: {s3_response.status_code}", + ) + + # Step 3: Mark file as uploaded + if options.on_progress: + options.on_progress(f"Marking {file_name} as uploaded...") + + status_update = FileStatusUpdate( + status="uploaded", + ingest_to_graph=options.ingest_to_graph, + ) + + update_kwargs = { + "graph_id": graph_id, + "file_id": file_id, + "client": self.config.get("client"), + "body": status_update, + } + + update_response = update_file(**update_kwargs) + + if update_response.status_code != 200 or not update_response.parsed: + return FileUploadResult( + file_id=file_id, + file_size=len(file_content), + row_count=0, + table_name=table_name, + file_name=file_name, + success=False, + error="Failed to complete file upload", + ) + + # Extract metadata from response + response_data = update_response.parsed + actual_file_size = getattr(response_data, "file_size_bytes", len(file_content)) + actual_row_count = getattr(response_data, "row_count", 0) + + if options.on_progress: + options.on_progress( + f"✅ Uploaded {file_name} ({actual_file_size:,} bytes, {actual_row_count:,} rows)" + ) + + return FileUploadResult( + file_id=file_id, + file_size=actual_file_size, + row_count=actual_row_count, + table_name=table_name, + file_name=file_name, + success=True, + ) + + except Exception as e: + logger.error(f"File upload failed: {e}") + return FileUploadResult( + file_id="", + file_size=0, + row_count=0, + table_name=table_name, + file_name=getattr(file_or_buffer, "name", "unknown"), + success=False, + error=str(e), + ) + + def list( + self, + graph_id: str, + table_name: Optional[str] = None, + status: Optional[str] = None, + ) -> list[FileInfo]: + """ + List files in a graph with optional filtering. + + Args: + graph_id: Graph database identifier + table_name: Optional table name filter + status: Optional upload status filter (uploaded, pending, etc.) 
+ + Returns: + List of FileInfo objects + """ + try: + kwargs = { + "graph_id": graph_id, + "client": self.config.get("client"), + } + + if table_name: + kwargs["table_name"] = table_name + if status: + kwargs["status"] = status + + response = list_files(**kwargs) + + if response.status_code != 200 or not response.parsed: + logger.error(f"Failed to list files: {response.status_code}") + return [] + + files_data = response.parsed + files = getattr(files_data, "files", []) + + return [ + FileInfo( + file_id=f.file_id, + file_name=f.file_name, + file_format=f.file_format, + size_bytes=f.size_bytes or 0, + row_count=f.row_count, + upload_status=f.upload_status, + table_name=getattr(f, "table_name", ""), + created_at=f.created_at, + uploaded_at=f.uploaded_at, + ) + for f in files + ] + + except Exception as e: + logger.error(f"Failed to list files: {e}") + return [] + + def get(self, graph_id: str, file_id: str) -> Optional[FileInfo]: + """ + Get detailed information about a specific file. + + Args: + graph_id: Graph database identifier + file_id: File ID + + Returns: + FileInfo with multi-layer status tracking, or None if not found + """ + try: + kwargs = { + "graph_id": graph_id, + "file_id": file_id, + "client": self.config.get("client"), + } + + response = get_file(**kwargs) + + if response.status_code != 200 or not response.parsed: + logger.error(f"Failed to get file {file_id}: {response.status_code}") + return None + + file_data = response.parsed + + return FileInfo( + file_id=file_data.file_id, + file_name=file_data.file_name, + file_format=file_data.file_format, + size_bytes=file_data.size_bytes or 0, + row_count=file_data.row_count, + upload_status=file_data.upload_status, + table_name=file_data.table_name or "", + created_at=file_data.created_at, + uploaded_at=file_data.uploaded_at, + layers=getattr(file_data, "layers", None), + ) + + except Exception as e: + logger.error(f"Failed to get file {file_id}: {e}") + return None + + def delete(self, graph_id: str, file_id: str, cascade: bool = False) -> bool: + """ + Delete a file from all layers. + + Args: + graph_id: Graph database identifier + file_id: File ID to delete + cascade: If True, delete from all layers including DuckDB and graph + + Returns: + True if deletion succeeded, False otherwise + """ + try: + kwargs = { + "graph_id": graph_id, + "file_id": file_id, + "client": self.config.get("client"), + "cascade": cascade, + } + + response = delete_file(**kwargs) + + if response.status_code not in [200, 204]: + logger.error(f"Failed to delete file {file_id}: {response.status_code}") + return False + + return True + + except Exception as e: + logger.error(f"Failed to delete file {file_id}: {e}") + return False + + def __del__(self): + """Cleanup HTTP client on deletion""" + if hasattr(self, "_http_client"): + self._http_client.close() diff --git a/robosystems_client/extensions/materialization_client.py b/robosystems_client/extensions/materialization_client.py new file mode 100644 index 0000000..2ddbbbe --- /dev/null +++ b/robosystems_client/extensions/materialization_client.py @@ -0,0 +1,211 @@ +"""Materialization Client for RoboSystems API + +Manages graph materialization from DuckDB staging tables. +Treats the graph database as a materialized view of the mutable DuckDB data lake. 
+""" + +from dataclasses import dataclass +from typing import Dict, Any, Optional, Callable +import logging + +from ..api.materialization.materialize_graph import ( + sync_detailed as materialize_graph, +) +from ..api.materialization.get_materialization_status import ( + sync_detailed as get_materialization_status, +) +from ..models.materialize_request import MaterializeRequest + +logger = logging.getLogger(__name__) + + +@dataclass +class MaterializationOptions: + """Options for graph materialization operations""" + + ignore_errors: bool = True + rebuild: bool = False + force: bool = False + on_progress: Optional[Callable[[str], None]] = None + + +@dataclass +class MaterializationResult: + """Result from materialization operation""" + + status: str + was_stale: bool + stale_reason: Optional[str] + tables_materialized: list[str] + total_rows: int + execution_time_ms: float + message: str + success: bool = True + error: Optional[str] = None + + +@dataclass +class MaterializationStatus: + """Status information about graph materialization""" + + graph_id: str + is_stale: bool + stale_reason: Optional[str] + stale_since: Optional[str] + last_materialized_at: Optional[str] + materialization_count: int + hours_since_materialization: Optional[float] + message: str + + +class MaterializationClient: + """Client for managing graph materialization operations""" + + def __init__(self, config: Dict[str, Any]): + self.config = config + self.base_url = config["base_url"] + self.headers = config.get("headers", {}) + self.token = config.get("token") + + def materialize( + self, + graph_id: str, + options: Optional[MaterializationOptions] = None, + ) -> MaterializationResult: + """ + Materialize graph from DuckDB staging tables. + + Rebuilds the complete graph database from the current state of DuckDB + staging tables. Automatically discovers all tables, materializes them in + the correct order (nodes before relationships), and clears the staleness flag. 
+ + Args: + graph_id: Graph database identifier + options: Materialization options (ignore_errors, rebuild, force) + + Returns: + MaterializationResult with detailed execution information + + When to use: + - After batch uploads (files uploaded with ingest_to_graph=false) + - After cascade file deletions (graph marked stale) + - Periodic full refresh to ensure consistency + - Recovery from partial materialization failures + """ + options = options or MaterializationOptions() + + try: + if options.on_progress: + options.on_progress("Starting graph materialization...") + + request = MaterializeRequest( + ignore_errors=options.ignore_errors, + rebuild=options.rebuild, + force=options.force, + ) + + kwargs = { + "graph_id": graph_id, + "client": self.config.get("client"), + "body": request, + } + + response = materialize_graph(**kwargs) + + if response.status_code != 200 or not response.parsed: + error_msg = f"Materialization failed: {response.status_code}" + if hasattr(response, "content"): + try: + import json + + error_data = json.loads(response.content) + error_msg = error_data.get("detail", error_msg) + except Exception: + pass + + return MaterializationResult( + status="failed", + was_stale=False, + stale_reason=None, + tables_materialized=[], + total_rows=0, + execution_time_ms=0, + message=error_msg, + success=False, + error=error_msg, + ) + + result_data = response.parsed + + if options.on_progress: + options.on_progress( + f"✅ Materialization complete: {len(result_data.tables_materialized)} tables, " + f"{result_data.total_rows:,} rows in {result_data.execution_time_ms:.2f}ms" + ) + + return MaterializationResult( + status=result_data.status, + was_stale=result_data.was_stale, + stale_reason=result_data.stale_reason, + tables_materialized=result_data.tables_materialized, + total_rows=result_data.total_rows, + execution_time_ms=result_data.execution_time_ms, + message=result_data.message, + success=True, + ) + + except Exception as e: + logger.error(f"Materialization failed: {e}") + return MaterializationResult( + status="failed", + was_stale=False, + stale_reason=None, + tables_materialized=[], + total_rows=0, + execution_time_ms=0, + message=str(e), + success=False, + error=str(e), + ) + + def status(self, graph_id: str) -> Optional[MaterializationStatus]: + """ + Get current materialization status for the graph. + + Shows whether the graph is stale (DuckDB has changes not yet in graph database), + when it was last materialized, and how long since last materialization. 
+ + Args: + graph_id: Graph database identifier + + Returns: + MaterializationStatus with staleness and timing information + """ + try: + kwargs = { + "graph_id": graph_id, + "client": self.config.get("client"), + } + + response = get_materialization_status(**kwargs) + + if response.status_code != 200 or not response.parsed: + logger.error(f"Failed to get materialization status: {response.status_code}") + return None + + status_data = response.parsed + + return MaterializationStatus( + graph_id=status_data.graph_id, + is_stale=status_data.is_stale, + stale_reason=status_data.stale_reason, + stale_since=status_data.stale_since, + last_materialized_at=status_data.last_materialized_at, + materialization_count=status_data.materialization_count, + hours_since_materialization=status_data.hours_since_materialization, + message=status_data.message, + ) + + except Exception as e: + logger.error(f"Failed to get materialization status: {e}") + return None diff --git a/robosystems_client/extensions/table_client.py b/robosystems_client/extensions/table_client.py new file mode 100644 index 0000000..00f40f1 --- /dev/null +++ b/robosystems_client/extensions/table_client.py @@ -0,0 +1,161 @@ +"""Table Client for RoboSystems API + +Manages DuckDB staging table operations. +Tables provide SQL-queryable staging layer before graph materialization. +""" + +from dataclasses import dataclass +from typing import Dict, Any, Optional +import logging + +from ..api.tables.list_tables import ( + sync_detailed as list_tables, +) +from ..api.tables.query_tables import ( + sync_detailed as query_tables, +) +from ..models.table_query_request import TableQueryRequest + +logger = logging.getLogger(__name__) + + +@dataclass +class TableInfo: + """Information about a DuckDB staging table""" + + table_name: str + table_type: str + row_count: int + file_count: int + total_size_bytes: int + + +@dataclass +class QueryResult: + """Result from SQL query execution""" + + columns: list[str] + rows: list[list[Any]] + row_count: int + execution_time_ms: float + success: bool = True + error: Optional[str] = None + + +class TableClient: + """Client for managing DuckDB staging tables""" + + def __init__(self, config: Dict[str, Any]): + self.config = config + self.base_url = config["base_url"] + self.headers = config.get("headers", {}) + self.token = config.get("token") + + def list(self, graph_id: str) -> list[TableInfo]: + """ + List all DuckDB staging tables in a graph. + + Args: + graph_id: Graph database identifier + + Returns: + List of TableInfo objects with metadata + """ + try: + kwargs = { + "graph_id": graph_id, + "client": self.config.get("client"), + } + + response = list_tables(**kwargs) + + if response.status_code != 200 or not response.parsed: + logger.error(f"Failed to list tables: {response.status_code}") + return [] + + table_data = response.parsed + tables = getattr(table_data, "tables", []) + + return [ + TableInfo( + table_name=t.table_name, + table_type=t.table_type, + row_count=t.row_count, + file_count=t.file_count or 0, + total_size_bytes=t.total_size_bytes or 0, + ) + for t in tables + ] + + except Exception as e: + logger.error(f"Failed to list tables: {e}") + return [] + + def query( + self, graph_id: str, sql_query: str, limit: Optional[int] = None + ) -> QueryResult: + """ + Execute SQL query against DuckDB staging tables. 
+ + Args: + graph_id: Graph database identifier + sql_query: SQL query to execute + limit: Optional row limit + + Returns: + QueryResult with columns and rows + + Example: + >>> result = client.tables.query( + ... graph_id, + ... "SELECT * FROM Entity WHERE entity_type = 'CORPORATION'" + ... ) + >>> for row in result.rows: + ... print(row) + """ + try: + final_query = sql_query + if limit is not None: + final_query = f"{sql_query.rstrip(';')} LIMIT {limit}" + + request = TableQueryRequest(sql=final_query) + + kwargs = { + "graph_id": graph_id, + "client": self.config.get("client"), + "body": request, + } + + response = query_tables(**kwargs) + + if response.status_code != 200 or not response.parsed: + error_msg = f"Query failed: {response.status_code}" + return QueryResult( + columns=[], + rows=[], + row_count=0, + execution_time_ms=0, + success=False, + error=error_msg, + ) + + result_data = response.parsed + + return QueryResult( + columns=result_data.columns, + rows=result_data.rows, + row_count=len(result_data.rows), + execution_time_ms=getattr(result_data, "execution_time_ms", 0), + success=True, + ) + + except Exception as e: + logger.error(f"Query failed: {e}") + return QueryResult( + columns=[], + rows=[], + row_count=0, + execution_time_ms=0, + success=False, + error=str(e), + ) diff --git a/robosystems_client/extensions/table_ingest_client.py b/robosystems_client/extensions/table_ingest_client.py deleted file mode 100644 index 8a14757..0000000 --- a/robosystems_client/extensions/table_ingest_client.py +++ /dev/null @@ -1,463 +0,0 @@ -"""Table Ingest Client for RoboSystems API - -Simplifies uploading Parquet files to staging tables and ingesting them into graphs. -""" - -from dataclasses import dataclass -from io import BytesIO -from pathlib import Path -from typing import Dict, Any, Optional, Callable, List, Union, BinaryIO -import json -import logging -import httpx - -from ..api.files.create_file_upload import ( - sync_detailed as create_file_upload, -) -from ..api.files.update_file import ( - sync_detailed as update_file, -) -from ..api.tables.list_tables import ( - sync_detailed as list_tables, -) -from ..api.materialization.materialize_graph import ( - sync_detailed as materialize_graph, -) -from ..models.file_upload_request import FileUploadRequest -from ..models.file_status_update import FileStatusUpdate -from ..models.materialize_request import MaterializeRequest - -logger = logging.getLogger(__name__) - - -@dataclass -class UploadOptions: - """Options for file upload operations""" - - on_progress: Optional[Callable[[str], None]] = None - fix_localstack_url: bool = True # Auto-fix LocalStack URLs for localhost - file_name: Optional[str] = None # Override file name (useful for buffer uploads) - - -@dataclass -class IngestOptions: - """Options for table ingestion operations""" - - ignore_errors: bool = True - rebuild: bool = False - on_progress: Optional[Callable[[str], None]] = None - - -@dataclass -class UploadResult: - """Result from file upload operation""" - - file_id: str - file_size: int - row_count: int - table_name: str - file_name: str - success: bool = True - error: Optional[str] = None - - -@dataclass -class TableInfo: - """Information about a staging table""" - - table_name: str - row_count: int - file_count: int - total_size_bytes: int - - -class TableIngestClient: - """Enhanced table ingest client with simplified upload workflow""" - - def __init__(self, config: Dict[str, Any]): - self.config = config - self.base_url = config["base_url"] - self.headers = 
config.get("headers", {}) - self.token = config.get("token") - # Create httpx client for S3 uploads - self._http_client = httpx.Client(timeout=120.0) - - def upload_parquet_file( - self, - graph_id: str, - table_name: str, - file_or_buffer: Union[Path, str, BytesIO, BinaryIO], - options: Optional[UploadOptions] = None, - ) -> UploadResult: - """ - Upload a Parquet file to a staging table. - - This method handles the complete 3-step upload process: - 1. Get presigned upload URL - 2. Upload file to S3 - 3. Mark file as 'uploaded' (backend validates, calculates size/row count) - - Args: - graph_id: The graph ID - table_name: Name of the staging table - file_or_buffer: Path to the Parquet file or BytesIO/BinaryIO buffer - options: Upload options - - Returns: - UploadResult with upload details (size/row count calculated by backend) - """ - if options is None: - options = UploadOptions() - - # Auto-detect if this is a file path or buffer - is_buffer = isinstance(file_or_buffer, (BytesIO, BinaryIO)) or hasattr( - file_or_buffer, "read" - ) - - # Initialize file_path for type checking - file_path: Optional[Path] = None - - if is_buffer: - # Handle buffer upload - file_name = options.file_name or "data.parquet" - else: - # Handle file path upload - file_path = Path(file_or_buffer) - file_name = file_path.name - if not file_path.exists(): - return UploadResult( - file_id="", - file_size=0, - row_count=0, - table_name=table_name, - file_name=file_name, - success=False, - error=f"File not found: {file_path}", - ) - - try: - # Import client here to avoid circular imports - from ..client import AuthenticatedClient - - # Create authenticated client with X-API-Key - # The token is extracted from X-API-Key header in extensions.py - if not self.token: - return UploadResult( - file_id="", - file_size=0, - row_count=0, - table_name=table_name, - file_name=file_name, - success=False, - error="No API key provided. Set X-API-Key in headers.", - ) - - client = AuthenticatedClient( - base_url=self.base_url, - token=self.token, - prefix="", # No prefix for X-API-Key - auth_header_name="X-API-Key", # Use X-API-Key header instead of Authorization - headers=self.headers, - ) - - # Step 1: Get presigned upload URL - if options.on_progress: - options.on_progress( - f"Getting upload URL for {file_name} -> table '{table_name}'..." 
- ) - - upload_request = FileUploadRequest( - file_name=file_name, content_type="application/x-parquet", table_name=table_name - ) - - kwargs = { - "graph_id": graph_id, - "client": client, - "body": upload_request, - } - - response = create_file_upload(**kwargs) - - if not response.parsed: - error_msg = f"Failed to get upload URL (status: {response.status_code})" - if hasattr(response, "content"): - try: - error_detail = json.loads(response.content) - error_msg = f"{error_msg}: {error_detail}" - except (json.JSONDecodeError, ValueError): - error_msg = f"{error_msg}: {response.content[:200]}" - - return UploadResult( - file_id="", - file_size=0, - row_count=0, - table_name=table_name, - file_name=file_name, - success=False, - error=error_msg, - ) - - upload_url = response.parsed.upload_url - file_id = response.parsed.file_id - - # Fix LocalStack URL if needed - if options.fix_localstack_url and "localstack:4566" in upload_url: - upload_url = upload_url.replace("localstack:4566", "localhost:4566") - logger.debug("Fixed LocalStack URL for localhost access") - - # Step 2: Upload file to S3 - if options.on_progress: - options.on_progress(f"Uploading {file_name} to S3...") - - # Read file content - handle both paths and buffers - if is_buffer: - # Read from buffer - if hasattr(file_or_buffer, "getvalue"): - file_content = file_or_buffer.getvalue() - else: - # BinaryIO or file-like object - file_or_buffer.seek(0) - file_content = file_or_buffer.read() - else: - # Read from file path - if file_path is None: - raise ValueError("file_path should not be None when not using buffer") - with open(file_path, "rb") as f: - file_content = f.read() - - s3_response = self._http_client.put( - upload_url, - content=file_content, - headers={"Content-Type": "application/x-parquet"}, - ) - s3_response.raise_for_status() - - # Step 3: Mark file as uploaded (backend validates and calculates size/row count) - if options.on_progress: - options.on_progress(f"Marking {file_name} as uploaded...") - - status_update = FileStatusUpdate(status="uploaded") - - kwargs = { - "graph_id": graph_id, - "file_id": file_id, - "client": client, - "body": status_update, - } - - update_response = update_file(**kwargs) - - if not update_response.parsed: - logger.error( - f"No parsed response from update_file. 
Status code: {update_response.status_code}" - ) - return UploadResult( - file_id=file_id, - file_size=0, - row_count=0, - table_name=table_name, - file_name=file_name, - success=False, - error="Failed to complete file upload", - ) - - response_data = update_response.parsed - - if isinstance(response_data, dict): - file_size = response_data.get("file_size_bytes", 0) - row_count = response_data.get("row_count", 0) - elif hasattr(response_data, "additional_properties"): - file_size = response_data.additional_properties.get("file_size_bytes", 0) - row_count = response_data.additional_properties.get("row_count", 0) - else: - file_size = getattr(response_data, "file_size_bytes", 0) - row_count = getattr(response_data, "row_count", 0) - - if options.on_progress: - options.on_progress( - f"✅ Uploaded {file_name} ({file_size:,} bytes, {row_count:,} rows)" - ) - - return UploadResult( - file_id=file_id, - file_size=file_size, - row_count=row_count, - table_name=table_name, - file_name=file_name, - success=True, - ) - - except Exception as e: - logger.error(f"Upload failed for {file_name}: {e}") - return UploadResult( - file_id="", - file_size=0, - row_count=0, - table_name=table_name, - file_name=file_name, - success=False, - error=str(e), - ) - - def list_staging_tables(self, graph_id: str) -> List[TableInfo]: - """ - List all staging tables in a graph. - - Args: - graph_id: The graph ID - - Returns: - List of TableInfo objects - """ - try: - from ..client import AuthenticatedClient - - if not self.token: - logger.error("No API key provided") - return [] - - client = AuthenticatedClient( - base_url=self.base_url, - token=self.token, - prefix="", - auth_header_name="X-API-Key", - headers=self.headers, - ) - - kwargs = {"graph_id": graph_id, "client": client} - - response = list_tables(**kwargs) - - if not response.parsed: - logger.error("Failed to list tables") - return [] - - tables = [] - for table_data in response.parsed.tables: - tables.append( - TableInfo( - table_name=table_data.table_name, - row_count=table_data.row_count, - file_count=table_data.file_count, - total_size_bytes=table_data.total_size_bytes, - ) - ) - - return tables - - except Exception as e: - logger.error(f"Failed to list tables: {e}") - return [] - - def ingest_all_tables( - self, graph_id: str, options: Optional[IngestOptions] = None - ) -> Dict[str, Any]: - """ - Materialize the graph from all staging tables. - - This rebuilds the complete graph database from the current state of DuckDB staging tables. 
- - Args: - graph_id: The graph ID - options: Ingest options - - Returns: - Dictionary with materialization results - """ - if options is None: - options = IngestOptions() - - try: - from ..client import AuthenticatedClient - - if not self.token: - return {"success": False, "error": "No API key provided"} - - client = AuthenticatedClient( - base_url=self.base_url, - token=self.token, - prefix="", - auth_header_name="X-API-Key", - headers=self.headers, - ) - - if options.on_progress: - options.on_progress("Starting table materialization...") - - materialize_request = MaterializeRequest( - ignore_errors=options.ignore_errors, rebuild=options.rebuild, force=True - ) - - kwargs = { - "graph_id": graph_id, - "client": client, - "body": materialize_request, - } - - response = materialize_graph(**kwargs) - - if not response.parsed: - return {"success": False, "error": "Failed to materialize graph"} - - result = { - "success": True, - "operation_id": getattr(response.parsed, "operation_id", None), - "message": getattr(response.parsed, "message", "Materialization started"), - } - - if options.on_progress: - options.on_progress("✅ Graph materialization completed") - - return result - - except Exception as e: - logger.error(f"Failed to materialize graph: {e}") - return {"success": False, "error": str(e)} - - def upload_and_ingest( - self, - graph_id: str, - table_name: str, - file_path: Path, - upload_options: Optional[UploadOptions] = None, - ingest_options: Optional[IngestOptions] = None, - ) -> Dict[str, Any]: - """ - Convenience method to upload a file and immediately ingest it. - - Args: - graph_id: The graph ID - table_name: Name of the staging table - file_path: Path to the Parquet file - upload_options: Upload options - ingest_options: Ingest options - - Returns: - Dictionary with upload and ingest results - """ - # Upload the file - upload_result = self.upload_parquet_file( - graph_id, table_name, file_path, upload_options - ) - - if not upload_result.success: - return { - "success": False, - "upload": upload_result, - "ingest": None, - "error": upload_result.error, - } - - # Ingest the table - ingest_result = self.ingest_all_tables(graph_id, ingest_options) - - return { - "success": upload_result.success and ingest_result.get("success", False), - "upload": upload_result, - "ingest": ingest_result, - } - - def close(self): - """Close HTTP client connections""" - if self._http_client: - self._http_client.close() diff --git a/tests/test_extensions.py b/tests/test_extensions.py index c43ba20..ffbab09 100644 --- a/tests/test_extensions.py +++ b/tests/test_extensions.py @@ -4,7 +4,9 @@ from robosystems_client.extensions import ( RoboSystemsExtensions, RoboSystemsExtensionConfig, - TableIngestClient, + FileClient, + MaterializationClient, + TableClient, QueryClient, OperationClient, GraphClient, @@ -22,10 +24,11 @@ def test_extensions_initialization_default(self): assert extensions.config["base_url"] == "http://localhost:8000" assert isinstance(extensions.query, QueryClient) assert isinstance(extensions.operations, OperationClient) - assert isinstance(extensions.tables, TableIngestClient) + assert isinstance(extensions.files, FileClient) + assert isinstance(extensions.materialization, MaterializationClient) + assert isinstance(extensions.tables, TableClient) assert isinstance(extensions.graphs, GraphClient) - # Cleanup extensions.close() def test_extensions_initialization_with_config(self): @@ -103,8 +106,36 @@ def test_operation_client_receives_config(self): extensions.close() - def 
test_table_ingest_client_receives_config(self): - """Test that TableIngestClient receives proper config.""" + def test_file_client_receives_config(self): + """Test that FileClient receives proper config.""" + config = RoboSystemsExtensionConfig( + base_url="https://api.robosystems.ai", + headers={"X-API-Key": "test-token"}, + ) + + extensions = RoboSystemsExtensions(config) + + assert extensions.files.base_url == "https://api.robosystems.ai" + assert "X-API-Key" in extensions.files.headers + + extensions.close() + + def test_materialization_client_receives_config(self): + """Test that MaterializationClient receives proper config.""" + config = RoboSystemsExtensionConfig( + base_url="https://api.robosystems.ai", + headers={"X-API-Key": "test-token"}, + ) + + extensions = RoboSystemsExtensions(config) + + assert extensions.materialization.base_url == "https://api.robosystems.ai" + assert "X-API-Key" in extensions.materialization.headers + + extensions.close() + + def test_table_client_receives_config(self): + """Test that TableClient receives proper config.""" config = RoboSystemsExtensionConfig( base_url="https://api.robosystems.ai", headers={"X-API-Key": "test-token"}, diff --git a/tests/test_table_ingest_client.py b/tests/test_table_ingest_client.py deleted file mode 100644 index cfae7da..0000000 --- a/tests/test_table_ingest_client.py +++ /dev/null @@ -1,112 +0,0 @@ -"""Unit tests for TableIngestClient.""" - -import pytest -from robosystems_client.extensions.table_ingest_client import ( - TableIngestClient, - UploadOptions, - IngestOptions, - UploadResult, - TableInfo, -) - - -@pytest.mark.unit -class TestTableIngestClient: - """Test suite for TableIngestClient.""" - - def test_client_initialization(self, mock_config): - """Test that client initializes correctly with config.""" - client = TableIngestClient(mock_config) - - assert client.base_url == "http://localhost:8000" - assert client.token == "test-api-key" - assert client.headers == {"X-API-Key": "test-api-key"} - - def test_upload_options_dataclass(self): - """Test UploadOptions dataclass.""" - options = UploadOptions( - on_progress=lambda msg: print(msg), - fix_localstack_url=True, - file_name="test.parquet", - ) - - assert options.fix_localstack_url is True - assert options.file_name == "test.parquet" - assert options.on_progress is not None - - def test_ingest_options_dataclass(self): - """Test IngestOptions dataclass.""" - options = IngestOptions( - ignore_errors=False, rebuild=True, on_progress=lambda msg: print(msg) - ) - - assert options.ignore_errors is False - assert options.rebuild is True - assert options.on_progress is not None - - def test_upload_result_dataclass(self): - """Test UploadResult dataclass.""" - result = UploadResult( - file_id="file-123", - file_size=5000, - row_count=100, - table_name="Entity", - file_name="data.parquet", - success=True, - ) - - assert result.file_id == "file-123" - assert result.file_size == 5000 - assert result.row_count == 100 - assert result.table_name == "Entity" - assert result.success is True - assert result.error is None - - def test_upload_result_with_error(self): - """Test UploadResult with error.""" - result = UploadResult( - file_id="", - file_size=0, - row_count=0, - table_name="Entity", - file_name="data.parquet", - success=False, - error="Upload failed", - ) - - assert result.success is False - assert result.error == "Upload failed" - - def test_table_info_dataclass(self): - """Test TableInfo dataclass.""" - info = TableInfo( - table_name="Entity", row_count=1000, 
file_count=5, total_size_bytes=50000 - ) - - assert info.table_name == "Entity" - assert info.row_count == 1000 - assert info.file_count == 5 - assert info.total_size_bytes == 50000 - - def test_close_method(self, mock_config): - """Test that close method exists and can be called.""" - client = TableIngestClient(mock_config) - - # Should not raise any exceptions - client.close() - - def test_default_upload_options(self): - """Test default UploadOptions values.""" - options = UploadOptions() - - assert options.on_progress is None - assert options.fix_localstack_url is True - assert options.file_name is None - - def test_default_ingest_options(self): - """Test default IngestOptions values.""" - options = IngestOptions() - - assert options.ignore_errors is True - assert options.rebuild is False - assert options.on_progress is None
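
For reference (not part of the diff itself), a minimal end-to-end sketch of how the three new clients compose under the split API: upload a file, inspect the DuckDB staging layer, then materialize the graph. The graph id "kg_example", the API key, and the Parquet file name are placeholders; everything else uses only classes and methods added in this diff.

# Hedged sketch, assuming an existing graph and a valid API key.
from robosystems_client.extensions import (
    RoboSystemsExtensions,
    RoboSystemsExtensionConfig,
    FileUploadOptions,
    MaterializationOptions,
)

config = RoboSystemsExtensionConfig(
    base_url="https://api.robosystems.ai",
    headers={"X-API-Key": "your-api-key"},  # placeholder credential
)
extensions = RoboSystemsExtensions(config)

# 1. Upload a Parquet file to the "Entity" staging table (S3 -> DuckDB).
upload = extensions.files.upload(
    "kg_example",
    "Entity",
    "entities.parquet",
    options=FileUploadOptions(on_progress=print),
)
assert upload.success, upload.error

# 2. Inspect staging tables and preview rows with SQL before materializing.
for table in extensions.tables.list("kg_example"):
    print(table.table_name, table.row_count)
preview = extensions.tables.query("kg_example", "SELECT * FROM Entity", limit=10)
print(preview.columns, preview.row_count)

# 3. Materialize the graph from the DuckDB staging tables.
result = extensions.materialization.materialize(
    "kg_example",
    options=MaterializationOptions(on_progress=print),
)
print(result.status, result.total_rows)

extensions.close()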
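
Because files can be uploaded with ingest_to_graph left false, batch uploads can leave the graph stale relative to DuckDB. A short sketch (same placeholder graph id) of using the new status endpoint to re-materialize only when needed:

# Hedged sketch: re-materialize only if the graph is flagged stale.
status = extensions.materialization.status("kg_example")
if status and status.is_stale:
    print("Stale:", status.stale_reason)
    extensions.materialization.materialize("kg_example")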