Skip to content

Commit

Permalink
HADOOP-19027. S3A: S3AInputStream doesn't recover from HTTP/channel e…
Browse files Browse the repository at this point in the history
…xceptions (#6425)



Differentiate from "EOF out of range/end of GET" from
"EOF channel problems" through
two different subclasses of EOFException and input streams to always
retry on http channel errors; out of range GET requests are not retried.
Currently an EOFException is always treated as a fail-fast call in read()

This allows for all existing external code catching EOFException to handle
both, but S3AInputStream to cleanly differentiate range errors (map to -1)
from channel errors (retry)

- HttpChannelEOFException is subclass of EOFException, so all code
  which catches EOFException is still happy.
  retry policy: connectivityFailure
- RangeNotSatisfiableEOFException is the subclass of EOFException
  raised on 416 GET range errors.
  retry policy: fail
- Method ErrorTranslation.maybeExtractChannelException() to create this
  from shaded/unshaded NoHttpResponseException, using string match to
  avoid classpath problems.
- And do this for SdkClientExceptions with OpenSSL error code WFOPENSSL0035.
  We believe this is the OpenSSL equivalent.
- ErrorTranslation.maybeExtractIOException() to perform this translation as
  appropriate.

S3AInputStream.reopen() code retries on EOF, except on
 RangeNotSatisfiableEOFException,
 which is converted to a -1 response to the caller
 as is done historically.

S3AInputStream knows to handle these with
 read(): HttpChannelEOFException: stream aborting close then retry
 lazySeek(): Map RangeNotSatisfiableEOFException to -1, but do not map
  any other EOFException class raised.

This means that
* out of range reads map to -1
* channel problems in reopen are retried
* channel problems in read() abort the failed http connection so it
  isn't recycled

Tests for this using/abusing mocking.

Testing through actually raising 416 exceptions and verifying that
readFully(), char read() and vector reads are all good.

There is no attempt to recover within a readFully(); there's
a boolean constant switch to turn this on, but if anyone does
it a test will spin forever as the inner PositionedReadable.read(position, buffer, len)
downgrades all EOF exceptions to -1.
A new method would need to be added which controls whether to downgrade/rethrow
exceptions.

What does that mean? Possibly reduced resilience to non-retried failures
on the inner stream, even though more channel exceptions are retried on.

Contributed by Steve Loughran
  • Loading branch information
steveloughran authored Jan 16, 2024
1 parent 6652922 commit 36198b5
Show file tree
Hide file tree
Showing 15 changed files with 912 additions and 147 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

import java.io.EOFException;

import org.apache.hadoop.classification.InterfaceAudience;

/**
* Http channel exception; subclass of EOFException.
* In particular:
* - NoHttpResponseException
* - OpenSSL errors
* The http client library exceptions may be shaded/unshaded; this is the
* exception used in retry policies.
*/
@InterfaceAudience.Private
public class HttpChannelEOFException extends EOFException {

public HttpChannelEOFException(final String path,
final String error,
final Throwable cause) {
super(error);
initCause(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ public <T> T retryUntranslated(
if (caught instanceof IOException) {
translated = (IOException) caught;
} else {
translated = S3AUtils.translateException(text, "",
translated = S3AUtils.translateException(text, "/",
(SdkException) caught);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

import java.io.EOFException;

import org.apache.hadoop.classification.InterfaceAudience;

/**
* Status code 416, range not satisfiable.
* Subclass of {@link EOFException} so that any code which expects that to
* be the outcome of a 416 failure will continue to work.
*/
@InterfaceAudience.Private
public class RangeNotSatisfiableEOFException extends EOFException {

public RangeNotSatisfiableEOFException(
String operation,
Exception cause) {
super(operation);
initCause(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
public static final String OPERATION_OPEN = "open";
public static final String OPERATION_REOPEN = "re-open";

/**
* Switch for behavior on when wrappedStream.read()
* returns -1 or raises an EOF; the original semantics
* are that the stream is kept open.
* Value {@value}.
*/
private static final boolean CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ = true;

/**
* This is the maximum temporary buffer size we use while
* populating the data in direct byte buffers during a vectored IO
Expand Down Expand Up @@ -333,7 +341,7 @@ private void seekQuietly(long positiveTargetPos) {
@Retries.OnceTranslated
private void seekInStream(long targetPos, long length) throws IOException {
checkNotClosed();
if (wrappedStream == null) {
if (!isObjectStreamOpen()) {
return;
}
// compute how much more to skip
Expand Down Expand Up @@ -406,22 +414,29 @@ public boolean seekToNewSource(long targetPos) throws IOException {

/**
* Perform lazy seek and adjust stream to correct position for reading.
*
* If an EOF Exception is raised there are two possibilities
* <ol>
* <li>the stream is at the end of the file</li>
* <li>something went wrong with the network connection</li>
* </ol>
* This method does not attempt to distinguish; it assumes that an EOF
* exception is always "end of file".
* @param targetPos position from where data should be read
* @param len length of the content that needs to be read
* @throws RangeNotSatisfiableEOFException GET is out of range
* @throws IOException anything else.
*/
@Retries.RetryTranslated
private void lazySeek(long targetPos, long len) throws IOException {

Invoker invoker = context.getReadInvoker();
invoker.maybeRetry(streamStatistics.getOpenOperations() == 0,
"lazySeek", pathStr, true,
invoker.retry("lazySeek to " + targetPos, pathStr, true,
() -> {
//For lazy seek
seekInStream(targetPos, len);

//re-open at specific location if needed
if (wrappedStream == null) {
if (!isObjectStreamOpen()) {
reopen("read from new offset", targetPos, len, false);
}
});
Expand Down Expand Up @@ -449,7 +464,9 @@ public synchronized int read() throws IOException {

try {
lazySeek(nextReadPos, 1);
} catch (EOFException e) {
} catch (RangeNotSatisfiableEOFException e) {
// attempt to GET beyond the end of the object
LOG.debug("Downgrading 416 response attempt to read at {} to -1 response", nextReadPos);
return -1;
}

Expand All @@ -460,14 +477,12 @@ public synchronized int read() throws IOException {
// When exception happens before re-setting wrappedStream in "reopen" called
// by onReadFailure, then wrappedStream will be null. But the **retry** may
// re-execute this block and cause NPE if we don't check wrappedStream
if (wrappedStream == null) {
if (!isObjectStreamOpen()) {
reopen("failure recovery", getPos(), 1, false);
}
try {
b = wrappedStream.read();
} catch (EOFException e) {
return -1;
} catch (SocketTimeoutException e) {
} catch (HttpChannelEOFException | SocketTimeoutException e) {
onReadFailure(e, true);
throw e;
} catch (IOException e) {
Expand All @@ -480,10 +495,9 @@ public synchronized int read() throws IOException {
if (byteRead >= 0) {
pos++;
nextReadPos++;
}

if (byteRead >= 0) {
incrementBytesRead(1);
} else {
streamReadResultNegative();
}
return byteRead;
}
Expand All @@ -509,6 +523,18 @@ private void onReadFailure(IOException ioe, boolean forceAbort) {
closeStream("failure recovery", forceAbort, false);
}

/**
* the read() call returned -1.
* this means "the connection has gone past the end of the object" or
* the stream has broken for some reason.
* so close stream (without an abort).
*/
private void streamReadResultNegative() {
if (CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ) {
closeStream("wrappedStream.read() returned -1", false, false);
}
}

/**
* {@inheritDoc}
*
Expand All @@ -534,8 +560,8 @@ public synchronized int read(byte[] buf, int off, int len)

try {
lazySeek(nextReadPos, len);
} catch (EOFException e) {
// the end of the file has moved
} catch (RangeNotSatisfiableEOFException e) {
// attempt to GET beyond the end of the object
return -1;
}

Expand All @@ -548,17 +574,19 @@ public synchronized int read(byte[] buf, int off, int len)
// When exception happens before re-setting wrappedStream in "reopen" called
// by onReadFailure, then wrappedStream will be null. But the **retry** may
// re-execute this block and cause NPE if we don't check wrappedStream
if (wrappedStream == null) {
if (!isObjectStreamOpen()) {
reopen("failure recovery", getPos(), 1, false);
}
try {
// read data; will block until there is data or the end of the stream is reached.
// returns 0 for "stream is open but no data yet" and -1 for "end of stream".
bytes = wrappedStream.read(buf, off, len);
} catch (EOFException e) {
// the base implementation swallows EOFs.
return -1;
} catch (SocketTimeoutException e) {
} catch (HttpChannelEOFException | SocketTimeoutException e) {
onReadFailure(e, true);
throw e;
} catch (EOFException e) {
LOG.debug("EOFException raised by http stream read(); downgrading to a -1 response", e);
return -1;
} catch (IOException e) {
onReadFailure(e, false);
throw e;
Expand All @@ -569,8 +597,10 @@ public synchronized int read(byte[] buf, int off, int len)
if (bytesRead > 0) {
pos += bytesRead;
nextReadPos += bytesRead;
incrementBytesRead(bytesRead);
} else {
streamReadResultNegative();
}
incrementBytesRead(bytesRead);
streamStatistics.readOperationCompleted(len, bytesRead);
return bytesRead;
}
Expand Down Expand Up @@ -818,6 +848,9 @@ public void readFully(long position, byte[] buffer, int offset, int length)
while (nread < length) {
int nbytes = read(buffer, offset + nread, length - nread);
if (nbytes < 0) {
// no attempt is currently made to recover from stream read problems;
// a lazy seek to the offset is probably the solution.
// but it will need more qualification against failure handling
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
}
nread += nbytes;
Expand Down Expand Up @@ -987,7 +1020,7 @@ private void validateRangeRequest(FileRange range) throws EOFException {
final String errMsg = String.format("Requested range [%d, %d) is beyond EOF for path %s",
range.getOffset(), range.getLength(), pathStr);
LOG.warn(errMsg);
throw new EOFException(errMsg);
throw new RangeNotSatisfiableEOFException(errMsg, null);
}
}

Expand Down Expand Up @@ -1257,8 +1290,12 @@ public boolean hasCapability(String capability) {
}
}

/**
* Is the inner object stream open?
* @return true if there is an active HTTP request to S3.
*/
@VisibleForTesting
boolean isObjectStreamOpen() {
public boolean isObjectStreamOpen() {
return wrappedStream != null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,15 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
// in this map.
policyMap.put(AWSClientIOException.class, retryAwsClientExceptions);

// Http Channel issues: treat as communication failure
policyMap.put(HttpChannelEOFException.class, connectivityFailure);

// server didn't respond.
policyMap.put(AWSNoResponseException.class, retryIdempotentCalls);

// range header is out of scope of object; retrying won't help
policyMap.put(RangeNotSatisfiableEOFException.class, fail);

// should really be handled by resubmitting to new location;
// that's beyond the scope of this retry policy
policyMap.put(AWSRedirectException.class, fail);
Expand Down Expand Up @@ -251,10 +257,7 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
policyMap.put(ConnectException.class, connectivityFailure);

// this can be a sign of an HTTP connection breaking early.
// which can be reacted to by another attempt if the request was idempotent.
// But: could also be a sign of trying to read past the EOF on a GET,
// which isn't going to be recovered from
policyMap.put(EOFException.class, retryIdempotentCalls);
policyMap.put(EOFException.class, connectivityFailure);

// object not found. 404 when not unknown bucket; 410 "gone"
policyMap.put(FileNotFoundException.class, fail);
Expand Down Expand Up @@ -300,7 +303,7 @@ public RetryAction shouldRetry(Exception exception,
if (exception instanceof SdkException) {
// update the sdk exception for the purpose of exception
// processing.
ex = S3AUtils.translateException("", "", (SdkException) exception);
ex = S3AUtils.translateException("", "/", (SdkException) exception);
}
LOG.debug("Retry probe for {} with {} retries and {} failovers,"
+ " idempotent={}, due to {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,20 @@ public static IOException translateException(String operation,
*/
@SuppressWarnings("ThrowableInstanceNeverThrown")
public static IOException translateException(@Nullable String operation,
String path,
@Nullable String path,
SdkException exception) {
String message = String.format("%s%s: %s",
operation,
StringUtils.isNotEmpty(path)? (" on " + path) : "",
exception);

if (path == null || path.isEmpty()) {
// handle null path by giving it a stub value.
// not ideal/informative, but ensures that the path is never null in
// exceptions constructed.
path = "/";
}

if (!(exception instanceof AwsServiceException)) {
// exceptions raised client-side: connectivity, auth, network problems...
Exception innerCause = containsInterruptedException(exception);
Expand All @@ -196,7 +203,7 @@ public static IOException translateException(@Nullable String operation,
return ioe;
}
// network problems covered by an IOE inside the exception chain.
ioe = maybeExtractIOException(path, exception);
ioe = maybeExtractIOException(path, exception, message);
if (ioe != null) {
return ioe;
}
Expand Down Expand Up @@ -300,10 +307,13 @@ public static IOException translateException(@Nullable String operation,
break;

// out of range. This may happen if an object is overwritten with
// a shorter one while it is being read.
// a shorter one while it is being read or openFile() was invoked
// passing a FileStatus or file length less than that of the object.
// although the HTTP specification says that the response should
// include a range header specifying the actual range available,
// this isn't picked up here.
case SC_416_RANGE_NOT_SATISFIABLE:
ioe = new EOFException(message);
ioe.initCause(ase);
ioe = new RangeNotSatisfiableEOFException(message, ase);
break;

// this has surfaced as a "no response from server" message.
Expand Down Expand Up @@ -673,7 +683,7 @@ public static <InstanceT> InstanceT getInstanceFromReflection(String className,
if (targetException instanceof IOException) {
throw (IOException) targetException;
} else if (targetException instanceof SdkException) {
throw translateException("Instantiate " + className, "", (SdkException) targetException);
throw translateException("Instantiate " + className, "/", (SdkException) targetException);
} else {
// supported constructor or factory method found, but the call failed
throw instantiationException(uri, className, configKey, targetException);
Expand Down
Loading

0 comments on commit 36198b5

Please sign in to comment.