From 8dd6bad13cb905bad1469791882fcefbb9c84dc9 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Tue, 17 Feb 2026 17:44:39 +0000 Subject: [PATCH] feat: Implement skipIfExists option for downloadBlobs in TransferManager Added `skipIfExists` field to `ParallelDownloadConfig`. Updated `TransferManagerImpl` to check for file existence before download. Updated `DirectDownloadCallable` to use `CREATE_NEW` option when `skipIfExists` is enabled to handle race conditions. Updated `DownloadResult` to support `SKIPPED` status for `getOutputDestination`. Added unit and integration tests. Co-authored-by: nidhiii-27 <224584462+nidhiii-27@users.noreply.github.com> --- .../DirectDownloadCallable.java | 52 ++++---- .../transfermanager/DownloadResult.java | 9 +- .../ParallelDownloadConfig.java | 36 +++++- .../transfermanager/TransferManagerImpl.java | 8 ++ .../storage/it/ITTransferManagerTest.java | 37 ++++++ .../ParallelDownloadConfigTest.java | 101 +++++++++++++++ .../TransferManagerImplTest.java | 119 ++++++++++++++++++ 7 files changed, 333 insertions(+), 29 deletions(-) create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/transfermanager/ParallelDownloadConfigTest.java create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/transfermanager/TransferManagerImplTest.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DirectDownloadCallable.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DirectDownloadCallable.java index 47f782e8bd..d9d6b5541c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DirectDownloadCallable.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DirectDownloadCallable.java @@ -24,6 +24,7 @@ import com.google.cloud.storage.StorageException; import com.google.common.io.ByteStreams; import java.nio.channels.FileChannel; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.concurrent.Callable; @@ -55,31 +56,40 @@ final class DirectDownloadCallable implements Callable { public DownloadResult call() { long bytesCopied = -1L; try (ReadChannel rc = - storage.reader( - BlobId.of(parallelDownloadConfig.getBucketName(), originalBlob.getName()), opts); - FileChannel wc = - FileChannel.open( - destPath, + storage.reader( + BlobId.of(parallelDownloadConfig.getBucketName(), originalBlob.getName()), opts)) { + StandardOpenOption[] options = + parallelDownloadConfig.isSkipIfExists() + ? new StandardOpenOption[] {StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW} + : new StandardOpenOption[] { StandardOpenOption.WRITE, StandardOpenOption.CREATE, - StandardOpenOption.TRUNCATE_EXISTING)) { - rc.setChunkSize(0); - bytesCopied = ByteStreams.copy(rc, wc); - if (originalBlob.getSize() != null) { - if (bytesCopied != originalBlob.getSize()) { - return DownloadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH) - .setException( - new StorageException( - 0, - "Unexpected end of stream, read " - + bytesCopied - + " expected " - + originalBlob.getSize() - + " from object " - + originalBlob.getBlobId().toGsUtilUriWithGeneration())) - .build(); + StandardOpenOption.TRUNCATE_EXISTING + }; + try (FileChannel wc = FileChannel.open(destPath, options)) { + rc.setChunkSize(0); + bytesCopied = ByteStreams.copy(rc, wc); + if (originalBlob.getSize() != null) { + if (bytesCopied != originalBlob.getSize()) { + return DownloadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH) + .setException( + new StorageException( + 0, + "Unexpected end of stream, read " + + bytesCopied + + " expected " + + originalBlob.getSize() + + " from object " + + originalBlob.getBlobId().toGsUtilUriWithGeneration())) + .build(); + } } } + } catch (FileAlreadyExistsException e) { + return DownloadResult.newBuilder(originalBlob, TransferStatus.SKIPPED) + .setOutputDestination(destPath) + .setException(e) + .build(); } catch (Exception e) { if (bytesCopied == -1) { return DownloadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_START) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadResult.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadResult.java index 7084e7553a..3047b98757 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadResult.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadResult.java @@ -63,14 +63,15 @@ private DownloadResult( /** * The destination on the Filesystem the object has been written to. This field will only be - * populated if the Transfer was a {@link TransferStatus#SUCCESS SUCCESS}. + * populated if the Transfer was a {@link TransferStatus#SUCCESS SUCCESS} or {@link + * TransferStatus#SKIPPED SKIPPED}. * * @see Builder#setOutputDestination(Path) */ public @NonNull Path getOutputDestination() { checkState( - status == TransferStatus.SUCCESS, - "getOutputDestination() is only valid when status is SUCCESS but status was %s", + status == TransferStatus.SUCCESS || status == TransferStatus.SKIPPED, + "getOutputDestination() is only valid when status is SUCCESS or SKIPPED but status was %s", status); return outputDestination; } @@ -206,7 +207,7 @@ public Builder setException(@NonNull Exception exception) { public DownloadResult build() { checkNotNull(input); checkNotNull(status); - if (status == TransferStatus.SUCCESS) { + if (status == TransferStatus.SUCCESS || status == TransferStatus.SKIPPED) { checkNotNull(outputDestination); } else if (status == TransferStatus.FAILED_TO_START || status == TransferStatus.FAILED_TO_FINISH) { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelDownloadConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelDownloadConfig.java index 3e1c6e6fd1..3619c18dac 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelDownloadConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelDownloadConfig.java @@ -38,16 +38,19 @@ public final class ParallelDownloadConfig { @NonNull private final Path downloadDirectory; @NonNull private final String bucketName; @NonNull private final List optionsPerRequest; + private final boolean skipIfExists; private ParallelDownloadConfig( @NonNull String stripPrefix, @NonNull Path downloadDirectory, @NonNull String bucketName, - @NonNull List optionsPerRequest) { + @NonNull List optionsPerRequest, + boolean skipIfExists) { this.stripPrefix = stripPrefix; this.downloadDirectory = downloadDirectory; this.bucketName = bucketName; this.optionsPerRequest = optionsPerRequest; + this.skipIfExists = skipIfExists; } /** @@ -87,6 +90,15 @@ private ParallelDownloadConfig( return optionsPerRequest; } + /** + * Whether to skip downloading the file if it already exists in the destination. + * + * @see Builder#setSkipIfExists(boolean) + */ + public boolean isSkipIfExists() { + return skipIfExists; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -99,12 +111,13 @@ public boolean equals(Object o) { return stripPrefix.equals(that.stripPrefix) && downloadDirectory.equals(that.downloadDirectory) && bucketName.equals(that.bucketName) - && optionsPerRequest.equals(that.optionsPerRequest); + && optionsPerRequest.equals(that.optionsPerRequest) + && skipIfExists == that.skipIfExists; } @Override public int hashCode() { - return Objects.hash(stripPrefix, downloadDirectory, bucketName, optionsPerRequest); + return Objects.hash(stripPrefix, downloadDirectory, bucketName, optionsPerRequest, skipIfExists); } @Override @@ -114,6 +127,7 @@ public String toString() { .add("downloadDirectory", downloadDirectory) .add("bucketName", bucketName) .add("optionsPerRequest", optionsPerRequest) + .add("skipIfExists", skipIfExists) .toString(); } @@ -132,12 +146,14 @@ public static final class Builder { @NonNull private Path downloadDirectory; @NonNull private String bucketName; @NonNull private List optionsPerRequest; + private boolean skipIfExists; private Builder() { this.stripPrefix = ""; this.downloadDirectory = Paths.get(""); this.bucketName = ""; this.optionsPerRequest = ImmutableList.of(); + this.skipIfExists = false; } /** @@ -186,6 +202,18 @@ public Builder setOptionsPerRequest(List optionsPerRequest) { return this; } + /** + * Sets the value for skipIfExists. If set to true, the TransferManager will skip downloading the + * file if it already exists in the destination. + * + * @return the builder instance with the value for skipIfExists modified. + * @see ParallelDownloadConfig#isSkipIfExists() + */ + public Builder setSkipIfExists(boolean skipIfExists) { + this.skipIfExists = skipIfExists; + return this; + } + /** * Creates a ParallelDownloadConfig object. * @@ -197,7 +225,7 @@ public ParallelDownloadConfig build() { checkNotNull(downloadDirectory); checkNotNull(optionsPerRequest); return new ParallelDownloadConfig( - stripPrefix, downloadDirectory, bucketName, optionsPerRequest); + stripPrefix, downloadDirectory, bucketName, optionsPerRequest, skipIfExists); } } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java index aa1cfadf1e..1c265e4e1b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java @@ -157,6 +157,14 @@ public void close() throws Exception { downloadTasks.add(ApiFutures.immediateFuture(skipped)); continue; } + if (config.isSkipIfExists() && Files.exists(destPath)) { + DownloadResult skipped = + DownloadResult.newBuilder(blob, TransferStatus.SKIPPED) + .setOutputDestination(destPath) + .build(); + downloadTasks.add(ApiFutures.immediateFuture(skipped)); + continue; + } if (transferManagerConfig.isAllowDivideAndConquerDownload()) { BlobInfo validatedBlob = retrieveSizeAndGeneration(storage, blob, config.getBucketName()); if (validatedBlob != null && qos.divideAndConquer(validatedBlob.getSize())) { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java index e5794fa819..e6a718522e 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java @@ -336,6 +336,43 @@ public void downloadBlobs() throws Exception { } } + @Test + public void downloadBlobsSkipIfExists() throws Exception { + TransferManagerConfig config = + TransferManagerConfigTestingInstances.defaults(storage.getOptions()); + try (TransferManager transferManager = config.getService()) { + String bucketName = bucket.getName(); + ParallelDownloadConfig parallelDownloadConfig = + ParallelDownloadConfig.newBuilder() + .setBucketName(bucketName) + .setDownloadDirectory(baseDir) + .setSkipIfExists(true) + .build(); + + // First download - should succeed + DownloadJob job = transferManager.downloadBlobs(blobs, parallelDownloadConfig); + List downloadResults = job.getDownloadResults(); + assertThat(downloadResults).hasSize(3); + assertThat( + downloadResults.stream() + .filter(result -> result.getStatus() == TransferStatus.SUCCESS) + .collect(Collectors.toList())) + .hasSize(3); + + // Second download - should skip + DownloadJob job2 = transferManager.downloadBlobs(blobs, parallelDownloadConfig); + List downloadResults2 = job2.getDownloadResults(); + assertThat(downloadResults2).hasSize(3); + assertThat( + downloadResults2.stream() + .filter(result -> result.getStatus() == TransferStatus.SKIPPED) + .collect(Collectors.toList())) + .hasSize(3); + + cleanUpFiles(downloadResults); + } + } + @Test public void downloadBlobsAllowChunked() throws Exception { TransferManagerConfig config = diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/transfermanager/ParallelDownloadConfigTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/transfermanager/ParallelDownloadConfigTest.java new file mode 100644 index 0000000000..86fe0a6069 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/transfermanager/ParallelDownloadConfigTest.java @@ -0,0 +1,101 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.transfermanager; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.storage.Storage.BlobSourceOption; +import com.google.common.collect.ImmutableList; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import org.junit.Test; + +public class ParallelDownloadConfigTest { + + @Test + public void testParallelDownloadConfigWithSkipIfExists() { + String bucketName = "bucketName"; + String stripPrefix = "stripPrefix"; + Path downloadDirectory = Paths.get("downloadDirectory"); + List optionsPerRequest = ImmutableList.of(); + boolean skipIfExists = true; + + ParallelDownloadConfig config = + ParallelDownloadConfig.newBuilder() + .setBucketName(bucketName) + .setStripPrefix(stripPrefix) + .setDownloadDirectory(downloadDirectory) + .setOptionsPerRequest(optionsPerRequest) + .setSkipIfExists(skipIfExists) + .build(); + + assertThat(config.getBucketName()).isEqualTo(bucketName); + assertThat(config.getStripPrefix()).isEqualTo(stripPrefix); + assertThat(config.getDownloadDirectory()).isEqualTo(downloadDirectory.toAbsolutePath()); + assertThat(config.getOptionsPerRequest()).isEqualTo(optionsPerRequest); + assertThat(config.isSkipIfExists()).isEqualTo(skipIfExists); + } + + @Test + public void testParallelDownloadConfigDefaultSkipIfExists() { + String bucketName = "bucketName"; + String stripPrefix = "stripPrefix"; + Path downloadDirectory = Paths.get("downloadDirectory"); + List optionsPerRequest = ImmutableList.of(); + + ParallelDownloadConfig config = + ParallelDownloadConfig.newBuilder() + .setBucketName(bucketName) + .setStripPrefix(stripPrefix) + .setDownloadDirectory(downloadDirectory) + .setOptionsPerRequest(optionsPerRequest) + .build(); + + assertThat(config.isSkipIfExists()).isFalse(); + } + + @Test + public void testEquality() { + String bucketName = "bucketName"; + String stripPrefix = "stripPrefix"; + Path downloadDirectory = Paths.get("downloadDirectory"); + List optionsPerRequest = ImmutableList.of(); + boolean skipIfExists = true; + + ParallelDownloadConfig config1 = + ParallelDownloadConfig.newBuilder() + .setBucketName(bucketName) + .setStripPrefix(stripPrefix) + .setDownloadDirectory(downloadDirectory) + .setOptionsPerRequest(optionsPerRequest) + .setSkipIfExists(skipIfExists) + .build(); + + ParallelDownloadConfig config2 = + ParallelDownloadConfig.newBuilder() + .setBucketName(bucketName) + .setStripPrefix(stripPrefix) + .setDownloadDirectory(downloadDirectory) + .setOptionsPerRequest(optionsPerRequest) + .setSkipIfExists(skipIfExists) + .build(); + + assertThat(config1).isEqualTo(config2); + assertThat(config1.hashCode()).isEqualTo(config2.hashCode()); + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/transfermanager/TransferManagerImplTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/transfermanager/TransferManagerImplTest.java new file mode 100644 index 0000000000..2f902c9c07 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/transfermanager/TransferManagerImplTest.java @@ -0,0 +1,119 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.transfermanager; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.cloud.ReadChannel; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TransferManagerImplTest { + + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private Storage storage; + private TransferManagerConfig transferManagerConfig; + private Qos qos; + private TransferManagerImpl transferManager; + + @Before + public void setUp() { + storage = mock(Storage.class); + StorageOptions storageOptions = + StorageOptions.newBuilder() + .setProjectId("p") + .setServiceFactory(opt -> storage) + .build(); + + transferManagerConfig = + TransferManagerConfig.newBuilder() + .setMaxWorkers(1) + .setPerWorkerBufferSize(1024) + .setStorageOptions(storageOptions) + .build(); + qos = mock(Qos.class); + + transferManager = new TransferManagerImpl(transferManagerConfig, qos); + } + + @Test + public void downloadBlobs_skipIfExists_skipsExistingFile() throws Exception { + Path downloadDir = temporaryFolder.newFolder().toPath(); + String bucketName = "bucket"; + String blobName = "blob.txt"; + Path destFile = downloadDir.resolve(blobName); + Files.createFile(destFile); + + BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, blobName)).build(); + ParallelDownloadConfig config = + ParallelDownloadConfig.newBuilder() + .setBucketName(bucketName) + .setDownloadDirectory(downloadDir) + .setSkipIfExists(true) + .build(); + + DownloadJob job = transferManager.downloadBlobs(Collections.singletonList(blobInfo), config); + List results = job.getDownloadResults(); + + assertThat(results).hasSize(1); + DownloadResult result = results.get(0); + assertThat(result.getStatus()).isEqualTo(TransferStatus.SKIPPED); + assertThat(result.getOutputDestination()).isEqualTo(destFile); + } + + @Test + public void downloadBlobs_skipIfExists_doesNotSkipNonExistingFile() throws Exception { + Path downloadDir = temporaryFolder.newFolder().toPath(); + String bucketName = "bucket"; + String blobName = "blob.txt"; + // File does not exist + + BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, blobName)).build(); + ParallelDownloadConfig config = + ParallelDownloadConfig.newBuilder() + .setBucketName(bucketName) + .setDownloadDirectory(downloadDir) + .setSkipIfExists(true) + .build(); + + // Mock storage to throw exception when reader is called, so we know it tried to download + when(storage.reader(any(BlobId.class), any())).thenThrow(new RuntimeException("Reader called")); + + DownloadJob job = transferManager.downloadBlobs(Collections.singletonList(blobInfo), config); + List results = job.getDownloadResults(); + + assertThat(results).hasSize(1); + DownloadResult result = results.get(0); + // It should not be SKIPPED, it should be FAILED_TO_START because of our exception + assertThat(result.getStatus()).isEqualTo(TransferStatus.FAILED_TO_START); + assertThat(result.getException()).hasMessageThat().contains("Reader called"); + } +}