Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.storage.StorageException;
import com.google.common.io.ByteStreams;
import java.nio.channels.FileChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -55,31 +56,40 @@ final class DirectDownloadCallable implements Callable<DownloadResult> {
public DownloadResult call() {
long bytesCopied = -1L;
try (ReadChannel rc =
storage.reader(
BlobId.of(parallelDownloadConfig.getBucketName(), originalBlob.getName()), opts);
FileChannel wc =
FileChannel.open(
destPath,
storage.reader(
BlobId.of(parallelDownloadConfig.getBucketName(), originalBlob.getName()), opts)) {
StandardOpenOption[] options =
parallelDownloadConfig.isSkipIfExists()
? new StandardOpenOption[] {StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW}
: new StandardOpenOption[] {
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING)) {
rc.setChunkSize(0);
bytesCopied = ByteStreams.copy(rc, wc);
if (originalBlob.getSize() != null) {
if (bytesCopied != originalBlob.getSize()) {
return DownloadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH)
.setException(
new StorageException(
0,
"Unexpected end of stream, read "
+ bytesCopied
+ " expected "
+ originalBlob.getSize()
+ " from object "
+ originalBlob.getBlobId().toGsUtilUriWithGeneration()))
.build();
StandardOpenOption.TRUNCATE_EXISTING
};
try (FileChannel wc = FileChannel.open(destPath, options)) {
rc.setChunkSize(0);
bytesCopied = ByteStreams.copy(rc, wc);
if (originalBlob.getSize() != null) {
if (bytesCopied != originalBlob.getSize()) {
return DownloadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH)
.setException(
new StorageException(
0,
"Unexpected end of stream, read "
+ bytesCopied
+ " expected "
+ originalBlob.getSize()
+ " from object "
+ originalBlob.getBlobId().toGsUtilUriWithGeneration()))
.build();
}
}
}
} catch (FileAlreadyExistsException e) {
return DownloadResult.newBuilder(originalBlob, TransferStatus.SKIPPED)
.setOutputDestination(destPath)
.setException(e)
.build();
} catch (Exception e) {
if (bytesCopied == -1) {
return DownloadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_START)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@ private DownloadResult(

/**
* The destination on the Filesystem the object has been written to. This field will only be
* populated if the Transfer was a {@link TransferStatus#SUCCESS SUCCESS}.
* populated if the Transfer was a {@link TransferStatus#SUCCESS SUCCESS} or {@link
* TransferStatus#SKIPPED SKIPPED}.
*
* @see Builder#setOutputDestination(Path)
*/
public @NonNull Path getOutputDestination() {
checkState(
status == TransferStatus.SUCCESS,
"getOutputDestination() is only valid when status is SUCCESS but status was %s",
status == TransferStatus.SUCCESS || status == TransferStatus.SKIPPED,
"getOutputDestination() is only valid when status is SUCCESS or SKIPPED but status was %s",
status);
return outputDestination;
}
Expand Down Expand Up @@ -206,7 +207,7 @@ public Builder setException(@NonNull Exception exception) {
public DownloadResult build() {
checkNotNull(input);
checkNotNull(status);
if (status == TransferStatus.SUCCESS) {
if (status == TransferStatus.SUCCESS || status == TransferStatus.SKIPPED) {
checkNotNull(outputDestination);
} else if (status == TransferStatus.FAILED_TO_START
|| status == TransferStatus.FAILED_TO_FINISH) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,19 @@ public final class ParallelDownloadConfig {
@NonNull private final Path downloadDirectory;
@NonNull private final String bucketName;
@NonNull private final List<BlobSourceOption> optionsPerRequest;
private final boolean skipIfExists;

private ParallelDownloadConfig(
@NonNull String stripPrefix,
@NonNull Path downloadDirectory,
@NonNull String bucketName,
@NonNull List<BlobSourceOption> optionsPerRequest) {
@NonNull List<BlobSourceOption> optionsPerRequest,
boolean skipIfExists) {
this.stripPrefix = stripPrefix;
this.downloadDirectory = downloadDirectory;
this.bucketName = bucketName;
this.optionsPerRequest = optionsPerRequest;
this.skipIfExists = skipIfExists;
}

/**
Expand Down Expand Up @@ -87,6 +90,15 @@ private ParallelDownloadConfig(
return optionsPerRequest;
}

/**
* Whether to skip downloading the file if it already exists in the destination.
*
* @see Builder#setSkipIfExists(boolean)
*/
public boolean isSkipIfExists() {
return skipIfExists;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -99,12 +111,13 @@ public boolean equals(Object o) {
return stripPrefix.equals(that.stripPrefix)
&& downloadDirectory.equals(that.downloadDirectory)
&& bucketName.equals(that.bucketName)
&& optionsPerRequest.equals(that.optionsPerRequest);
&& optionsPerRequest.equals(that.optionsPerRequest)
&& skipIfExists == that.skipIfExists;
}

@Override
public int hashCode() {
return Objects.hash(stripPrefix, downloadDirectory, bucketName, optionsPerRequest);
return Objects.hash(stripPrefix, downloadDirectory, bucketName, optionsPerRequest, skipIfExists);
}

@Override
Expand All @@ -114,6 +127,7 @@ public String toString() {
.add("downloadDirectory", downloadDirectory)
.add("bucketName", bucketName)
.add("optionsPerRequest", optionsPerRequest)
.add("skipIfExists", skipIfExists)
.toString();
}

Expand All @@ -132,12 +146,14 @@ public static final class Builder {
@NonNull private Path downloadDirectory;
@NonNull private String bucketName;
@NonNull private List<BlobSourceOption> optionsPerRequest;
private boolean skipIfExists;

private Builder() {
this.stripPrefix = "";
this.downloadDirectory = Paths.get("");
this.bucketName = "";
this.optionsPerRequest = ImmutableList.of();
this.skipIfExists = false;
}

/**
Expand Down Expand Up @@ -186,6 +202,18 @@ public Builder setOptionsPerRequest(List<BlobSourceOption> optionsPerRequest) {
return this;
}

/**
* Sets the value for skipIfExists. If set to true, the TransferManager will skip downloading the
* file if it already exists in the destination.
*
* @return the builder instance with the value for skipIfExists modified.
* @see ParallelDownloadConfig#isSkipIfExists()
*/
public Builder setSkipIfExists(boolean skipIfExists) {
this.skipIfExists = skipIfExists;
return this;
}

/**
* Creates a ParallelDownloadConfig object.
*
Expand All @@ -197,7 +225,7 @@ public ParallelDownloadConfig build() {
checkNotNull(downloadDirectory);
checkNotNull(optionsPerRequest);
return new ParallelDownloadConfig(
stripPrefix, downloadDirectory, bucketName, optionsPerRequest);
stripPrefix, downloadDirectory, bucketName, optionsPerRequest, skipIfExists);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ public void close() throws Exception {
downloadTasks.add(ApiFutures.immediateFuture(skipped));
continue;
}
if (config.isSkipIfExists() && Files.exists(destPath)) {
DownloadResult skipped =
DownloadResult.newBuilder(blob, TransferStatus.SKIPPED)
.setOutputDestination(destPath)
.build();
downloadTasks.add(ApiFutures.immediateFuture(skipped));
continue;
}
if (transferManagerConfig.isAllowDivideAndConquerDownload()) {
BlobInfo validatedBlob = retrieveSizeAndGeneration(storage, blob, config.getBucketName());
if (validatedBlob != null && qos.divideAndConquer(validatedBlob.getSize())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,43 @@ public void downloadBlobs() throws Exception {
}
}

@Test
public void downloadBlobsSkipIfExists() throws Exception {
TransferManagerConfig config =
TransferManagerConfigTestingInstances.defaults(storage.getOptions());
try (TransferManager transferManager = config.getService()) {
String bucketName = bucket.getName();
ParallelDownloadConfig parallelDownloadConfig =
ParallelDownloadConfig.newBuilder()
.setBucketName(bucketName)
.setDownloadDirectory(baseDir)
.setSkipIfExists(true)
.build();

// First download - should succeed
DownloadJob job = transferManager.downloadBlobs(blobs, parallelDownloadConfig);
List<DownloadResult> downloadResults = job.getDownloadResults();
assertThat(downloadResults).hasSize(3);
assertThat(
downloadResults.stream()
.filter(result -> result.getStatus() == TransferStatus.SUCCESS)
.collect(Collectors.toList()))
.hasSize(3);

// Second download - should skip
DownloadJob job2 = transferManager.downloadBlobs(blobs, parallelDownloadConfig);
List<DownloadResult> downloadResults2 = job2.getDownloadResults();
assertThat(downloadResults2).hasSize(3);
assertThat(
downloadResults2.stream()
.filter(result -> result.getStatus() == TransferStatus.SKIPPED)
.collect(Collectors.toList()))
.hasSize(3);

cleanUpFiles(downloadResults);
}
}

@Test
public void downloadBlobsAllowChunked() throws Exception {
TransferManagerConfig config =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage.transfermanager;

import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.common.collect.ImmutableList;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.junit.Test;

public class ParallelDownloadConfigTest {

@Test
public void testParallelDownloadConfigWithSkipIfExists() {
String bucketName = "bucketName";
String stripPrefix = "stripPrefix";
Path downloadDirectory = Paths.get("downloadDirectory");
List<BlobSourceOption> optionsPerRequest = ImmutableList.of();
boolean skipIfExists = true;

ParallelDownloadConfig config =
ParallelDownloadConfig.newBuilder()
.setBucketName(bucketName)
.setStripPrefix(stripPrefix)
.setDownloadDirectory(downloadDirectory)
.setOptionsPerRequest(optionsPerRequest)
.setSkipIfExists(skipIfExists)
.build();

assertThat(config.getBucketName()).isEqualTo(bucketName);
assertThat(config.getStripPrefix()).isEqualTo(stripPrefix);
assertThat(config.getDownloadDirectory()).isEqualTo(downloadDirectory.toAbsolutePath());
assertThat(config.getOptionsPerRequest()).isEqualTo(optionsPerRequest);
assertThat(config.isSkipIfExists()).isEqualTo(skipIfExists);
}

@Test
public void testParallelDownloadConfigDefaultSkipIfExists() {
String bucketName = "bucketName";
String stripPrefix = "stripPrefix";
Path downloadDirectory = Paths.get("downloadDirectory");
List<BlobSourceOption> optionsPerRequest = ImmutableList.of();

ParallelDownloadConfig config =
ParallelDownloadConfig.newBuilder()
.setBucketName(bucketName)
.setStripPrefix(stripPrefix)
.setDownloadDirectory(downloadDirectory)
.setOptionsPerRequest(optionsPerRequest)
.build();

assertThat(config.isSkipIfExists()).isFalse();
}

@Test
public void testEquality() {
String bucketName = "bucketName";
String stripPrefix = "stripPrefix";
Path downloadDirectory = Paths.get("downloadDirectory");
List<BlobSourceOption> optionsPerRequest = ImmutableList.of();
boolean skipIfExists = true;

ParallelDownloadConfig config1 =
ParallelDownloadConfig.newBuilder()
.setBucketName(bucketName)
.setStripPrefix(stripPrefix)
.setDownloadDirectory(downloadDirectory)
.setOptionsPerRequest(optionsPerRequest)
.setSkipIfExists(skipIfExists)
.build();

ParallelDownloadConfig config2 =
ParallelDownloadConfig.newBuilder()
.setBucketName(bucketName)
.setStripPrefix(stripPrefix)
.setDownloadDirectory(downloadDirectory)
.setOptionsPerRequest(optionsPerRequest)
.setSkipIfExists(skipIfExists)
.build();

assertThat(config1).isEqualTo(config2);
assertThat(config1.hashCode()).isEqualTo(config2.hashCode());
}
}
Loading