From 83d04d78d289a5279555bb45c860ac2767a7c0d2 Mon Sep 17 00:00:00 2001 From: JanusAsmussen Date: Thu, 26 Sep 2024 16:56:35 +0200 Subject: [PATCH 1/7] Set polars concatenation option to avoid issues with empty partitions --- .../distributed_object_store/v3/datastax_astra/astra_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py b/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py index b1e8b60a..485c516e 100644 --- a/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py +++ b/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py @@ -62,7 +62,7 @@ from adapta.storage.distributed_object_store.v3.datastax_astra._models import SimilarityFunction, VectorSearchQuery from adapta.storage.models.filter_expression import Expression, AstraFilterExpression, compile_expression from adapta.utils import chunk_list, rate_limit -from adapta.utils.metaframe import MetaFrame, concat +from adapta.utils.metaframe import MetaFrame, concat, PolarsOptions from adapta.storage.distributed_object_store.v3.datastax_astra._model_mappers import get_mapper from adapta.schema_management.schema_entity import PythonSchemaEntity from adapta.storage.models.enum import QueryEnabledStoreOptions From 5f5af5d05ab4026cfc6e4292aea81f834dcb07ac Mon Sep 17 00:00:00 2001 From: JanusAsmussen Date: Thu, 26 Sep 2024 17:14:36 +0200 Subject: [PATCH 2/7] Fix astra limit --- .../v3/datastax_astra/astra_client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py b/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py index 485c516e..526f3972 100644 --- a/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py +++ b/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py @@ -278,10 +278,11 @@ class Test: raise_on_giveup=True, ) def apply(model: Type[Model], key_column_filter: Dict[str, Any], columns_to_select: Optional[List[str]]): + model = model.filter(**key_column_filter).limit(None) if columns_to_select: - return model.filter(**key_column_filter).only(select_columns) + return model.only(select_columns) - return model.filter(**key_column_filter) + return model def normalize_column_name(column_name: str) -> str: filter_suffix = re.findall(self._filter_pattern, column_name) From de8a1f5eac4ee2fe56c4e01c8da0899928eacaef Mon Sep 17 00:00:00 2001 From: JanusAsmussen Date: Mon, 7 Oct 2024 15:51:10 +0200 Subject: [PATCH 3/7] Add UTC time zone --- .../distributed_object_store/v3/datastax_astra/_model_mappers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/adapta/storage/distributed_object_store/v3/datastax_astra/_model_mappers.py b/adapta/storage/distributed_object_store/v3/datastax_astra/_model_mappers.py index 68800572..0ee7871b 100644 --- a/adapta/storage/distributed_object_store/v3/datastax_astra/_model_mappers.py +++ b/adapta/storage/distributed_object_store/v3/datastax_astra/_model_mappers.py @@ -362,6 +362,7 @@ def _map_to_column( polars.Datetime(time_unit="us"): (columns.DateTime,), polars.Datetime(time_unit="ns"): (columns.DateTime,), polars.Datetime(time_unit="ms"): (columns.DateTime,), + polars.Datetime(time_unit="ms", time_zone='UTC'): (columns.DateTime,), } column_type = mapping.get(type_to_map, None) From 3288cdeaa7f1f842eef968b8460e47012d41e921 Mon Sep 17 00:00:00 2001 From: JanusAsmussen Date: Mon, 7 Oct 2024 15:57:40 +0200 Subject: [PATCH 4/7] Fix time zone --- .../distributed_object_store/v3/datastax_astra/_model_mappers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/adapta/storage/distributed_object_store/v3/datastax_astra/_model_mappers.py b/adapta/storage/distributed_object_store/v3/datastax_astra/_model_mappers.py index 0ee7871b..49995116 100644 --- a/adapta/storage/distributed_object_store/v3/datastax_astra/_model_mappers.py +++ b/adapta/storage/distributed_object_store/v3/datastax_astra/_model_mappers.py @@ -363,6 +363,7 @@ def _map_to_column( polars.Datetime(time_unit="ns"): (columns.DateTime,), polars.Datetime(time_unit="ms"): (columns.DateTime,), polars.Datetime(time_unit="ms", time_zone='UTC'): (columns.DateTime,), + polars.Datetime(time_unit="us", time_zone='UTC'): (columns.DateTime,), } column_type = mapping.get(type_to_map, None) From f6ce9f71412b42c50a5aa33325a9cc6dff28c315 Mon Sep 17 00:00:00 2001 From: JanusAsmussen Date: Wed, 18 Dec 2024 20:23:46 +0100 Subject: [PATCH 5/7] Fix black --- .../v3/datastax_astra/_model_mappers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/adapta/storage/distributed_object_store/v3/datastax_astra/_model_mappers.py b/adapta/storage/distributed_object_store/v3/datastax_astra/_model_mappers.py index 49995116..02c23f8b 100644 --- a/adapta/storage/distributed_object_store/v3/datastax_astra/_model_mappers.py +++ b/adapta/storage/distributed_object_store/v3/datastax_astra/_model_mappers.py @@ -362,8 +362,8 @@ def _map_to_column( polars.Datetime(time_unit="us"): (columns.DateTime,), polars.Datetime(time_unit="ns"): (columns.DateTime,), polars.Datetime(time_unit="ms"): (columns.DateTime,), - polars.Datetime(time_unit="ms", time_zone='UTC'): (columns.DateTime,), - polars.Datetime(time_unit="us", time_zone='UTC'): (columns.DateTime,), + polars.Datetime(time_unit="ms", time_zone="UTC"): (columns.DateTime,), + polars.Datetime(time_unit="us", time_zone="UTC"): (columns.DateTime,), } column_type = mapping.get(type_to_map, None) From 979650bee1926bfa2fec9d09658c9a70263174de Mon Sep 17 00:00:00 2001 From: JanusAsmussen Date: Fri, 20 Dec 2024 13:47:27 +0100 Subject: [PATCH 6/7] Handle issues with incorrect schema inference in polars --- .../v3/datastax_astra/astra_client.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py b/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py index 526f3972..b09e4e0d 100644 --- a/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py +++ b/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py @@ -30,6 +30,8 @@ from dataclasses import asdict from typing import Optional, Dict, TypeVar, Callable, Type, List, Any, Union +from polars.polars import ComputeError + try: from _socket import IPPROTO_TCP, TCP_NODELAY, TCP_USER_TIMEOUT except ImportError: @@ -294,9 +296,16 @@ def normalize_column_name(column_name: str) -> str: def to_frame( model: Type[Model], key_column_filter: Dict[str, Any], columns_to_select: Optional[List[str]] ) -> MetaFrame: + def convert_to_polars(x: list[dict]) -> polars.DataFrame: + try: + return polars.DataFrame(x, schema=select_columns) + except ComputeError: + # Catches errors related to incorrect schema inference and tries again with unlimited schema inference length + return polars.DataFrame(x, schema=select_columns, infer_schema_length=None) + return MetaFrame( [dict(v.items()) for v in list(apply(model, key_column_filter, columns_to_select))], - convert_to_polars=lambda x: polars.DataFrame(x, schema=select_columns), + convert_to_polars=convert_to_polars, convert_to_pandas=lambda x: pandas.DataFrame(x, columns=select_columns), ) From cd1d418a40c60fe0a42c721d564e8d4cce0b55ef Mon Sep 17 00:00:00 2001 From: JanusAsmussen Date: Thu, 3 Apr 2025 15:36:15 +0200 Subject: [PATCH 7/7] Add option to specify limit in QES --- adapta/storage/delta_lake/v3/_functions.py | 5 +++++ .../v3/datastax_astra/astra_client.py | 4 +++- adapta/storage/query_enabled_store/_models.py | 13 ++++++++++--- adapta/storage/query_enabled_store/_qes_astra.py | 3 +++ adapta/storage/query_enabled_store/_qes_delta.py | 2 ++ 5 files changed, 23 insertions(+), 4 deletions(-) diff --git a/adapta/storage/delta_lake/v3/_functions.py b/adapta/storage/delta_lake/v3/_functions.py index 6b1b43af..222df047 100644 --- a/adapta/storage/delta_lake/v3/_functions.py +++ b/adapta/storage/delta_lake/v3/_functions.py @@ -45,6 +45,7 @@ def load( # pylint: disable=R0913 columns: Optional[List[str]] = None, batch_size: Optional[int] = None, partition_filter_expressions: Optional[List[Tuple]] = None, + limit: Optional[int] = None, ) -> Union[MetaFrame, Iterator[MetaFrame]]: """ Loads Delta Lake table from Azure or AWS storage and converts it to a pandas dataframe. @@ -60,6 +61,7 @@ def load( # pylint: disable=R0913 :param columns: Optional list of columns to select when reading. Defaults to all columns of not provided. :param batch_size: Optional batch size when reading in batches. If not set, whole table will be loaded into memory. + :param limit: Optional limit on number of rows to read. :param partition_filter_expressions: Optional partitions filters. Examples: partition_filter_expressions = [("day", "=", "3")] @@ -82,6 +84,9 @@ def load( # pylint: disable=R0913 filesystem=auth_client.get_pyarrow_filesystem(path), ) + if limit: + pyarrow_ds = pyarrow_ds.head(limit) + row_filter = ( compile_expression(row_filter, ArrowFilterExpression) if isinstance(row_filter, Expression) else row_filter ) diff --git a/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py b/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py index b09e4e0d..cc0a86c9 100644 --- a/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py +++ b/adapta/storage/distributed_object_store/v3/datastax_astra/astra_client.py @@ -243,6 +243,7 @@ def filter_entities( deduplicate=False, num_threads: Optional[int] = None, options: dict[QueryEnabledStoreOptions, any] = None, + limit: Optional[int] = None, ) -> MetaFrame: """ Run a filter query on the entity of type TModel backed by table `table_name`. @@ -267,6 +268,7 @@ class Test: :param: custom_indexes: An optional list of custom indexes, if it cannot be inferred, if it cannot be inferred from the data model. :param: deduplicate: Optionally deduplicate query result, for example when only the partition key part of a primary key is used to fetch results. :param: num_threads: Optionally run filtering using multiple threads. Setting this to -1 will cause this method to automatically evaluate number of threads based on filter expression size. + :param: limit: Optionally limit the number of results returned. NOTE the limit works per call to Astra and not on the final result. """ @on_exception( @@ -280,7 +282,7 @@ class Test: raise_on_giveup=True, ) def apply(model: Type[Model], key_column_filter: Dict[str, Any], columns_to_select: Optional[List[str]]): - model = model.filter(**key_column_filter).limit(None) + model = model.filter(**key_column_filter).limit(limit) if columns_to_select: return model.only(select_columns) diff --git a/adapta/storage/query_enabled_store/_models.py b/adapta/storage/query_enabled_store/_models.py index 84ddb4e5..912977d3 100644 --- a/adapta/storage/query_enabled_store/_models.py +++ b/adapta/storage/query_enabled_store/_models.py @@ -90,15 +90,16 @@ def _apply_filter( filter_expression: Expression, columns: list[str], options: dict[QueryEnabledStoreOptions, any] | None = None, + limit: Optional[int] = 10000, ) -> Union[MetaFrame, Iterator[MetaFrame]]: """ - Applies the provided filter expression to this Store and returns the result in a pandas DataFrame + Applies the provided filter expression to this Store and returns the result in a MetaFrame """ @abstractmethod def _apply_query(self, query: str) -> Union[MetaFrame, Iterator[MetaFrame]]: """ - Applies a plaintext query to this Store and returns the result in a pandas DataFrame + Applies a plaintext query to this Store and returns the result in a MetaFrame """ @classmethod @@ -146,6 +147,7 @@ def __init__(self, store: QueryEnabledStore, path: DataPath): self._filter_expression: Optional[Expression] = None self._columns: list[str] = [] self._options: dict[QueryEnabledStoreOptions, any] = {} + self._limit = 10000 def filter(self, filter_expression: Expression) -> "QueryConfigurationBuilder": """ @@ -170,6 +172,11 @@ def add_options(self, option_key: QueryEnabledStoreOptions, option_value: any) - self._options[option_key] = option_value + def limit(self, limit: int) -> "QueryConfigurationBuilder": + """ + Limit the number of results returned by the underlying store. + """ + self._limit = limit return self def read(self) -> Union[MetaFrame, Iterator[MetaFrame]]: @@ -180,5 +187,5 @@ def read(self) -> Union[MetaFrame, Iterator[MetaFrame]]: path=self._path, filter_expression=self._filter_expression, columns=self._columns, - options=self._options, + limit=self._limit, ) diff --git a/adapta/storage/query_enabled_store/_qes_astra.py b/adapta/storage/query_enabled_store/_qes_astra.py index d41270d5..d7fea4df 100644 --- a/adapta/storage/query_enabled_store/_qes_astra.py +++ b/adapta/storage/query_enabled_store/_qes_astra.py @@ -87,6 +87,7 @@ def _apply_filter( filter_expression: Expression, columns: list[str], options: dict[QueryEnabledStoreOptions, any] | None = None, + limit: Optional[int] = 10000, ) -> Union[MetaFrame, Iterator[MetaFrame]]: assert isinstance(path, AstraPath) astra_path: AstraPath = path @@ -100,6 +101,7 @@ def _apply_filter( select_columns=columns, num_threads=-1, # auto-infer, see method documentation options=options, + limit=limit, ) return self._astra_client.filter_entities( @@ -110,6 +112,7 @@ def _apply_filter( select_columns=columns, num_threads=-1, # auto-infer, see method documentation options=options, + limit=limit, ) def _apply_query(self, query: str) -> Union[MetaFrame, Iterator[MetaFrame]]: diff --git a/adapta/storage/query_enabled_store/_qes_delta.py b/adapta/storage/query_enabled_store/_qes_delta.py index 138942e3..2c366890 100644 --- a/adapta/storage/query_enabled_store/_qes_delta.py +++ b/adapta/storage/query_enabled_store/_qes_delta.py @@ -76,12 +76,14 @@ def _apply_filter( filter_expression: Expression, columns: list[str], options: dict[QueryEnabledStoreOptions, any] | None = None, + limit: Optional[int] = 10000, ) -> Union[MetaFrame, Iterator[MetaFrame]]: return load( auth_client=self.credentials.auth_client(credentials=self.credentials.auth_client_credentials()), path=path, row_filter=filter_expression, columns=columns if columns else None, + limit=limit, ) def _apply_query(self, query: str) -> Union[MetaFrame, Iterator[MetaFrame]]: