Skip to content

Commit

Permalink
HADOOP-19027. S3A: S3AInputStream doesn't recover from HTTP/channel e…
Browse files Browse the repository at this point in the history
…xceptions

- Add a new Path IOE subclass, AwsHttpChannelException
- Method ErrorTranslation.maybeExtractChannelException() to create this
  from shaded/unshaded NoHttpResponseException, using string match to
  avoid classpath problems.
- And do this for SdkClientExceptions with OpenSSL error code WFOPENSSL0035.
  This isn't strictly the right place for this as it's not an IOE we are
  remapping...
- ErrorTranslation.maybeExtractIOException() to perform this translation as
  appropriate.

S3AInputStream.reopen() code now retries, except on EOFException, which is
converted to a -1 response to the caller, as was done historically.

New tests for the translation, but not for the input stream logic.
The place for those will be TestS3AInputStreamRetry.

Change-Id: If9880748ec058a7ff354c4077909b3c9f8323b75
  • Loading branch information
steveloughran committed Jan 8, 2024
1 parent 5f9932a commit 0776a2e
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs;

/**
* Http channel exception: a {@link PathIOException} used to report
* failures of the underlying HTTP channel.
* In particular:
* <ul>
* <li>HttpClient {@code NoHttpResponseException}</li>
* <li>OpenSSL errors</li>
* </ul>
* The http client library exceptions may be shaded/unshaded; this is the
* exception used in retry policies.
*/
public class AwsHttpChannelException extends PathIOException {

/**
* Create the exception; all arguments are passed unchanged to the
* {@link PathIOException} constructor.
* @param path path of the failure.
* @param error error text.
* @param cause underlying exception.
*/
public AwsHttpChannelException(final String path,
final String error,
final Throwable cause) {
super(path, error, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ public <T> T retryUntranslated(
if (caught instanceof IOException) {
translated = (IOException) caught;
} else {
translated = S3AUtils.translateException(text, "",
translated = S3AUtils.translateException(text, "/",
(SdkException) caught);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,23 +406,34 @@ public boolean seekToNewSource(long targetPos) throws IOException {

/**
* Perform lazy seek and adjust stream to correct position for reading.
*
* If an EOF Exception is raised there are two possibilities
* <ol>
* <li>the stream is at the end of the file</li>
* <li>something went wrong with the network connection</li>
* </ol>
* This method does not attempt to distinguish; it assumes that an EOF
* exception is always "end of file".
* @param targetPos position from where data should be read
* @param len length of the content that needs to be read
* @return true if the operation did not raise an {@link EOFException}
*/
@Retries.RetryTranslated
private void lazySeek(long targetPos, long len) throws IOException {
private boolean lazySeek(long targetPos, long len) throws IOException {

Invoker invoker = context.getReadInvoker();
invoker.maybeRetry(streamStatistics.getOpenOperations() == 0,
"lazySeek", pathStr, true,
return invoker.retry("lazySeek", pathStr, true,
() -> {
//For lazy seek
seekInStream(targetPos, len);

//re-open at specific location if needed
if (wrappedStream == null) {
reopen("read from new offset", targetPos, len, false);
try {
//For lazy seek
seekInStream(targetPos, len);

//re-open at specific location if needed
if (!isObjectStreamOpen()) {
reopen("read from new offset", targetPos, len, false);
}
return true;
} catch (EOFException e) {
return false;
}
});
}
Expand All @@ -447,9 +458,7 @@ public synchronized int read() throws IOException {
return -1;
}

try {
lazySeek(nextReadPos, 1);
} catch (EOFException e) {
if (!lazySeek(nextReadPos, 1)) {
return -1;
}

Expand Down Expand Up @@ -532,10 +541,7 @@ public synchronized int read(byte[] buf, int off, int len)
return -1;
}

try {
lazySeek(nextReadPos, len);
} catch (EOFException e) {
// the end of the file has moved
if (!lazySeek(nextReadPos, len)) {
return -1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AwsHttpChannelException;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;
Expand Down Expand Up @@ -209,6 +210,9 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
// in this map.
policyMap.put(AWSClientIOException.class, retryAwsClientExceptions);

// Http Channel issues: treat as communication failure
policyMap.put(AwsHttpChannelException.class, connectivityFailure);

// server didn't respond.
policyMap.put(AWSNoResponseException.class, retryIdempotentCalls);

Expand Down Expand Up @@ -253,7 +257,8 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
// this can be a sign of an HTTP connection breaking early.
// which can be reacted to by another attempt if the request was idempotent.
// But: could also be a sign of trying to read past the EOF on a GET,
// which isn't going to be recovered from
// which isn't going to be recovered from and which
// in input streams should be escalated to the caller.
policyMap.put(EOFException.class, retryIdempotentCalls);

// object not found. 404 when not unknown bucket; 410 "gone"
Expand Down Expand Up @@ -300,7 +305,7 @@ public RetryAction shouldRetry(Exception exception,
if (exception instanceof SdkException) {
// update the sdk exception for the purpose of exception
// processing.
ex = S3AUtils.translateException("", "", (SdkException) exception);
ex = S3AUtils.translateException("", "/", (SdkException) exception);
}
LOG.debug("Retry probe for {} with {} retries and {} failovers,"
+ " idempotent={}, due to {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,20 @@ public static IOException translateException(String operation,
*/
@SuppressWarnings("ThrowableInstanceNeverThrown")
public static IOException translateException(@Nullable String operation,
String path,
@Nullable String path,
SdkException exception) {
String message = String.format("%s%s: %s",
operation,
StringUtils.isNotEmpty(path)? (" on " + path) : "",
exception);

if (path == null || path.isEmpty()) {
// handle null path by giving it a stub value.
// not ideal/informative, but ensures that the path is never null in
// exceptions constructed.
path = "/";
}

if (!(exception instanceof AwsServiceException)) {
// exceptions raised client-side: connectivity, auth, network problems...
Exception innerCause = containsInterruptedException(exception);
Expand All @@ -196,7 +203,7 @@ public static IOException translateException(@Nullable String operation,
return ioe;
}
// network problems covered by an IOE inside the exception chain.
ioe = maybeExtractIOException(path, exception);
ioe = maybeExtractIOException(path, exception, message);
if (ioe != null) {
return ioe;
}
Expand Down Expand Up @@ -673,7 +680,7 @@ public static <InstanceT> InstanceT getInstanceFromReflection(String className,
if (targetException instanceof IOException) {
throw (IOException) targetException;
} else if (targetException instanceof SdkException) {
throw translateException("Instantiate " + className, "", (SdkException) targetException);
throw translateException("Instantiate " + className, "/", (SdkException) targetException);
} else {
// supported constructor or factory method found, but the call failed
throw instantiationException(uri, className, configKey, targetException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ public AwsCredentials resolveCredentials() {
// if the exception contains an IOE, extract it
// so its type is the immediate cause of this new exception.
Throwable t = e;
final IOException ioe = maybeExtractIOException("IAM endpoint", e);
final IOException ioe = maybeExtractIOException("IAM endpoint", e,
"resolveCredentials()");
if (ioe != null) {
t = ioe;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@

import software.amazon.awssdk.awscore.exception.AwsServiceException;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.AwsHttpChannelException;
import org.apache.hadoop.fs.PathIOException;

import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;

/**
Expand All @@ -42,6 +45,22 @@
*/
public final class ErrorTranslation {

/**
* OpenSSL stream closed error: {@value}.
* See HADOOP-19027.
*/
public static final String OPENSSL_STREAM_CLOSED = "WFOPENSSL0035";

/**
* Classname of unshaded Http Client exception: {@value}.
*/
private static final String RAW_NO_HTTP_RESPONSE_EXCEPTION = "org.apache.http.NoHttpResponseException";

/**
* Classname of shaded Http Client exception: {@value}.
*/
private static final String SHADED_NO_HTTP_RESPONSE_EXCEPTION =
"software.amazon.awssdk.thirdparty.org.apache.http.NoHttpResponseException";
/**
* Private constructor for utility class.
*/
Expand Down Expand Up @@ -71,25 +90,51 @@ public static boolean isObjectNotFound(AwsServiceException e) {
return e.statusCode() == SC_404_NOT_FOUND && !isUnknownBucket(e);
}

/**
 * Walk a chain of causes and return its innermost throwable.
 * This is iterative rather than recursive: the JVM does not eliminate
 * tail calls, so a recursive walk would consume one stack frame per
 * link of the chain.
 * @param thrown next thrown in chain; may be null.
 * @param outer outermost; returned if {@code thrown} is null.
 * @return the last non-null throwable in the chain.
 */
private static Throwable getInnermostThrowable(Throwable thrown, Throwable outer) {
  Throwable innermost = outer;
  // step down through the causes, remembering the last non-null entry.
  for (Throwable next = thrown; next != null; next = next.getCause()) {
    innermost = next;
  }
  return innermost;
}

/**
* Translate an exception if it or its inner exception is an
* IOException.
* If this condition is not met, null is returned.
* This also contains the logic to extract an AWS HTTP channel exception,
* which may or may not be an IOE, depending on the underlying SSL implementation
* in use.
* If an IOException cannot be extracted, null is returned.
* @param path path of operation.
* @param thrown exception
* @param message message generated by the caller.
* @return a translated exception or null.
*/
public static IOException maybeExtractIOException(String path, Throwable thrown) {
public static IOException maybeExtractIOException(
String path,
Throwable thrown,
String message) {

if (thrown == null) {
return null;
}

// look inside
Throwable cause = thrown.getCause();
while (cause != null && cause.getCause() != null) {
cause = cause.getCause();
// walk down the chain of exceptions to find the innermost.
Throwable cause = getInnermostThrowable(thrown.getCause(), thrown);

// see if this is an http channel exception
AwsHttpChannelException channelException =
maybeExtractChannelException(path, message, cause);
if (channelException != null) {
return channelException;
}

// not a channel exception, not an IOE.
if (!(cause instanceof IOException)) {
return null;
}
Expand All @@ -102,8 +147,7 @@ public static IOException maybeExtractIOException(String path, Throwable thrown)
// unless no suitable constructor is available.
final IOException ioe = (IOException) cause;

return wrapWithInnerIOE(path, thrown, ioe);

return wrapWithInnerIOE(path, message, thrown, ioe);
}

/**
Expand All @@ -116,16 +160,20 @@ public static IOException maybeExtractIOException(String path, Throwable thrown)
* See {@code NetUtils}.
* @param <T> type of inner exception.
* @param path path of the failure.
* @param message message generated by the caller.
* @param outer outermost exception.
* @param inner inner exception.
* @return the new exception.
*/
@SuppressWarnings("unchecked")
private static <T extends IOException> IOException wrapWithInnerIOE(
String path,
String message,
Throwable outer,
T inner) {
String msg = outer.toString() + ": " + inner.getMessage();
String msg = (isNotEmpty(message) ? (message + ":"
+ " ") : "")
+ outer.toString() + ": " + inner.getMessage();
Class<? extends Throwable> clazz = inner.getClass();
try {
Constructor<? extends Throwable> ctor = clazz.getConstructor(String.class);
Expand All @@ -136,6 +184,35 @@ private static <T extends IOException> IOException wrapWithInnerIOE(
}
}

/**
 * Extract an AWS HTTP channel exception if the inner exception is considered
 * an HttpClient {@code NoHttpResponseException} or an OpenSSL channel exception.
 * This is based on string matching, which is inelegant and brittle.
 * <p>
 * The probe is null-safe: a null {@code thrown}, or an exception whose
 * message is null, is never matched as an OpenSSL error.
 * @param path path of the failure.
 * @param message message generated by the caller.
 * @param thrown inner exception; may be null.
 * @return the new exception, or null if {@code thrown} is not recognized
 * as an HTTP channel failure.
 */
@VisibleForTesting
public static AwsHttpChannelException maybeExtractChannelException(
    String path,
    String message,
    Throwable thrown) {
  if (thrown == null) {
    // nothing to examine
    return null;
  }
  final String classname = thrown.getClass().getName();
  if (thrown instanceof IOException
      && (classname.equals(RAW_NO_HTTP_RESPONSE_EXCEPTION)
      || classname.equals(SHADED_NO_HTTP_RESPONSE_EXCEPTION))) {
    // shaded or unshaded http client exception class
    return new AwsHttpChannelException(path, message, thrown);
  }
  // there's ambiguity about what exception class this is
  // so rather than use its type, we look for an OpenSSL string in the message.
  // Throwable.getMessage() may be null, so it must be checked before matching.
  final String text = thrown.getMessage();
  if (text != null && text.contains(OPENSSL_STREAM_CLOSED)) {
    return new AwsHttpChannelException(path, message, thrown);
  }
  return null;
}

/**
* AWS error codes explicitly recognized and processes specially;
* kept in their own class for isolation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,8 @@ public static <E extends Throwable> E verifyExceptionClass(Class<E> clazz,
.describedAs("Exception expected of class %s", clazz)
.isNotNull();
if (!(ex.getClass().equals(clazz))) {
LOG.warn("Rethrowing exception: {} as it is not an instance of {}",
ex, clazz, ex);
throw ex;
}
return (E)ex;
Expand Down
Loading

0 comments on commit 0776a2e

Please sign in to comment.