Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19027. S3A: S3AInputStream doesn't recover from HTTP/channel exceptions #6425

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

import java.io.EOFException;

import org.apache.hadoop.classification.InterfaceAudience;

/**
* Http channel exception; subclass of EOFException.
* Raised in particular for:
* - NoHttpResponseException
* - OpenSSL errors
* The http client library exceptions may be shaded or unshaded; this class
* is the exception referenced in retry policies.
*/
@InterfaceAudience.Private
public class HttpChannelEOFException extends EOFException {

  /**
   * Create the exception from an error message and its root cause.
   * @param path path the failure relates to; it is accepted here but is
   * not used when building the exception.
   * @param error error text, which becomes the exception message.
   * @param cause underlying failure, attached via {@code initCause()}.
   */
  public HttpChannelEOFException(final String path,
      final String error,
      final Throwable cause) {
    // EOFException has no (message, cause) constructor, so the cause
    // is attached after construction.
    super(error);
    this.initCause(cause);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ public <T> T retryUntranslated(
if (caught instanceof IOException) {
translated = (IOException) caught;
} else {
translated = S3AUtils.translateException(text, "",
translated = S3AUtils.translateException(text, "/",
(SdkException) caught);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

import java.io.EOFException;

import org.apache.hadoop.classification.InterfaceAudience;

/**
* Status code 416, range not satisfiable.
* Subclass of {@link EOFException} so that any code which expects that to
* be the outcome of a 416 failure will continue to work.
*/
@InterfaceAudience.Private
public class RangeNotSatisfiableEOFException extends EOFException {

  /**
   * Create the exception.
   * @param operation text which becomes the exception message.
   * @param cause underlying exception, attached via {@code initCause()};
   * a null value leaves the exception without a cause.
   */
  public RangeNotSatisfiableEOFException(String operation, Exception cause) {
    // EOFException has no (message, cause) constructor, so the cause
    // is attached after construction.
    super(operation);
    this.initCause(cause);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
public static final String OPERATION_OPEN = "open";
public static final String OPERATION_REOPEN = "re-open";

/**
* Switch controlling the behavior when wrappedStream.read()
* returns -1 or raises an EOF; the original semantics
* are that the stream is kept open.
* Value {@value}.
*/
private static final boolean CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ = true;

/**
* This is the maximum temporary buffer size we use while
* populating the data in direct byte buffers during a vectored IO
Expand Down Expand Up @@ -333,7 +341,7 @@ private void seekQuietly(long positiveTargetPos) {
@Retries.OnceTranslated
private void seekInStream(long targetPos, long length) throws IOException {
checkNotClosed();
if (wrappedStream == null) {
if (!isObjectStreamOpen()) {
return;
}
// compute how much more to skip
Expand Down Expand Up @@ -406,22 +414,29 @@ public boolean seekToNewSource(long targetPos) throws IOException {

/**
* Perform lazy seek and adjust stream to correct position for reading.
*
* If an EOF Exception is raised there are two possibilities
* <ol>
* <li>the stream is at the end of the file</li>
* <li>something went wrong with the network connection</li>
* </ol>
* This method does not attempt to distinguish; it assumes that an EOF
* exception is always "end of file".
* @param targetPos position from where data should be read
* @param len length of the content that needs to be read
* @throws RangeNotSatisfiableEOFException GET is out of range
* @throws IOException anything else.
*/
@Retries.RetryTranslated
private void lazySeek(long targetPos, long len) throws IOException {

Invoker invoker = context.getReadInvoker();
invoker.maybeRetry(streamStatistics.getOpenOperations() == 0,
"lazySeek", pathStr, true,
invoker.retry("lazySeek to " + targetPos, pathStr, true,
() -> {
//For lazy seek
seekInStream(targetPos, len);

//re-open at specific location if needed
if (wrappedStream == null) {
if (!isObjectStreamOpen()) {
reopen("read from new offset", targetPos, len, false);
}
});
Expand Down Expand Up @@ -449,7 +464,9 @@ public synchronized int read() throws IOException {

try {
lazySeek(nextReadPos, 1);
} catch (EOFException e) {
} catch (RangeNotSatisfiableEOFException e) {
// attempt to GET beyond the end of the object
LOG.debug("Downgrading 416 response attempt to read at {} to -1 response", nextReadPos);
return -1;
}

Expand All @@ -460,14 +477,12 @@ public synchronized int read() throws IOException {
// When exception happens before re-setting wrappedStream in "reopen" called
// by onReadFailure, then wrappedStream will be null. But the **retry** may
// re-execute this block and cause NPE if we don't check wrappedStream
if (wrappedStream == null) {
if (!isObjectStreamOpen()) {
reopen("failure recovery", getPos(), 1, false);
}
try {
b = wrappedStream.read();
} catch (EOFException e) {
return -1;
} catch (SocketTimeoutException e) {
} catch (HttpChannelEOFException | SocketTimeoutException e) {
onReadFailure(e, true);
throw e;
} catch (IOException e) {
Expand All @@ -480,10 +495,9 @@ public synchronized int read() throws IOException {
if (byteRead >= 0) {
pos++;
nextReadPos++;
}

if (byteRead >= 0) {
incrementBytesRead(1);
} else {
streamReadResultNegative();
}
return byteRead;
}
Expand All @@ -509,6 +523,18 @@ private void onReadFailure(IOException ioe, boolean forceAbort) {
closeStream("failure recovery", forceAbort, false);
}

/**
 * React to a read() call on the wrapped stream returning -1.
 * A -1 means either "the connection has gone past the end of the object"
 * or that the stream has broken for some reason.
 * If {@code CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ} is set, the stream is
 * closed (without an abort); otherwise this method does nothing.
 */
private void streamReadResultNegative() {
  if (!CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ) {
    // switch is off: leave the wrapped stream as it is.
    return;
  }
  closeStream("wrappedStream.read() returned -1", false, false);
}

/**
* {@inheritDoc}
*
Expand All @@ -534,8 +560,8 @@ public synchronized int read(byte[] buf, int off, int len)

try {
lazySeek(nextReadPos, len);
} catch (EOFException e) {
// the end of the file has moved
} catch (RangeNotSatisfiableEOFException e) {
// attempt to GET beyond the end of the object
return -1;
}

Expand All @@ -548,17 +574,19 @@ public synchronized int read(byte[] buf, int off, int len)
// When exception happens before re-setting wrappedStream in "reopen" called
// by onReadFailure, then wrappedStream will be null. But the **retry** may
// re-execute this block and cause NPE if we don't check wrappedStream
if (wrappedStream == null) {
if (!isObjectStreamOpen()) {
reopen("failure recovery", getPos(), 1, false);
}
try {
// read data; will block until there is data or the end of the stream is reached.
// returns 0 for "stream is open but no data yet" and -1 for "end of stream".
bytes = wrappedStream.read(buf, off, len);
} catch (EOFException e) {
// the base implementation swallows EOFs.
return -1;
} catch (SocketTimeoutException e) {
} catch (HttpChannelEOFException | SocketTimeoutException e) {
onReadFailure(e, true);
throw e;
} catch (EOFException e) {
LOG.debug("EOFException raised by http stream read(); downgrading to a -1 response", e);
return -1;
} catch (IOException e) {
onReadFailure(e, false);
throw e;
Expand All @@ -569,8 +597,10 @@ public synchronized int read(byte[] buf, int off, int len)
if (bytesRead > 0) {
pos += bytesRead;
nextReadPos += bytesRead;
incrementBytesRead(bytesRead);
} else {
streamReadResultNegative();
}
incrementBytesRead(bytesRead);
streamStatistics.readOperationCompleted(len, bytesRead);
return bytesRead;
}
Expand Down Expand Up @@ -818,6 +848,9 @@ public void readFully(long position, byte[] buffer, int offset, int length)
while (nread < length) {
int nbytes = read(buffer, offset + nread, length - nread);
if (nbytes < 0) {
// no attempt is currently made to recover from stream read problems;
// a lazy seek to the offset is probably the solution.
// but it will need more qualification against failure handling
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
}
nread += nbytes;
Expand Down Expand Up @@ -987,7 +1020,7 @@ private void validateRangeRequest(FileRange range) throws EOFException {
final String errMsg = String.format("Requested range [%d, %d) is beyond EOF for path %s",
range.getOffset(), range.getLength(), pathStr);
LOG.warn(errMsg);
throw new EOFException(errMsg);
throw new RangeNotSatisfiableEOFException(errMsg, null);
}
}

Expand Down Expand Up @@ -1257,8 +1290,12 @@ public boolean hasCapability(String capability) {
}
}

/**
* Is the inner object stream open?
* @return true if there is an active HTTP request to S3.
*/
@VisibleForTesting
boolean isObjectStreamOpen() {
public boolean isObjectStreamOpen() {
return wrappedStream != null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,15 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
// in this map.
policyMap.put(AWSClientIOException.class, retryAwsClientExceptions);

// Http Channel issues: treat as communication failure
policyMap.put(HttpChannelEOFException.class, connectivityFailure);

// server didn't respond.
policyMap.put(AWSNoResponseException.class, retryIdempotentCalls);

// range header is out of scope of object; retrying won't help
policyMap.put(RangeNotSatisfiableEOFException.class, fail);

// should really be handled by resubmitting to new location;
// that's beyond the scope of this retry policy
policyMap.put(AWSRedirectException.class, fail);
Expand Down Expand Up @@ -251,10 +257,7 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
policyMap.put(ConnectException.class, connectivityFailure);

// this can be a sign of an HTTP connection breaking early.
// which can be reacted to by another attempt if the request was idempotent.
// But: could also be a sign of trying to read past the EOF on a GET,
// which isn't going to be recovered from
policyMap.put(EOFException.class, retryIdempotentCalls);
policyMap.put(EOFException.class, connectivityFailure);

// object not found. 404 when not unknown bucket; 410 "gone"
policyMap.put(FileNotFoundException.class, fail);
Expand Down Expand Up @@ -300,7 +303,7 @@ public RetryAction shouldRetry(Exception exception,
if (exception instanceof SdkException) {
// update the sdk exception for the purpose of exception
// processing.
ex = S3AUtils.translateException("", "", (SdkException) exception);
ex = S3AUtils.translateException("", "/", (SdkException) exception);
}
LOG.debug("Retry probe for {} with {} retries and {} failovers,"
+ " idempotent={}, due to {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,20 @@ public static IOException translateException(String operation,
*/
@SuppressWarnings("ThrowableInstanceNeverThrown")
public static IOException translateException(@Nullable String operation,
String path,
@Nullable String path,
SdkException exception) {
String message = String.format("%s%s: %s",
operation,
StringUtils.isNotEmpty(path)? (" on " + path) : "",
exception);

if (path == null || path.isEmpty()) {
// handle null path by giving it a stub value.
// not ideal/informative, but ensures that the path is never null in
// exceptions constructed.
path = "/";
}

if (!(exception instanceof AwsServiceException)) {
// exceptions raised client-side: connectivity, auth, network problems...
Exception innerCause = containsInterruptedException(exception);
Expand All @@ -196,7 +203,7 @@ public static IOException translateException(@Nullable String operation,
return ioe;
}
// network problems covered by an IOE inside the exception chain.
ioe = maybeExtractIOException(path, exception);
ioe = maybeExtractIOException(path, exception, message);
if (ioe != null) {
return ioe;
}
Expand Down Expand Up @@ -300,10 +307,13 @@ public static IOException translateException(@Nullable String operation,
break;

// out of range. This may happen if an object is overwritten with
// a shorter one while it is being read.
// a shorter one while it is being read or openFile() was invoked
// passing a FileStatus or file length less than that of the object.
// although the HTTP specification says that the response should
// include a range header specifying the actual range available,
// this isn't picked up here.
case SC_416_RANGE_NOT_SATISFIABLE:
ioe = new EOFException(message);
ioe.initCause(ase);
ioe = new RangeNotSatisfiableEOFException(message, ase);
break;

// this has surfaced as a "no response from server" message.
Expand Down Expand Up @@ -673,7 +683,7 @@ public static <InstanceT> InstanceT getInstanceFromReflection(String className,
if (targetException instanceof IOException) {
throw (IOException) targetException;
} else if (targetException instanceof SdkException) {
throw translateException("Instantiate " + className, "", (SdkException) targetException);
throw translateException("Instantiate " + className, "/", (SdkException) targetException);
} else {
// supported constructor or factory method found, but the call failed
throw instantiationException(uri, className, configKey, targetException);
Expand Down
Loading