Skip to content

Commit 704c898

Browse files
V1NAY8sethmlarson
andauthored
Optimize to_pandas() internally to improve performance
Co-authored-by: Seth Michael Larson <seth.larson@elastic.co>
1 parent 6088f2e commit 704c898

2 files changed

Lines changed: 24 additions & 55 deletions

File tree

eland/operations.py

Lines changed: 22 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import copy
1919
import warnings
2020
from collections import defaultdict
21+
from datetime import datetime
2122
from typing import (
2223
TYPE_CHECKING,
2324
Any,
@@ -39,6 +40,7 @@
3940
from eland.common import (
4041
DEFAULT_PAGINATION_SIZE,
4142
DEFAULT_PIT_KEEP_ALIVE,
43+
DEFAULT_PROGRESS_REPORTING_NUM_ROWS,
4244
DEFAULT_SEARCH_SIZE,
4345
SortOrder,
4446
build_pd_series,
@@ -1198,18 +1200,34 @@ def describe(self, query_compiler: "QueryCompiler") -> pd.DataFrame:
11981200
def to_pandas(
11991201
self, query_compiler: "QueryCompiler", show_progress: bool = False
12001202
) -> pd.DataFrame:
1201-
df = self._es_results(query_compiler, show_progress)
12021203

1203-
return df
1204+
df_list: List[pd.DataFrame] = []
1205+
i = 0
1206+
for df in self.search_yield_pandas_dataframes(query_compiler=query_compiler):
1207+
if show_progress:
1208+
i = i + df.shape[0]
1209+
if i % DEFAULT_PROGRESS_REPORTING_NUM_ROWS == 0:
1210+
print(f"{datetime.now()}: read {i} rows")
1211+
df_list.append(df)
1212+
1213+
if show_progress:
1214+
print(f"{datetime.now()}: read {i} rows")
1215+
1216+
# pd.concat() can't handle an empty list
1217+
# because there aren't defined columns.
1218+
if not df_list:
1219+
return query_compiler._empty_pd_ef()
1220+
return pd.concat(df_list)
12041221

12051222
def to_csv(
12061223
self,
12071224
query_compiler: "QueryCompiler",
12081225
show_progress: bool = False,
12091226
**kwargs: Union[bool, str],
12101227
) -> Optional[str]:
1211-
df = self._es_results(query_compiler, show_progress)
1212-
return df.to_csv(**kwargs) # type: ignore[no-any-return]
1228+
return self.to_pandas( # type: ignore[no-any-return]
1229+
query_compiler=query_compiler, show_progress=show_progress
1230+
).to_csv(**kwargs)
12131231

12141232
def search_yield_pandas_dataframes(
12151233
self, query_compiler: "QueryCompiler"
@@ -1241,42 +1259,6 @@ def search_yield_pandas_dataframes(
12411259
df = self._apply_df_post_processing(df, post_processing)
12421260
yield df
12431261

1244-
def _es_results(
1245-
self, query_compiler: "QueryCompiler", show_progress: bool = False
1246-
) -> pd.DataFrame:
1247-
query_params, post_processing = self._resolve_tasks(query_compiler)
1248-
1249-
result_size, sort_params = Operations._query_params_to_size_and_sort(
1250-
query_params
1251-
)
1252-
1253-
script_fields = query_params.script_fields
1254-
query = Query(query_params.query)
1255-
1256-
body = query.to_search_body()
1257-
if script_fields is not None:
1258-
body["script_fields"] = script_fields
1259-
1260-
# Only return requested field_names and add them to body
1261-
_source = query_compiler.get_field_names(include_scripted_fields=False)
1262-
body["_source"] = _source if _source else False
1263-
1264-
if sort_params:
1265-
body["sort"] = [sort_params]
1266-
1267-
es_results: List[Dict[str, Any]] = sum(
1268-
_search_yield_hits(
1269-
query_compiler=query_compiler, body=body, max_number_of_hits=result_size
1270-
),
1271-
[],
1272-
)
1273-
1274-
df = query_compiler._es_results_to_pandas(
1275-
results=es_results, show_progress=show_progress
1276-
)
1277-
df = self._apply_df_post_processing(df, post_processing)
1278-
return df
1279-
12801262
def index_count(self, query_compiler: "QueryCompiler", field: str) -> int:
12811263
# field is the index field so count values
12821264
query_params, post_processing = self._resolve_tasks(query_compiler)

eland/query_compiler.py

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
# under the License.
1717

1818
import copy
19-
from datetime import datetime
2019
from typing import (
2120
TYPE_CHECKING,
2221
Any,
@@ -33,11 +32,7 @@
3332
import numpy as np
3433
import pandas as pd # type: ignore
3534

36-
from eland.common import (
37-
DEFAULT_PROGRESS_REPORTING_NUM_ROWS,
38-
elasticsearch_date_to_pandas_date,
39-
ensure_es_client,
40-
)
35+
from eland.common import elasticsearch_date_to_pandas_date, ensure_es_client
4136
from eland.field_mappings import FieldMappings
4237
from eland.filter import BooleanFilter, QueryFilter
4338
from eland.index import Index
@@ -149,7 +144,6 @@ def es_dtypes(self) -> pd.Series:
149144
def _es_results_to_pandas(
150145
self,
151146
results: List[Dict[str, Any]],
152-
show_progress: bool = False,
153147
) -> "pd.Dataframe":
154148
"""
155149
Parameters
@@ -274,10 +268,6 @@ def _es_results_to_pandas(
274268
# flatten row to map correctly to 2D DataFrame
275269
rows.append(self._flatten_dict(row, field_mapping_cache))
276270

277-
if show_progress:
278-
if i % DEFAULT_PROGRESS_REPORTING_NUM_ROWS == 0:
279-
print(f"{datetime.now()}: read {i} rows")
280-
281271
# Create pandas DataFrame
282272
df = pd.DataFrame(data=rows, index=index)
283273

@@ -299,9 +289,6 @@ def _es_results_to_pandas(
299289
if len(self.columns) > 1:
300290
df = df[self.columns]
301291

302-
if show_progress:
303-
print(f"{datetime.now()}: read {i} rows")
304-
305292
return df
306293

307294
def _flatten_dict(self, y, field_mapping_cache: "FieldMappingCache"):
@@ -383,7 +370,7 @@ def _index_matches_count(self, items: List[Any]) -> int:
383370
self, self.index.es_index_field, items
384371
)
385372

386-
def _empty_pd_ef(self):
373+
def _empty_pd_ef(self) -> "pd.DataFrame":
387374
# Return an empty dataframe with correct columns and dtypes
388375
df = pd.DataFrame()
389376
for c, d in zip(self.dtypes.index, self.dtypes.values):

0 commit comments

Comments
 (0)