Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion src/deadline/job_attachments/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from boto3.s3.transfer import ProgressCallbackInvoker
from botocore.client import BaseClient
from botocore.exceptions import BotoCoreError, ClientError
from s3transfer.subscribers import BaseSubscriber as _BaseSubscriber

from .asset_manifests.base_manifest import BaseAssetManifest, BaseManifestPath as RelativeFilePath
from .asset_manifests.hash_algorithms import HashAlgorithm
Expand Down Expand Up @@ -77,6 +78,22 @@

download_logger = getLogger("deadline.job_attachments.download")


class _FileSizeSubscriber(_BaseSubscriber):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please run the integration tests? This code path changes both deadline job downloads and incremental downloads.

I need to check how this impacts the worker agent too; if it's in the call path, we need to run the integration tests there as well.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran the integration tests, they passed.

"""Subscriber that provides file size to skip HEAD requests."""

def __init__(self, size: int) -> None:
    """Store the file size in bytes (known from the asset manifest) for later use in on_queued."""
    self._size = size

def on_queued(self, future, **kwargs):
future.meta.provide_transfer_size(self._size)
# Provide a dummy etag to skip HEAD request if the method exists (added in s3transfer 0.6.0).
# For downloads from CAS, we don't need etag validation since files are content-addressed.
# Older s3transfer versions don't have this method, so we check before calling.
if hasattr(future.meta, "provide_object_etag"):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to just check this once and cache it? (Or just do a version check of some sort) just to avoid looking it up for every on_queued where it shouldn't be changing?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each future is a distinct instance, though surely if one has this attribute, they all would. Do you think this check would be slow enough to warrant optimization though? There will be 1 future per file.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I was wondering that as well. It might not be terribly noticeable tbh? It seems a bit inefficient when every future will be the same but maybe not enough to really care about a hash/lookup per file

future.meta.provide_object_etag("dummy-etag")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting this value skips HeadObject requests? Can we link to docs for reference in the code comment above?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The s3transfer library actually does not have documentation: https://github.com/boto/s3transfer/

This behavior was discovered in the Python version matrix testing. In lieu of documentation, this change has unit tests which verify the behavior we want (i.e. not calling HEAD).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One concern from the team is this seems to be an internal implementation detail of S3 transfer. I appreciate the improvements, but also need to be careful end to end (JA, worker running jobs) to avoid another regression.

JA is a bit fragile as you have noticed so we want to check everything first.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do what was implemented for Incremental downloads here: https://github.com/aws-deadline/deadline-cloud/blob/mainline/src/deadline/job_attachments/_incremental_downloads/_manifest_s3_downloads.py#L533-L537

It will not depend on an internal API.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that would work. It adds some complexity of having two download paths and two download managers (s3transfer and an internal thread pool), but it does avoid touching s3transfer details. If the JA team prefers that, I can close this PR.



# Maximum number of concurrent S3 download operations per transfer manager.
S3_DOWNLOAD_MAX_CONCURRENCY = 10
# Classic Windows MAX_PATH limit; paths at or over this length need special handling.
WINDOWS_MAX_PATH_LENGTH = 260
# NOTE(review): presumably the number of extra characters appended to a file name
# for a temporary download target — confirm against where this is consumed.
TEMP_DOWNLOAD_ADDED_CHARS_LENGTH = 9
Expand Down Expand Up @@ -549,7 +566,7 @@ def handler(bytes_downloaded):
if not should_continue:
future.cancel()

subscribers = [ProgressCallbackInvoker(handler)]
subscribers = [_FileSizeSubscriber(file_bytes), ProgressCallbackInvoker(handler)]

future = transfer_manager.download(
bucket=s3_bucket,
Expand Down
54 changes: 53 additions & 1 deletion test/unit/deadline_job_attachments/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -1784,7 +1784,7 @@ def test_download_file_error_message_on_access_denied(self):
s3_client = boto3.client("s3")
stubber = Stubber(s3_client)
stubber.add_client_error(
"head_object",
"get_object",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit - is this due to small size at like 1797?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s3transfer calls HEAD to get the file size to determine whether it's small enough to fetch in a single request or if it should use its multi-request approach. Since we provide the size, it never makes a HEAD request. The first request that fails when permissions are missing is the GET.

service_error_code="AccessDenied",
service_message="Access Denied",
http_status_code=403,
Expand Down Expand Up @@ -1871,6 +1871,58 @@ def test_download_file_error_message_on_timeout(self):
mock_lock.assert_not_called()
mock_collision_dict.assert_not_called()

def test_download_file_does_not_make_head_request_when_size_known(self):
    """
    Verify that download_file never issues a HeadObject request when the
    file size is already known from the asset manifest entry.
    """
    s3_client_mock = MagicMock()
    transfer_manager_mock = MagicMock()
    lock_mock = MagicMock()
    collision_tracker_mock = MagicMock()

    manifest_entry = ManifestPathv2023_03_03(
        path="inputs/input1.txt", hash="input1", size=12345, mtime=1234000000
    )

    # Hoist the patch-target prefix so each patch() call stays readable.
    download_module = f"{deadline.__package__}.job_attachments.download"
    with patch(f"{download_module}.get_s3_client", return_value=s3_client_mock):
        with patch(
            f"{download_module}.get_s3_transfer_manager",
            return_value=transfer_manager_mock,
        ):
            with patch(f"{download_module}.Path.mkdir"):
                with patch(f"{download_module}.os.utime"):
                    download_file(
                        manifest_entry,
                        HashAlgorithm.XXH128,
                        "/home/username/assets",
                        lock_mock,
                        collision_tracker_mock,
                        "test-bucket",
                        "rootPrefix/Data",
                        s3_client_mock,
                    )
    # Since the size was supplied up front, no HeadObject call should occur.
    s3_client_mock.head_object.assert_not_called()

def test_file_size_subscriber_without_provide_object_etag(self):
    """
    Verify that _FileSizeSubscriber tolerates older s3transfer releases
    whose future.meta lacks the provide_object_etag method.
    """
    from deadline.job_attachments.download import _FileSizeSubscriber

    expected_size = 12345
    future_mock = MagicMock()
    future_mock.meta.provide_transfer_size = MagicMock()
    # Deleting the attribute makes hasattr(...) return False, mimicking
    # s3transfer versions released before provide_object_etag existed.
    del future_mock.meta.provide_object_etag

    _FileSizeSubscriber(expected_size).on_queued(future_mock)

    future_mock.meta.provide_transfer_size.assert_called_once_with(expected_size)

@pytest.mark.skipif(
sys.platform == "win32",
reason="This test is for Linux path only.",
Expand Down
Loading