Resume partial download from S3 on connection drop #46589

Merged
S3BlobContainer.java
@@ -21,7 +21,6 @@

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
-import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
@@ -31,7 +30,6 @@
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.apache.lucene.util.SetOnce;
@@ -48,7 +46,6 @@

import java.io.IOException;
import java.io.InputStream;
-import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -81,18 +78,7 @@ class S3BlobContainer extends AbstractBlobContainer {

@Override
public InputStream readBlob(String blobName) throws IOException {
-try (AmazonS3Reference clientReference = blobStore.clientReference()) {
-final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(blobStore.bucket(),
-buildKey(blobName)));
-return s3Object.getObjectContent();
-} catch (final AmazonClientException e) {
-if (e instanceof AmazonS3Exception) {
-if (404 == ((AmazonS3Exception) e).getStatusCode()) {
-throw new NoSuchFileException("Blob object [" + blobName + "] not found: " + e.getMessage());
-}
-}
-throw e;
-}
+return new S3RetryingInputStream(blobStore, buildKey(blobName));
}

/**
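For context, a caller-side sketch of what this change means in practice. Everything here is hypothetical (the container variable, blob name, and target path are made up); the point is that the stream returned by readBlob now retries and resumes internally, so a plain copy loop survives a dropped connection.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

class BlobCopy {
    // Hypothetical consumer: previously a connection reset part-way through surfaced
    // as an IOException in this loop; now the read() below resumes internally.
    static void copyBlob(S3BlobContainer container, Path target) throws IOException {
        try (InputStream in = container.readBlob("snap-123.dat");
             OutputStream out = Files.newOutputStream(target)) {
            final byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        }
    }
}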
S3BlobStore.java
@@ -68,6 +68,10 @@ public AmazonS3Reference clientReference() {
return service.client(repositoryMetaData);
}

+int getMaxRetries() {
+return service.settings(repositoryMetaData).maxRetries;
+}

public String bucket() {
return bucket;
}
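The value surfaced here becomes the retry budget of the new stream wrapper below. A minimal sketch of the arithmetic, with an illustrative max_retries of 3 (the numbers are hypothetical):

// one initial attempt plus maxRetries resumptions; see the constructor below,
// which sets this.maxAttempts = blobStore.getMaxRetries() + 1
int maxRetries = blobStore.getMaxRetries(); // e.g. 3
int maxAttempts = maxRetries + 1;           // e.g. 4 attempts before the read fails for good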
S3RetryingInputStream.java (new file)
@@ -0,0 +1,159 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;

/**
* Wrapper around an S3 object that will retry the {@link GetObjectRequest} if the download fails part-way through, resuming from where
* the failure occurred. This should be handled by the SDK but it isn't today. This should be revisited in the future (e.g. before removing
[Review comment (Contributor)] can you link to the corresponding open AWS SDK issue? i.e. aws/aws-sdk-java#856

[Reply (Contributor, author)] Yes, done in 3f8c20e. I am not convinced that that's the whole issue, because the problem we were chasing was to do with S3 actively closing the connection rather than a network timeout, but there doesn't seem to be an issue for that.

* the {@link Version#V_7_0_0} version constant) and removed when the SDK handles retries itself.
[Review comment (Member)] +1, otherwise we'll be adding retries over retries

*
* See https://github.com/aws/aws-sdk-java/issues/856 for the related SDK issue
*/
class S3RetryingInputStream extends InputStream {

private static final Logger logger = LogManager.getLogger(S3RetryingInputStream.class);

static final int MAX_SUPPRESSED_EXCEPTIONS = 10;

private final S3BlobStore blobStore;
private final String blobKey;
private final int maxAttempts;

private InputStream currentStream;
private int attempt = 1;
private List<IOException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
private long currentOffset;
private boolean closed;

S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException {
this.blobStore = blobStore;
this.blobKey = blobKey;
this.maxAttempts = blobStore.getMaxRetries() + 1;
currentStream = openStream();
}

private InputStream openStream() throws IOException {
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
final GetObjectRequest getObjectRequest = new GetObjectRequest(blobStore.bucket(), blobKey);
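// when resuming after a failure, currentOffset > 0 and we ask S3 for only the remaining bytes via an HTTP Range request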
if (currentOffset > 0) {
getObjectRequest.setRange(currentOffset);
}
final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
return s3Object.getObjectContent();
} catch (final AmazonClientException e) {
if (e instanceof AmazonS3Exception) {
if (404 == ((AmazonS3Exception) e).getStatusCode()) {
throw addSuppressedExceptions(new NoSuchFileException("Blob object [" + blobKey + "] not found: " + e.getMessage()));
}
}
throw addSuppressedExceptions(e);
}
}

@Override
public int read() throws IOException {
ensureOpen();
while (true) {
try {
final int result = currentStream.read();
currentOffset += 1;
return result;
} catch (IOException e) {
reopenStreamOrFail(e);
}
}
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
ensureOpen();
while (true) {
try {
final int bytesRead = currentStream.read(b, off, len);
if (bytesRead == -1) {
return -1;
}
currentOffset += bytesRead;
return bytesRead;
} catch (IOException e) {
reopenStreamOrFail(e);
}
}
}

private void ensureOpen() {
if (closed) {
assert false : "using S3RetryingInputStream after close";
throw new IllegalStateException("using S3RetryingInputStream after close");
}
}

private void reopenStreamOrFail(IOException e) throws IOException {
if (attempt >= maxAttempts) {
throw addSuppressedExceptions(e);
}
logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying",
blobStore.bucket(), blobKey, currentOffset, attempt, maxAttempts), e);
attempt += 1;
if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
failures.add(e);
}
IOUtils.closeWhileHandlingException(currentStream);
currentStream = openStream();
}

@Override
public void close() throws IOException {
currentStream.close();
closed = true;
}

@Override
public long skip(long n) {
throw new UnsupportedOperationException("S3RetryingInputStream does not support seeking");
}

@Override
public void reset() {
throw new UnsupportedOperationException("S3RetryingInputStream does not support seeking");
}

private <T extends Exception> T addSuppressedExceptions(T e) {
for (IOException failure : failures) {
e.addSuppressed(failure);
}
return e;
}
}
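The class above is tied to the SDK, but the resume pattern itself is easy to see in isolation. Below is a self-contained sketch in which everything is hypothetical (the demo class, the 3-byte failure point); it only mirrors the idea behind read() and openStream(): count bytes as they are handed out, and on an IOException re-open at the recorded offset instead of starting over.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

class ResumeDemo {

    /** Serves data[offset..]; if failAfter >= 0 the stream throws after that many reads, like a dropped connection. */
    static InputStream open(byte[] data, int offset, int failAfter) {
        InputStream inner = new ByteArrayInputStream(data, offset, data.length - offset);
        return new InputStream() {
            int served = 0;

            @Override
            public int read() throws IOException {
                if (failAfter >= 0 && served++ == failAfter) {
                    throw new IOException("connection reset"); // simulated mid-download failure
                }
                return inner.read();
            }
        };
    }

    public static void main(String[] args) throws IOException {
        byte[] blob = "hello world".getBytes();
        StringBuilder result = new StringBuilder();
        int offset = 0;                    // plays the role of currentOffset
        InputStream in = open(blob, 0, 3); // the first attempt dies after 3 bytes
        while (true) {
            int b;
            try {
                b = in.read();
            } catch (IOException e) {
                // resume from where we got to, mirroring openStream() + setRange(currentOffset)
                in = open(blob, offset, -1);
                continue;
            }
            if (b == -1) {
                break;
            }
            offset++;
            result.append((char) b);
        }
        System.out.println(result);        // prints "hello world" despite the failure
    }
}

Tracking the offset on every successful byte is what lets the retry neither re-download nor skip data already returned to the caller; the real class does the same bookkeeping in currentOffset and turns it into an HTTP Range request.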
S3Service.java
@@ -106,7 +106,7 @@ public AmazonS3Reference client(RepositoryMetaData repositoryMetaData) {
* @param repositoryMetaData Repository Metadata
* @return S3ClientSettings
*/
-private S3ClientSettings settings(RepositoryMetaData repositoryMetaData) {
+S3ClientSettings settings(RepositoryMetaData repositoryMetaData) {
final String clientName = S3Repository.CLIENT_NAME.get(repositoryMetaData.settings());
final S3ClientSettings staticSettings = staticClientSettings.get(clientName);
if (staticSettings != null) {