From 904513fa7e7155aa58ffc69e3fa9fd7058990983 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Wed, 17 Jan 2024 18:27:29 +0000
Subject: [PATCH] HADOOP-19043. S3A: Regression: ITestS3AOpenCost fails on
 prefetch test runs

This is actually trickier than it seems, as the fix needs to go deep
into the implementation of caching.

Specifically: the prefetcher knows the file length, and when a file is
opened with a declared length greater than the actual object (and the
object is smaller than one block), the resulting short read is treated
as a failure and the whole block is discarded, so read() of the
nominally in-range data returns -1.

This fix has to be considered a PoC and should be combined with the
other big prefetching PR, #5832, as that is where the changes belong.
This patch contains only test tuning and some differentiation of HTTP
channel problems from other EOF conditions.

Change-Id: Icdf7e2fb10ca77b6ca427eb207472fad277130d7
---
 .../apache/hadoop/fs/s3a/S3AFileSystem.java   |  6 +-
 .../hadoop/fs/s3a/impl/InternalConstants.java |  5 +-
 .../fs/s3a/prefetch/S3ARemoteInputStream.java | 20 ++++-
 .../s3a/prefetch/S3ARemoteObjectReader.java   |  9 ++-
 .../fs/s3a/performance/ITestS3AOpenCost.java  | 74 ++++++++++++++-----
 5 files changed, 84 insertions(+), 30 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index c5e6e09a835eb..a4ecdf08d1cc8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -5500,7 +5500,11 @@ public boolean hasPathCapability(final Path path, final String capability)
     case CommonPathCapabilities.ETAGS_AVAILABLE:
       return true;
 
-    /*
+    // Is prefetching enabled?
+    case PREFETCH_ENABLED_KEY:
+      return prefetchEnabled;
+
+    /*
      * Marker policy capabilities are handed off.
      */
     case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index 8ebf8c013d10a..82150ad3e9df3 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -41,6 +41,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2;
 import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
 import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE;
@@ -287,7 +288,9 @@ private InternalConstants() {
       STORE_CAPABILITY_S3_EXPRESS_STORAGE,
       FS_S3A_CREATE_PERFORMANCE_ENABLED,
       DIRECTORY_OPERATIONS_PURGE_UPLOADS,
-      ENABLE_MULTI_DELETE));
+      ENABLE_MULTI_DELETE,
+      PREFETCH_ENABLED_KEY
+      ));
 
   /**
    * AWS V4 Auth Scheme to use when creating signers: {@value}.
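The path capability wired up above makes the prefetch state queryable
by any client. As a minimal illustrative sketch (the helper class below
is a placeholder for this note, not part of the patch;
PREFETCH_ENABLED_KEY resolves to "fs.s3a.prefetch.enabled"):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class PrefetchProbe {

      private PrefetchProbe() {
      }

      /**
       * Probe a filesystem for S3A prefetching.
       * hasPathCapability() is on the FileSystem base class, so
       * non-S3A filesystems simply return false here.
       */
      public static boolean isPrefetching(FileSystem fs) throws IOException {
        return fs.hasPathCapability(new Path("/"), "fs.s3a.prefetch.enabled");
      }
    }

The test changes below use exactly this probe (see
ITestS3AOpenCost#prefetching()) to relax GET-request assertions when
prefetching is active.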
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java
index 38d740bd74f94..b8e4eb52c15aa 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java
@@ -265,12 +265,18 @@ public void seek(long pos) throws IOException {
   public int read() throws IOException {
     throwIfClosed();
 
-    if (remoteObject.size() == 0
-        || nextReadPos >= remoteObject.size()) {
+    if (remoteObject.size() == 0) {
+      LOG.debug("Rejecting read on empty file");
+      return -1;
+    }
+
+    if (nextReadPos >= remoteObject.size()) {
+      LOG.debug("Rejecting read past EOF");
       return -1;
     }
 
     if (!ensureCurrentBuffer()) {
+      LOG.debug("Empty buffer in cache");
       return -1;
     }
 
@@ -313,12 +319,18 @@ public int read(byte[] buffer, int offset, int len) throws IOException {
       return 0;
     }
 
-    if (remoteObject.size() == 0
-        || nextReadPos >= remoteObject.size()) {
+    if (remoteObject.size() == 0) {
+      LOG.debug("Rejecting read on empty file");
+      return -1;
+    }
+
+    if (nextReadPos >= remoteObject.size()) {
+      LOG.debug("Rejecting read past EOF");
       return -1;
     }
 
     if (!ensureCurrentBuffer()) {
+      LOG.debug("Empty buffer in cache");
       return -1;
     }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
index b49b2699f916b..f5bba8276fce6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
@@ -22,13 +22,13 @@
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.impl.prefetch.Validate;
+import org.apache.hadoop.fs.s3a.HttpChannelEOFException;
 import org.apache.hadoop.fs.s3a.Invoker;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 
@@ -115,11 +115,12 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
         STREAM_READ_REMOTE_BLOCK_READ, () -> {
           try {
             this.readOneBlock(buffer, offset, size);
+          } catch (HttpChannelEOFException e) {
+            this.remoteObject.getStatistics().readException();
+            throw e;
           } catch (EOFException e) {
             // the base implementation swallows EOFs.
             return -1;
-          } catch (SocketTimeoutException e) {
-            throw e;
           } catch (IOException e) {
             this.remoteObject.getStatistics().readException();
             throw e;
@@ -162,7 +163,7 @@ private void readOneBlock(ByteBuffer buffer, long offset, int size)
       String message = String.format(
           "Unexpected end of stream: buffer[%d], readSize = %d, numRemainingBytes = %d",
           buffer.capacity(), readSize, numRemainingBytes);
-      throw new EOFException(message);
+      throw new HttpChannelEOFException(remoteObject.getPath(), message, null);
     }
 
     if (numBytes > 0) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index 63b25f9c8874b..33ba9d1b7e1d9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.fs.s3a.performance;
 
 import java.io.EOFException;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -40,6 +41,7 @@
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 
 import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY;
@@ -52,6 +54,7 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
 import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
@@ -60,10 +63,12 @@
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
 import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST;
+import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
@@ -84,6 +89,11 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
 
   private int fileLength;
 
+  /**
+   * Is prefetching enabled?
+   */
+  private boolean prefetching;
+
   public ITestS3AOpenCost() {
     super(true);
   }
@@ -111,6 +121,7 @@ public void setup() throws Exception {
     writeTextFile(fs, testFile, TEXT, true);
     testFileStatus = fs.getFileStatus(testFile);
     fileLength = (int)testFileStatus.getLen();
+    prefetching = prefetching();
   }
 
   /**
@@ -239,7 +250,10 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
     try (FSDataInputStream in = openFile(longLen,
         FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
       byte[] out = new byte[(int) (longLen)];
-      intercept(EOFException.class, () -> in.readFully(0, out));
+      intercept(EOFException.class, () -> {
+        in.readFully(0, out);
+        return in;
+      });
       in.seek(longLen - 1);
       assertEquals("read past real EOF on " + in, -1, in.read());
       return in.toString();
@@ -248,7 +262,7 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
         // two GET calls were made, one for readFully,
         // the second on the read() past the EOF
        // the operation has got as far as S3
-        with(STREAM_READ_OPENED, 1 + 1));
+        probe(!prefetching, STREAM_READ_OPENED, 1 + 1));
 
     // now on a new stream, try a full read from after the EOF
     verifyMetrics(() -> {
@@ -293,15 +307,17 @@ private FSDataInputStream openFile(final long longLen, String policy)
   public void testReadPastEOF() throws Throwable {
 
     // set a length past the actual file length
+    describe("read() up to the end of the real file");
     final int extra = 10;
     int longLen = fileLength + extra;
     try (FSDataInputStream in = openFile(longLen,
         FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
       for (int i = 0; i < fileLength; i++) {
         Assertions.assertThat(in.read())
-            .describedAs("read() at %d", i)
+            .describedAs("read() at %d from stream %s", i, in)
            .isEqualTo(TEXT.charAt(i));
       }
+      LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
     }
 
     // now open and read after the EOF; this is
@@ -323,10 +339,11 @@ public void testReadPastEOF() throws Throwable {
               .describedAs("read() at %d", p)
               .isEqualTo(-1);
         }
+        LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
         return in.toString();
       }
     },
-        with(Statistic.ACTION_HTTP_GET_REQUEST, extra));
+        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra));
   }
 
   /**
@@ -353,10 +370,11 @@ public void testPositionedReadableReadFullyPastEOF() throws Throwable {
             return in;
           });
         assertS3StreamClosed(in);
-        return "readFully past EOF";
+        return "readFully past EOF with statistics "
+            + ioStatisticsToPrettyString(in.getIOStatistics());
       }
     },
-        with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
   }
 
   /**
@@ -370,7 +388,6 @@ public void testPositionedReadableReadPastEOF() throws Throwable {
     int longLen = fileLength + extra;
 
     describe("PositionedReadable.read() past the end of the file");
-
     verifyMetrics(() -> {
       try (FSDataInputStream in = openFile(longLen,
           FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
@@ -388,10 +405,10 @@ public void testPositionedReadableReadPastEOF() throws Throwable {
 
         // stream is closed as part of this failure
         assertS3StreamClosed(in);
-        return "PositionedReadable.read()) past EOF";
+        return "PositionedReadable.read() past EOF with " + in;
       }
     },
-        with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
   }
 
   /**
@@ -405,7 +422,8 @@ public void testVectorReadPastEOF() throws Throwable {
     final int extra = 10;
     int longLen = fileLength + extra;
 
-    describe("Vector read past the end of the file");
+    describe("Vector read past the end of the file, expecting an EOFException");
+
     verifyMetrics(() -> {
       try (FSDataInputStream in = openFile(longLen,
           FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
@@ -420,10 +438,20 @@ public void testVectorReadPastEOF() throws Throwable {
             TimeUnit.SECONDS,
             range.getData());
         assertS3StreamClosed(in);
-        return "vector read past EOF";
+        return "vector read past EOF with " + in;
       }
     },
-        with(Statistic.ACTION_HTTP_GET_REQUEST, 1));
+        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1));
+  }
+
+  /**
+   * Probe the FS for supporting prefetching.
+   * @return true if the fs has prefetching enabled.
+   * @throws IOException IO problem.
+   */
+  private boolean prefetching() throws IOException {
+    return getFileSystem().hasPathCapability(new Path("/"),
+        PREFETCH_ENABLED_KEY);
   }
 
   /**
@@ -431,10 +459,13 @@ private static void assertS3StreamClosed(final FSDataInputStream in) {
    * @param in input stream
    */
   private static void assertS3StreamClosed(final FSDataInputStream in) {
-    S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream();
-    Assertions.assertThat(s3ain.isObjectStreamOpen())
-        .describedAs("stream is open")
-        .isFalse();
+    final InputStream wrapped = in.getWrappedStream();
+    if (wrapped instanceof S3AInputStream) {
+      S3AInputStream s3ain = (S3AInputStream) wrapped;
+      Assertions.assertThat(s3ain.isObjectStreamOpen())
+          .describedAs("stream is open")
+          .isFalse();
+    }
   }
 
   /**
@@ -442,9 +473,12 @@ private static void assertS3StreamClosed(final FSDataInputStream in) {
    * @param in input stream
    */
   private static void assertS3StreamOpen(final FSDataInputStream in) {
-    S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream();
-    Assertions.assertThat(s3ain.isObjectStreamOpen())
-        .describedAs("stream is closed")
-        .isTrue();
+    final InputStream wrapped = in.getWrappedStream();
+    if (wrapped instanceof S3AInputStream) {
+      S3AInputStream s3ain = (S3AInputStream) wrapped;
+      Assertions.assertThat(s3ain.isObjectStreamOpen())
+          .describedAs("stream is closed")
+          .isTrue();
+    }
   }
 }