Skip to content

Commit 5bde4cf

Browse files
feat(download): first class support for streaming download in eodag #2013
1 parent d88f651 commit 5bde4cf

15 files changed

Lines changed: 1131 additions & 144 deletions

File tree

docs/api_reference/eoproduct.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ Download
2929
--------
3030

3131
.. automethod:: EOProduct.download
32+
.. automethod:: EOProduct.stream_download
3233
.. automethod:: EOProduct.get_quicklook
3334

3435
Conversion

docs/notebooks/tutos/tuto_stream_download_to_s3.ipynb

Lines changed: 576 additions & 0 deletions
Large diffs are not rendered by default.

docs/tutos.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ or run locally after being downloaded (see how to :ref:`install_notebooks`).
3939
notebooks/tutos/tuto_burnt_areas_snappy.ipynb
4040
notebooks/tutos/tuto_dedt_lumi_roi.ipynb
4141
notebooks/tutos/tuto_fedeo_ceda.ipynb
42+
notebooks/tutos/tuto_stream_download_to_s3.ipynb
4243

4344
.. grid:: 1 2 2 3
4445
:gutter: 4
@@ -124,3 +125,11 @@ or run locally after being downloaded (see how to :ref:`install_notebooks`).
124125
:shadow: md
125126

126127
Access Fedeo data through the CEDA API using the dedicated EODAG plugin.
128+
129+
.. grid-item-card:: Download product as stream to S3
130+
:link: notebooks/tutos/tuto_stream_download_to_s3
131+
:link-type: doc
132+
:text-align: center
133+
:shadow: md
134+
135+
Download a product data as a stream and upload it on a S3 bucket without storing it locally.

eodag/api/product/_assets.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,14 @@ def download(self, **kwargs: Unpack[DownloadConf]) -> str:
190190
"""
191191
return self.product.download(asset=self.key, **kwargs)
192192

193+
def stream_download(self, **kwargs: Unpack[DownloadConf]) -> str:
194+
"""Downloads a single asset
195+
196+
:param kwargs: (optional) Additional named-arguments passed to `plugin.download()`
197+
:returns: The absolute path to the downloaded product on the local filesystem
198+
"""
199+
return self.product.stream_download(asset=self.key, **kwargs)
200+
193201
def _repr_html_(self):
194202
thead = f"""<thead><tr><td style='text-align: left; color: grey;'>
195203
{type(self).__name__}&ensp;-&ensp;{self.key}

eodag/api/product/_product.py

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import re
2424
import tempfile
2525
from datetime import datetime
26-
from typing import TYPE_CHECKING, Any, Iterable, Optional, Union, cast
26+
from typing import TYPE_CHECKING, Any, Iterable, Literal, Optional, Union, cast
2727

2828
import orjson
2929
import requests
@@ -59,6 +59,7 @@
5959
STAC_VERSION,
6060
USER_AGENT,
6161
ProgressCallback,
62+
StreamResponse,
6263
format_string,
6364
get_geometry_from_various,
6465
)
@@ -77,7 +78,6 @@
7778
from eodag.types.download_args import DownloadConf
7879
from eodag.utils import Unpack
7980

80-
8181
logger = logging.getLogger("eodag.product")
8282

8383

