diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index cef387e2a33300..6f2d3d73e668ac 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -5506,7 +5506,11 @@ public boolean hasPathCapability(final Path path, final String capability)
     case CommonPathCapabilities.ETAGS_AVAILABLE:
       return true;
 
-       /*
+    // Is prefetching enabled?
+    case PREFETCH_ENABLED_KEY:
+      return prefetchEnabled;
+
+    /*
      * Marker policy capabilities are handed off.
      */
     case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
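(For orientation, a hedged sketch rather than part of the patch: once this case lands, a client can discover at runtime whether the prefetching stream is active by probing the path capability, exactly as the test helper later in this patch does. The bucket URI below is illustrative; PREFETCH_ENABLED_KEY resolves to "fs.s3a.prefetch.enabled" in org.apache.hadoop.fs.s3a.Constants.)

  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;

  // Probe the live filesystem rather than reading the Configuration:
  // the FS instance may have been created with different settings.
  FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), new Configuration());
  boolean prefetching = fs.hasPathCapability(new Path("/"), PREFETCH_ENABLED_KEY);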
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index 79ccde9e9646ec..32becca0710aef 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -41,6 +41,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2;
 import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
 import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE;
@@ -289,7 +290,8 @@ private InternalConstants() {
           STORE_CAPABILITY_S3_EXPRESS_STORAGE,
           FS_S3A_CREATE_PERFORMANCE_ENABLED,
           DIRECTORY_OPERATIONS_PURGE_UPLOADS,
-          ENABLE_MULTI_DELETE));
+          ENABLE_MULTI_DELETE,
+          PREFETCH_ENABLED_KEY));
 
   /**
    * AWS V4 Auth Scheme to use when creating signers: {@value}.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java
index afeb18c2db0df3..cecdc24a0f26f1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java
@@ -290,12 +290,18 @@ public void seek(long pos) throws IOException {
   public int read() throws IOException {
     throwIfClosed();
 
-    if (remoteObject.size() == 0
-        || nextReadPos >= remoteObject.size()) {
+    if (remoteObject.size() == 0) {
+      LOG.debug("Rejecting read on empty file");
+      return -1;
+    }
+
+    if (nextReadPos >= remoteObject.size()) {
+      LOG.debug("Rejecting read past EOF");
       return -1;
     }
 
     if (!ensureCurrentBuffer()) {
+      LOG.debug("Empty buffer in cache");
       return -1;
     }
 
@@ -338,12 +344,18 @@ public int read(byte[] buffer, int offset, int len) throws IOException {
       return 0;
     }
 
-    if (remoteObject.size() == 0
-        || nextReadPos >= remoteObject.size()) {
+    if (remoteObject.size() == 0) {
+      LOG.debug("Rejecting read on empty file");
+      return -1;
+    }
+
+    if (nextReadPos >= remoteObject.size()) {
+      LOG.debug("Rejecting read past EOF");
       return -1;
     }
 
     if (!ensureCurrentBuffer()) {
+      LOG.debug("Empty buffer in cache");
       return -1;
     }
 
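(The reordered guards above keep the java.io.InputStream contract while making each -1 path separately visible in the debug log: an empty object and a position at or past EOF both return -1 rather than raising an exception. A minimal sketch of that contract, with a hypothetical path; fs is any Hadoop FileSystem.)

  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.Path;

  Path p = new Path("/data/example.bin");   // hypothetical file
  long len = fs.getFileStatus(p).getLen();
  try (FSDataInputStream in = fs.open(p)) {
    in.seek(len);                           // seeking to exactly EOF is legal
    assert in.read() == -1;                 // read at/past EOF returns -1, no exception
  }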
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
index 69d5ce7aee1fa8..85b2021ad3d213 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
@@ -22,13 +22,13 @@
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.impl.prefetch.Validate;
+import org.apache.hadoop.fs.s3a.HttpChannelEOFException;
 import org.apache.hadoop.fs.s3a.Invoker;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 
@@ -117,11 +117,12 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
                 STREAM_READ_REMOTE_BLOCK_READ, () -> {
                   try {
                     this.readOneBlock(buffer, offset, size);
+                  } catch (HttpChannelEOFException e) {
+                    this.remoteObject.getStatistics().readException();
+                    throw e;
                   } catch (EOFException e) {
                     // the base implementation swallows EOFs.
                     return -1;
-                  } catch (SocketTimeoutException e) {
-                    throw e;
                   } catch (IOException e) {
                     this.remoteObject.getStatistics().readException();
                     throw e;
@@ -168,7 +169,7 @@ private void readOneBlock(ByteBuffer buffer, long offset, int size)
           String message = String.format(
               "Unexpected end of stream: buffer[%d], readSize = %d, numRemainingBytes = %d",
               buffer.capacity(), readSize, numRemainingBytes);
-          throw new EOFException(message);
+          throw new HttpChannelEOFException(remoteObject.getPath(), message, null);
         }
 
         if (numBytes > 0) {
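(Because HttpChannelEOFException subclasses EOFException, the catch ordering in the retry hunk above is load-bearing: the narrower catch must come first, so a truncated HTTP channel is counted as a read failure and rethrown for the retry policy to handle, while a plain EOFException keeps its historical mapping to -1. A hedged sketch of the dispatch, with illustrative names:)

  import java.io.EOFException;
  import org.apache.hadoop.fs.s3a.HttpChannelEOFException;

  try {
    readOneBlock(buffer, offset, size);   // as in the reader above
  } catch (HttpChannelEOFException e) {   // channel died mid-read: retryable
    throw e;
  } catch (EOFException e) {              // genuine end of object
    // swallowed; callers see -1
  }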
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index 63b25f9c8874b5..33ba9d1b7e1d9d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -20,6 +20,7 @@
 
 
 import java.io.EOFException;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -40,6 +41,7 @@
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 
 import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY;
@@ -52,6 +54,7 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
 import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
@@ -60,10 +63,12 @@
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
 import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST;
+import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
@@ -84,6 +89,11 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
 
   private int fileLength;
 
+  /**
+   * Is prefetching enabled?
+   */
+  private boolean prefetching;
+
   public ITestS3AOpenCost() {
     super(true);
   }
@@ -111,6 +121,7 @@ public void setup() throws Exception {
     writeTextFile(fs, testFile, TEXT, true);
     testFileStatus = fs.getFileStatus(testFile);
     fileLength = (int)testFileStatus.getLen();
+    prefetching = prefetching();
   }
 
   /**
@@ -239,7 +250,10 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
       try (FSDataInputStream in = openFile(longLen,
           FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
         byte[] out = new byte[(int) (longLen)];
-        intercept(EOFException.class, () -> in.readFully(0, out));
+        intercept(EOFException.class, () -> {
+          in.readFully(0, out);
+          return in;
+        });
         in.seek(longLen - 1);
         assertEquals("read past real EOF on " + in, -1, in.read());
         return in.toString();
@@ -248,7 +262,7 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
         // two GET calls were made, one for readFully,
         // the second on the read() past the EOF
         // the operation has got as far as S3
-        with(STREAM_READ_OPENED, 1 + 1));
+        probe(!prefetching, STREAM_READ_OPENED, 1 + 1));
 
     // now on a new stream, try a full read from after the EOF
     verifyMetrics(() -> {
@@ -293,15 +307,17 @@ private FSDataInputStream openFile(final long longLen, String policy)
   public void testReadPastEOF() throws Throwable {
 
     // set a length past the actual file length
+    describe("read() up to the end of the real file");
     final int extra = 10;
     int longLen = fileLength + extra;
     try (FSDataInputStream in = openFile(longLen,
         FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
       for (int i = 0; i < fileLength; i++) {
         Assertions.assertThat(in.read())
-            .describedAs("read() at %d", i)
+            .describedAs("read() at %d from stream %s", i, in)
             .isEqualTo(TEXT.charAt(i));
       }
+      LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
     }
 
     // now open and read after the EOF; this is
@@ -323,10 +339,11 @@ public void testReadPastEOF() throws Throwable {
               .describedAs("read() at %d", p)
               .isEqualTo(-1);
         }
+        LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
         return in.toString();
       }
     },
-        with(Statistic.ACTION_HTTP_GET_REQUEST, extra));
+        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra));
   }
 
   /**
@@ -353,10 +370,11 @@ public void testPositionedReadableReadFullyPastEOF() throws Throwable {
           return in;
         });
         assertS3StreamClosed(in);
-        return "readFully past EOF";
+        return "readFully past EOF with statistics"
+            + ioStatisticsToPrettyString(in.getIOStatistics());
       }
     },
-        with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
   }
 
   /**
@@ -370,7 +388,6 @@ public void testPositionedReadableReadPastEOF() throws Throwable {
     int longLen = fileLength + extra;
 
     describe("PositionedReadable.read() past the end of the file");
-
     verifyMetrics(() -> {
       try (FSDataInputStream in =
                openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
@@ -388,10 +405,10 @@ public void testPositionedReadableReadPastEOF() throws Throwable {
         // stream is closed as part of this failure
         assertS3StreamClosed(in);
 
-        return "PositionedReadable.read()) past EOF";
+        return "PositionedReadable.read()) past EOF with " + in;
       }
     },
-        with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
   }
 
   /**
@@ -405,7 +422,8 @@ public void testVectorReadPastEOF() throws Throwable {
     final int extra = 10;
     int longLen = fileLength + extra;
 
-    describe("Vector read past the end of the file");
+    describe("Vector read past the end of the file, expecting an EOFException");
+
     verifyMetrics(() -> {
       try (FSDataInputStream in =
                openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
@@ -420,10 +438,20 @@ public void testVectorReadPastEOF() throws Throwable {
             TimeUnit.SECONDS,
             range.getData());
         assertS3StreamClosed(in);
-        return "vector read past EOF";
+        return "vector read past EOF with " + in;
       }
     },
-        with(Statistic.ACTION_HTTP_GET_REQUEST, 1));
+        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1));
+  }
+
+  /**
+   * Probe the filesystem to check whether prefetching is enabled.
+   * @return true if the fs has prefetching enabled.
+   * @throws IOException IO problem.
+   */
+  private boolean prefetching() throws IOException {
+    return getFileSystem().hasPathCapability(new Path("/"),
+        PREFETCH_ENABLED_KEY);
   }
 
   /**
@@ -431,10 +459,13 @@ public void testVectorReadPastEOF() throws Throwable {
    * @param in input stream
    */
   private static void assertS3StreamClosed(final FSDataInputStream in) {
-    S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream();
-    Assertions.assertThat(s3ain.isObjectStreamOpen())
-        .describedAs("stream is open")
-        .isFalse();
+    final InputStream wrapped = in.getWrappedStream();
+    if (wrapped instanceof S3AInputStream) {
+      S3AInputStream s3ain = (S3AInputStream) wrapped;
+      Assertions.assertThat(s3ain.isObjectStreamOpen())
+          .describedAs("stream is open")
+          .isFalse();
+    }
   }
 
   /**
@@ -442,9 +473,12 @@ private static void assertS3StreamClosed(final FSDataInputStream in) {
    * @param in input stream
    */
   private static void assertS3StreamOpen(final FSDataInputStream in) {
-    S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream();
-    Assertions.assertThat(s3ain.isObjectStreamOpen())
-        .describedAs("stream is closed")
-        .isTrue();
+    final InputStream wrapped = in.getWrappedStream();
+    if (wrapped instanceof S3AInputStream) {
+      S3AInputStream s3ain = (S3AInputStream) wrapped;
+      Assertions.assertThat(s3ain.isObjectStreamOpen())
+          .describedAs("stream is closed")
+          .isTrue();
+    }
   }
 }
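(A note on the recurring probe(!prefetching, ...) assertions above: OperationCostValidator.probe(enabled, statistic, expected) is evaluated only when its boolean gate is true, so the GET-request and stream-open expectations are skipped on prefetching test runs, where the request pattern legitimately differs. A sketch of the gating pattern; the count of 1 is illustrative:)

  import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;

  // Assert the GET count only for the classic input stream; when prefetching
  // is enabled the probe is disabled and verifyMetrics skips this check.
  verifyMetrics(() -> {
    // ... exercise the stream under test ...
    return "operation description";
  },
      probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1));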