Skip to content

Commit

Permalink
Add Blob Download Retries to GCS Repository (elastic#52479)
Browse files Browse the repository at this point in the history
* Add Blob Download Retries to GCS Repository

Exactly as elastic#46589 (and kept as close to it as possible code wise so we can dry things up in a follow-up potentially) but for GCS.

Closes elastic#52319
  • Loading branch information
original-brownbear authored and sbourke committed Feb 19, 2020
1 parent ff4f02d commit 70d4484
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.api.gax.paging.Page;
import com.google.cloud.BatchResult;
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
Expand All @@ -34,7 +33,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
Expand All @@ -47,11 +45,8 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -176,32 +171,7 @@ Map<String, BlobContainer> listChildren(BlobPath path) throws IOException {
* @return the InputStream used to read the blob's content
*/
InputStream readBlob(String blobName) throws IOException {
final BlobId blobId = BlobId.of(bucketName, blobName);
final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client().reader(blobId));
return Channels.newInputStream(new ReadableByteChannel() {
@SuppressForbidden(reason = "Channel is based of a socket not a file")
@Override
public int read(ByteBuffer dst) throws IOException {
try {
return SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst));
} catch (StorageException e) {
if (e.getCode() == HTTP_NOT_FOUND) {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
throw e;
}
}

@Override
public boolean isOpen() {
return readChannel.isOpen();
}

@Override
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(readChannel::close);
}
});
return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.gcs;

import com.google.cloud.ReadChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;

/**
* Wrapper around reads from GCS that will retry blob downloads that fail part-way through, resuming from where the failure occurred.
* This should be handled by the SDK but it isn't today. This should be revisited in the future (e.g. before removing
* the {@link org.elasticsearch.Version#V_7_0_0} version constant) and removed if the SDK handles retries itself in the future.
*/
class GoogleCloudStorageRetryingInputStream extends InputStream {

private static final Logger logger = LogManager.getLogger(GoogleCloudStorageRetryingInputStream.class);

static final int MAX_SUPPRESSED_EXCEPTIONS = 10;

private final Storage client;

private final BlobId blobId;

private final int maxRetries;

private InputStream currentStream;
private int attempt = 1;
private List<StorageException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
private long currentOffset;
private boolean closed;

GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId) throws IOException {
this.client = client;
this.blobId = blobId;
this.maxRetries = client.getOptions().getRetrySettings().getMaxAttempts() + 1;
currentStream = openStream();
}

private InputStream openStream() throws IOException {
try {
final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client.reader(blobId));
if (currentOffset > 0L) {
readChannel.seek(currentOffset);
}
return Channels.newInputStream(new ReadableByteChannel() {
@SuppressForbidden(reason = "Channel is based of a socket not a file")
@Override
public int read(ByteBuffer dst) throws IOException {
try {
return SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst));
} catch (StorageException e) {
if (e.getCode() == HTTP_NOT_FOUND) {
throw new NoSuchFileException("Blob [" + blobId.getName() + "] does not exist");
}
throw e;
}
}

@Override
public boolean isOpen() {
return readChannel.isOpen();
}

@Override
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(readChannel::close);
}
});
} catch (StorageException e) {
throw addSuppressedExceptions(e);
}
}

@Override
public int read() throws IOException {
ensureOpen();
while (true) {
try {
final int result = currentStream.read();
currentOffset += 1;
return result;
} catch (StorageException e) {
reopenStreamOrFail(e);
}
}
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
ensureOpen();
while (true) {
try {
final int bytesRead = currentStream.read(b, off, len);
if (bytesRead == -1) {
return -1;
}
currentOffset += bytesRead;
return bytesRead;
} catch (StorageException e) {
reopenStreamOrFail(e);
}
}
}

private void ensureOpen() {
if (closed) {
assert false : "using GoogleCloudStorageRetryingInputStream after close";
throw new IllegalStateException("using GoogleCloudStorageRetryingInputStream after close");
}
}

private void reopenStreamOrFail(StorageException e) throws IOException {
if (attempt >= maxRetries) {
throw addSuppressedExceptions(e);
}
logger.debug(new ParameterizedMessage("failed reading [{}] at offset [{}], attempt [{}] of [{}], retrying",
blobId, currentOffset, attempt, MAX_SUPPRESSED_EXCEPTIONS), e);
attempt += 1;
if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
failures.add(e);
}
IOUtils.closeWhileHandlingException(currentStream);
currentStream = openStream();
}

@Override
public void close() throws IOException {
currentStream.close();
closed = true;
}

@Override
public long skip(long n) {
throw new UnsupportedOperationException("GoogleCloudStorageRetryingInputStream does not support seeking");
}

@Override
public void reset() {
throw new UnsupportedOperationException("GoogleCloudStorageRetryingInputStream does not support seeking");
}

private <T extends Exception> T addSuppressedExceptions(T e) {
for (StorageException failure : failures) {
e.addSuppressed(failure);
}
return e;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,35 @@ public void testReadBlobWithRetries() throws Exception {
}
}

public void testReadLargeBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(2, 10);
final CountDown countDown = new CountDown(maxRetries);

// SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks
final byte[] bytes = randomBytes(1 << 22);
httpServer.createContext("/download/storage/v1/b/bucket/o/large_blob_retries", exchange -> {
Streams.readFully(exchange.getRequestBody());
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
final String[] range = exchange.getRequestHeaders().get("Range").get(0).substring("bytes=".length()).split("-");
final int offset = Integer.parseInt(range[0]);
final int end = Integer.parseInt(range[1]);
final byte[] chunk = Arrays.copyOfRange(bytes, offset, Math.min(end + 1, bytes.length));
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), chunk.length);
if (randomBoolean() && countDown.countDown() == false) {
exchange.getResponseBody().write(chunk, 0, chunk.length - 1);
exchange.close();
return;
}
exchange.getResponseBody().write(chunk);
exchange.close();
});

final BlobContainer blobContainer = createBlobContainer(maxRetries, null);
try (InputStream inputStream = blobContainer.readBlob("large_blob_retries")) {
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
}
}

public void testReadBlobWithReadTimeouts() {
final int maxRetries = randomIntBetween(1, 3);
final BlobContainer blobContainer = createBlobContainer(maxRetries, TimeValue.timeValueMillis(between(100, 200)));
Expand Down

0 comments on commit 70d4484

Please sign in to comment.