@@ -481,6 +481,50 @@ def download(
481481

482482
return fs_path
483483

484+
def stream_download(
485+
self,
486+
byte_range: tuple[Optional[int], Optional[int]] = (None, None),
487+
compress: Literal["zip", "raw", "auto"] = "auto",
488+
wait: float = DEFAULT_DOWNLOAD_WAIT,
489+
timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
490+
**kwargs: Unpack[DownloadConf],
491+
) -> StreamResponse:
492+
"""Download as StreamResponse the EO product using the provided download plugin and the
493+
authenticator if necessary.
494+
495+
:param byte_range: (optional) Tuple of first index / last index byte to read
496+
:param compress: (optional) "zip", "raw", "auto"
497+
:param wait: (optional) If download fails, wait time in minutes between
498+
two download tries
499+
:param timeout: (optional) If download fails, maximum time in minutes
500+
before stop retrying to download
501+
:param kwargs: additional kwargs like `dl_url_params` (dict) can be provided
502+
and will override any other values defined in a configuration
503+
file or with environment variables.
504+
:returns: StreamResponse Stream representation of a file
505+
:raises: :class:`~eodag.utils.exceptions.PluginImplementationError`
506+
:raises: :class:`RuntimeError`
507+
"""
508+
if self.downloader is None:
509+
raise RuntimeError(
510+
"EO product is unable to stream_download itself due to lacking of a "
511+
"download plugin"
512+
)
513+
auth = (
514+
self.downloader_auth.authenticate()
515+
if self.downloader_auth is not None
516+
else self.downloader_auth
517+
)
518+
return self.downloader.stream_download(
519+
self,
520+
auth,
521+
byte_range,
522+
compress,
523+
wait=wait,
524+
timeout=timeout,
525+
**kwargs,
526+
)
527+
484528
def _init_progress_bar(
485529
self,
486530
progress_callback: Optional[ProgressCallback],

eodag/plugins/download/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,8 @@
1616
# See the License for the specific language governing permissions and
1717
# limitations under the License.
1818
"""EODAG download package"""
19+
from .aws import AwsDownload
20+
from .base import Download
21+
from .http import HTTPDownload
22+
23+
__all__ = ["Download", "AwsDownload", "HTTPDownload"]

eodag/plugins/download/aws.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
properties_from_xml,
3838
)
3939
from eodag.plugins.authentication.aws_auth import AwsAuth, raise_if_auth_error
40-
from eodag.plugins.download.base import Download
4140
from eodag.utils import (
4241
DEFAULT_DOWNLOAD_TIMEOUT,
4342
DEFAULT_DOWNLOAD_WAIT,
@@ -62,6 +61,8 @@
6261
)
6362
from eodag.utils.s3 import S3FileInfo, open_s3_zipped_object, stream_download_from_s3
6463

64+
from .base import Download
65+
6566
if TYPE_CHECKING:
6667
from mypy_boto3_s3 import S3ServiceResource
6768
from mypy_boto3_s3.client import S3Client
@@ -680,7 +681,7 @@ def _get_unique_products(
680681

681682
return unique_product_chunks
682683

683-
def _stream_download_dict(
684+
def stream_download(
684685
self,
685686
product: EOProduct,
686687
auth: Optional[Union[AuthBase, S3ServiceResource]] = None,
@@ -1176,3 +1177,6 @@ def get_chunk_dest_path(
11761177

11771178
logger.debug(f"Downloading {chunk.key} to {product_path}")
11781179
return product_path
1180+
1181+
1182+
__all__ = ["AwsDownload"]

eodag/plugins/download/base.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import tarfile
2525
import tempfile
2626
import zipfile
27+
from abc import abstractmethod
2728
from datetime import datetime, timedelta
2829
from pathlib import Path
2930
from time import sleep
@@ -102,6 +103,7 @@ def __init__(self, provider: str, config: PluginConfig) -> None:
102103
super(Download, self).__init__(provider, config)
103104
self._authenticate = bool(getattr(self.config, "authenticate", False))
104105

106+
@abstractmethod
105107
def download(
106108
self,
107109
product: EOProduct,
@@ -134,7 +136,8 @@ def download(
134136
"A Download plugin must implement a method named download"
135137
)
136138

137-
def _stream_download_dict(
139+
@abstractmethod
140+
def stream_download(
138141
self,
139142
product: EOProduct,
140143
auth: Optional[Union[AuthBase, S3ServiceResource]] = None,
@@ -145,7 +148,7 @@ def _stream_download_dict(
145148
**kwargs: Unpack[DownloadConf],
146149
) -> StreamResponse:
147150
r"""
148-
Base _stream_download_dict method. Not available, it must be defined for each plugin.
151+
Base stream_download method. Not available, it must be defined for each plugin.
149152
150153
:param product: The EO product to download
151154
:param auth: (optional) authenticated object
@@ -160,7 +163,7 @@ def _stream_download_dict(
160163
:returns: Dictionary of :class:`~fastapi.responses.StreamingResponse` keyword-arguments
161164
"""
162165
raise NotImplementedError(
163-
"Download streaming must be implemented using a method named _stream_download_dict"
166+
"Download streaming must be implemented using a method named stream_download"
164167
)
165168

166169
def _prepare_download(
@@ -766,3 +769,6 @@ def _config_executor(
766769

767770
if thread_name_prefix:
768771
executor._thread_name_prefix = "eodag-download-all"
772+
773+
774+
__all__ = ["Download"]

eodag/plugins/download/http.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -662,7 +662,7 @@ def download_request(
662662
**kwargs: Unpack[DownloadConf],
663663
) -> os.PathLike:
664664
is_empty = True
665-
chunk_iterator = self._stream_download(
665+
chunk_iterator = self._raw_stream_download(
666666
product, auth, progress_callback, **kwargs
667667
)
668668
if fs_path is not None:
@@ -757,7 +757,7 @@ def _check_product_filename(self, product: EOProduct) -> str:
757757
filename += ext
758758
return filename
759759

760-
def _stream_download_dict(
760+
def stream_download(
761761
self,
762762
product: EOProduct,
763763
auth: Optional[Union[AuthBase, S3ServiceResource]] = None,
@@ -797,7 +797,7 @@ def _stream_download_dict(
797797
try:
798798
assets_values = product.assets.get_values(kwargs.get("asset"))
799799
with executor:
800-
assets_stream_list = self._stream_download_assets(
800+
assets_stream_list = self._raw_stream_download_assets(
801801
product,
802802
executor,
803803
auth,
@@ -848,7 +848,7 @@ def _stream_download_dict(
848848
else:
849849
pass
850850

851-
chunk_iterator = self._stream_download(product, auth, None, **kwargs)
851+
chunk_iterator = self._raw_stream_download(product, auth, None, **kwargs)
852852

853853
# start reading chunks to set product.headers
854854
try:
@@ -957,7 +957,7 @@ def order(
957957
product, auth
958958
)
959959

960-
def _stream_download(
960+
def _raw_stream_download(
961961
self,
962962
product: EOProduct,
963963
auth: Optional[AuthBase] = None,
@@ -1070,7 +1070,7 @@ def _stream_download(
10701070
product.filename = filename
10711071
return product._stream.iter_content(chunk_size=64 * 1024)
10721072

1073-
def _stream_download_assets(
1073+
def _raw_stream_download_assets(
10741074
self,
10751075
product: EOProduct,
10761076
executor: ThreadPoolExecutor,
@@ -1247,7 +1247,7 @@ def _download_assets(
12471247

12481248
assets_values = product.assets.get_values(kwargs.get("asset"))
12491249

1250-
assets_stream_list = self._stream_download_assets(
1250+
assets_stream_list = self._raw_stream_download_assets(
12511251
product, executor, auth, progress_callback, assets_values, **kwargs
12521252
)
12531253

@@ -1460,3 +1460,6 @@ def fetch_asset_size(asset: Asset) -> None:
14601460
[f.result() for f in as_completed(futures)]
14611461

14621462
return total_size
1463+
1464+
1465+
__all__ = ["HTTPDownload"]

0 commit comments

Comments
 (0)