Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3929 Slow download performance for Storage API. Added new downloadToPathWithMediaHttpDownloader method with better performance. #4337

Closed
wants to merge 13 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@

package com.google.cloud.storage;

import static com.google.cloud.RetryHelper.runWithRetries;
import static com.google.cloud.storage.Blob.BlobSourceOption.toGetOptions;
import static com.google.cloud.storage.Blob.BlobSourceOption.toSourceOptions;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.callable;

import com.google.api.services.storage.model.StorageObject;
import com.google.auth.ServiceAccountSigner;
import com.google.auth.ServiceAccountSigner.SigningException;
import com.google.cloud.ReadChannel;
import com.google.cloud.RetryHelper;
import com.google.cloud.Tuple;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Entity;
Expand All @@ -34,6 +37,7 @@
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.base.Function;
import com.google.common.io.BaseEncoding;
import com.google.common.io.CountingOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -237,6 +241,41 @@ public void downloadTo(Path path, BlobSourceOption... options) {
}
}

/**
* Downloads this blob to the given output stream path using specified blob read options.
*
* @param outputStream destination
* @param options blob read options
* @throws StorageException upon failure
*/
public void downloadTo(OutputStream outputStream, final BlobSourceOption... options) {
try (CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream)) {
final StorageObject storageObject = getBlobId().toPb();
final StorageOptions storageOptions = this.options;
final StorageRpc storageRpc = storageOptions.getStorageRpcV1();
final Map<StorageRpc.Option, ?> requestOptions = StorageImpl.optionMap(getBlobId(), options);
runWithRetries(
callable(
new Runnable() {
@Override
public void run() {
storageRpc.readToOutputStream(
storageObject,
countingOutputStream.getCount(),
countingOutputStream,
requestOptions);
}
}),
storageOptions.getRetrySettings(),
StorageImpl.EXCEPTION_HANDLER,
storageOptions.getClock());
} catch (RetryHelper.RetryHelperException e) {
throw StorageException.translateAndThrow(e);
} catch (IOException e) {
throw new StorageException(e);
}
}
Copy link
Member

@frankyn frankyn Jan 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrey-qlogic, I appreciate your patience.

I'd recommend passing through an additional option (USE_DIRECT_DOWNLOAD) to downloadTo() if using getMediaHttpDownloader is considered a breaking change:

Then in the underlying RPC class handle the request in
HttpStorageRpc.getCall() which called by HttpStorageRpc.get().

WDYT?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added USE_DIRECT_DOWNLOAD to set directDownloadEnabled for MediaHttpDownloader


/**
* Downloads this blob to the given file path.
*
Expand All @@ -250,6 +289,16 @@ public void downloadTo(Path path) {
downloadTo(path, new BlobSourceOption[0]);
}

/**
* Downloads this blob to the given outputStream.
andrey-qlogic marked this conversation as resolved.
Show resolved Hide resolved
*
* @param outputStream destination
* @throws StorageException upon failure
*/
public void downloadTo(OutputStream outputStream) {
downloadTo(outputStream, new BlobSourceOption[0]);
}

/** Builder for {@code Blob}. */
public static class Builder extends BlobInfo.Builder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.common.io.CountingOutputStream;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Span;
Expand Down Expand Up @@ -627,6 +628,34 @@ public RpcBatch createBatch() {
return new DefaultRpcBatch(storage);
}

@Override
public void readToOutputStream(
StorageObject from, long position, CountingOutputStream to, Map<Option, ?> options) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_READ);
Scope scope = tracer.withSpan(span);
try {
Get req =
storage
.objects()
.get(from.getBucket(), from.getName())
.setGeneration(from.getGeneration())
.setIfMetagenerationMatch(Option.IF_METAGENERATION_MATCH.getLong(options))
.setIfMetagenerationNotMatch(Option.IF_METAGENERATION_NOT_MATCH.getLong(options))
.setIfGenerationMatch(Option.IF_GENERATION_MATCH.getLong(options))
.setIfGenerationNotMatch(Option.IF_GENERATION_NOT_MATCH.getLong(options))
.setUserProject(Option.USER_PROJECT.getString(options));
req.getMediaHttpDownloader().setDirectDownloadEnabled(true).setBytesDownloaded(position);
req.executeMediaAndDownloadTo(to);
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
StorageException serviceException = translate(ex);
throw serviceException;
} finally {
scope.close();
span.end();
}
}

@Override
public Tuple<String, byte[]> read(
StorageObject from, Map<Option, ?> options, long position, int bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.ServiceRpc;
import com.google.cloud.Tuple;
import com.google.cloud.storage.StorageException;
import com.google.common.io.CountingOutputStream;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -57,6 +58,7 @@ enum Option {
FIELDS("fields"),
CUSTOMER_SUPPLIED_KEY("customerSuppliedKey"),
USE_DIRECT_DOWNLOAD("useDirectDownload"),
READ_CHANNEL_CHUNK_SIZE_MULTIPLIER("useDirectDownload"),
USER_PROJECT("userProject"),
KMS_KEY_NAME("kmsKeyName");

Expand Down Expand Up @@ -282,6 +284,14 @@ StorageObject compose(
*/
Tuple<String, byte[]> read(StorageObject from, Map<Option, ?> options, long position, int bytes);

/**
* Reads from a storage object at the given position directly to outputstream
andrey-qlogic marked this conversation as resolved.
Show resolved Hide resolved
*
* @throws StorageException upon failure
*/
void readToOutputStream(
StorageObject from, long position, CountingOutputStream to, Map<Option, ?> options);

/**
* Opens a resumable upload channel for a given storage object.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,38 +588,4 @@ public Integer answer() throws Throwable {
assertArrayEquals(expected, actual);
}

@Test
public void testDownloadWithUseDirectDownload() throws Exception {
final byte[] expected = {1, 2};

initializeExpectedBlob(2);
ReadChannel channel = createNiceMock(ReadChannel.class);
expect(storage.getOptions()).andReturn(mockOptions);
expect(
storage.reader(
BLOB_INFO.getBlobId(),
Storage.BlobSourceOption.useDirectDownload(true)))
.andReturn(channel);
replay(storage);
// First read should return 2 bytes.
expect(channel.read(anyObject(ByteBuffer.class)))
.andAnswer(
new IAnswer<Integer>() {
@Override
public Integer answer() throws Throwable {
// Modify the argument to match the expected behavior of `read`.
((ByteBuffer) getCurrentArguments()[0]).put(expected);
return 2;
}
});
// Second read should return 0 bytes.
expect(channel.read(anyObject(ByteBuffer.class))).andReturn(0);
replay(channel);
initializeBlob();

File file = File.createTempFile("blob", ".tmp");
blob.downloadTo(file.toPath(), BlobSourceOption.useDirectDownload(true));
byte actual[] = Files.readAllBytes(file.toPath());
assertArrayEquals(expected, actual);
}
}