diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 9a61777484a2d3..a510eaa309422a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -5475,7 +5475,11 @@ public boolean hasPathCapability(final Path path, final String capability)
     case CommonPathCapabilities.ETAGS_AVAILABLE:
       return true;
 
-    /*
+    // Is prefetching enabled?
+    case PREFETCH_ENABLED_KEY:
+      return prefetchEnabled;
+
+    /*
      * Marker policy capabilities are handed off.
      */
     case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index 969a8bc560337a..646e232b433519 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -42,6 +42,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2;
 import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
 import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE;
@@ -289,7 +290,9 @@ private InternalConstants() {
       STORE_CAPABILITY_S3_EXPRESS_STORAGE,
       FS_S3A_CREATE_PERFORMANCE_ENABLED,
       DIRECTORY_OPERATIONS_PURGE_UPLOADS,
-      ENABLE_MULTI_DELETE));
+      ENABLE_MULTI_DELETE,
+      PREFETCH_ENABLED_KEY
+      ));
 
   /**
    * AWS V4 Auth Scheme to use when creating signers: {@value}.
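Review note: a minimal sketch of how a client could probe the new capability, assuming PREFETCH_ENABLED_KEY keeps its value from Constants ("fs.s3a.prefetch.enabled"); the bucket and path below are illustrative only, not part of this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ProbePrefetchCapability {
  public static void main(String[] args) throws Exception {
    // Illustrative location; any S3A path on the target store works.
    Path path = new Path("s3a://example-bucket/data/file.bin");
    FileSystem fs = path.getFileSystem(new Configuration());
    // With this change, hasPathCapability() reports whether the
    // prefetching input stream is enabled on this filesystem instance.
    boolean prefetching = fs.hasPathCapability(path, "fs.s3a.prefetch.enabled");
    System.out.println("prefetch enabled: " + prefetching);
  }
}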
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java
index afeb18c2db0df3..cecdc24a0f26f1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java
@@ -290,12 +290,18 @@ public void seek(long pos) throws IOException {
   public int read() throws IOException {
     throwIfClosed();
 
-    if (remoteObject.size() == 0
-        || nextReadPos >= remoteObject.size()) {
+    if (remoteObject.size() == 0) {
+      LOG.debug("Rejecting read on empty file");
+      return -1;
+    }
+
+    if (nextReadPos >= remoteObject.size()) {
+      LOG.debug("Rejecting read past EOF");
       return -1;
     }
 
     if (!ensureCurrentBuffer()) {
+      LOG.debug("Empty buffer in cache");
       return -1;
     }
 
@@ -338,12 +344,18 @@ public int read(byte[] buffer, int offset, int len) throws IOException {
       return 0;
     }
 
-    if (remoteObject.size() == 0
-        || nextReadPos >= remoteObject.size()) {
+    if (remoteObject.size() == 0) {
+      LOG.debug("Rejecting read on empty file");
+      return -1;
+    }
+
+    if (nextReadPos >= remoteObject.size()) {
+      LOG.debug("Rejecting read past EOF");
       return -1;
     }
 
     if (!ensureCurrentBuffer()) {
+      LOG.debug("Empty buffer in cache");
       return -1;
     }
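Review note: splitting the combined size/EOF check means each early return now logs why the read was rejected, while the java.io.InputStream contract is preserved: reads on an empty object or at/past EOF return -1 rather than throwing. A sketch of the behaviour callers can rely on (path is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadAtEofSketch {
  public static void main(String[] args) throws Exception {
    Path path = new Path("s3a://example-bucket/data/file.bin");
    FileSystem fs = path.getFileSystem(new Configuration());
    try (FSDataInputStream in = fs.open(path)) {
      // Drain the stream to its end.
      while (in.read() != -1) {
        // discard
      }
      // Repeated reads at EOF keep returning -1; no exception is raised.
      int eof = in.read();
      System.out.println("read at EOF: " + eof);
    }
  }
}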
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
index 69d5ce7aee1fa8..85b2021ad3d213 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
@@ -22,13 +22,13 @@
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.impl.prefetch.Validate;
+import org.apache.hadoop.fs.s3a.HttpChannelEOFException;
 import org.apache.hadoop.fs.s3a.Invoker;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 
@@ -117,11 +117,12 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
         STREAM_READ_REMOTE_BLOCK_READ, () -> {
           try {
             this.readOneBlock(buffer, offset, size);
+          } catch (HttpChannelEOFException e) {
+            this.remoteObject.getStatistics().readException();
+            throw e;
           } catch (EOFException e) {
             // the base implementation swallows EOFs.
             return -1;
-          } catch (SocketTimeoutException e) {
-            throw e;
           } catch (IOException e) {
             this.remoteObject.getStatistics().readException();
             throw e;
@@ -168,7 +169,7 @@ private void readOneBlock(ByteBuffer buffer, long offset, int size)
       String message = String.format(
           "Unexpected end of stream: buffer[%d], readSize = %d, numRemainingBytes = %d",
           buffer.capacity(), readSize, numRemainingBytes);
-      throw new EOFException(message);
+      throw new HttpChannelEOFException(remoteObject.getPath(), message, null);
     }
 
     if (numBytes > 0) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index 482a963b92ab49..a9d79400c864c4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -20,6 +20,7 @@
 
 import java.io.EOFException;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -40,6 +41,7 @@
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
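Review note: the catch ordering in readOneBlockWithRetries() only compiles because HttpChannelEOFException extends EOFException, so a truncated HTTP channel can be distinguished from a genuine end-of-file, counted in the stream statistics, and rethrown for the retry logic instead of being swallowed as -1. A sketch of the same pattern from a caller's side (the helper below is hypothetical, not part of this patch):

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.fs.s3a.HttpChannelEOFException;

public final class EofHandlingSketch {
  private EofHandlingSketch() {
  }

  /**
   * Read one byte, mapping a genuine EOF to -1 while propagating
   * channel truncation so retry logic can react to it.
   */
  static int readByte(InputStream in) throws IOException {
    try {
      return in.read();
    } catch (HttpChannelEOFException e) {
      // The HTTP channel broke mid-read: not a real EOF, so rethrow.
      throw e;
    } catch (EOFException e) {
      // Genuine end of stream: honour the InputStream contract.
      return -1;
    }
  }
}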