From fb83df2cf3ae0fba483b23f0e2e898bd5645d6a7 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Wed, 17 Jan 2024 19:45:08 +0000
Subject: [PATCH] HADOOP-19043. S3A: Regression: ITestS3AOpenCost fails on prefetch test runs

* Adds EOF logic deep into the prefetching code.
* Tests are still failing.
* All conflicts with hadoop trunk have been resolved.

Change-Id: I9b23b01d010d8a1a680ce849d26a0aebab2389e2
---
 .../fs/s3a/prefetch/S3ARemoteObject.java      |  8 ++-
 .../s3a/prefetch/S3ARemoteObjectReader.java   | 50 ++++++++++++++++---
 .../fs/s3a/performance/ITestS3AOpenCost.java  | 36 +++++++------
 .../prefetch/ITestInMemoryInputStream.java    |  8 +--
 4 files changed, 71 insertions(+), 31 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java
index 5129adc1f50801..34d08d88079b1d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.impl.prefetch.Validate;
 import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.Retries;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3AUtils;
@@ -36,7 +37,6 @@
 import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
 import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
-import org.apache.hadoop.fs.statistics.DurationTracker;
 
 import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration;
 
@@ -161,8 +161,6 @@ public long size() {
 
   /**
    * Opens a section of the file for reading.
-   * The s3 object response is cached in {@link #s3Objects} so that
-   * GC does not close the input stream.
    *
    * @param offset Start offset (0 based) of the section to read.
    * @param size Size of the section to read.
@@ -173,6 +171,7 @@ public long size() {
    * @throws IllegalArgumentException if offset is greater than or equal to file size.
    * @throws IllegalArgumentException if size is greater than the remaining bytes.
    */
+  @Retries.OnceTranslated
   public ResponseInputStream<GetObjectResponse> openForRead(long offset, int size)
       throws IOException {
     Validate.checkNotNegative(offset, "offset");
@@ -188,8 +187,7 @@ public ResponseInputStream<GetObjectResponse> openForRead(long offset, int size)
 
     String operation = String.format(
         "%s %s at %d size %d", S3AInputStream.OPERATION_OPEN, uri, offset, size);
-    DurationTracker tracker = streamStatistics.initiateGetRequest();
-    ResponseInputStream<GetObjectResponse> object = null;
+    ResponseInputStream<GetObjectResponse> object;
 
     // initiate the GET. This completes once the request returns the response headers;
     // the data is read later.
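Context for the S3ARemoteObject change above: openForRead() issues a ranged GET which completes as soon as the response headers come back; the body is only streamed afterwards, so it is the prefetcher, not the SDK, that decides when the data is actually read and when the stream is drained or aborted. Below is a minimal, self-contained sketch of the equivalent AWS SDK v2 interaction; the bucket, key, offset, and size are illustrative assumptions, not values from this patch.

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

public class RangedGetSketch {
  public static void main(String[] args) throws Exception {
    long offset = 0;
    int size = 1024;
    GetObjectRequest request = GetObjectRequest.builder()
        .bucket("example-bucket")       // illustrative name
        .key("example-object")          // illustrative name
        // HTTP Range header; first and last byte offsets are both inclusive
        .range(String.format("bytes=%d-%d", offset, offset + size - 1))
        .build();
    // S3Client.create() picks up region and credentials from the default chains
    try (S3Client s3 = S3Client.create();
         // getObject() returns once the response headers arrive;
         // the body is streamed on demand and must be closed or drained
         ResponseInputStream<GetObjectResponse> body = s3.getObject(request)) {
      System.out.println("content-length: " + body.response().contentLength());
      System.out.println("bytes read: " + body.readAllBytes().length);
    }
  }
}

A range starting at or beyond the end of the object fails with HTTP 416 (Range Not Satisfiable); S3A surfaces that as RangeNotSatisfiableEOFException, which is exactly the case the reader changes below downgrade to a partial read.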
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
index 85b2021ad3d213..08ccac0bc2e03f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
@@ -30,6 +30,8 @@
 import org.apache.hadoop.fs.impl.prefetch.Validate;
 import org.apache.hadoop.fs.s3a.HttpChannelEOFException;
 import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.RangeNotSatisfiableEOFException;
+import org.apache.hadoop.fs.s3a.Retries;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 
 import software.amazon.awssdk.core.ResponseInputStream;
@@ -78,13 +80,14 @@ public S3ARemoteObjectReader(S3ARemoteObject remoteObject) {
    * @param offset the absolute offset into the underlying file where reading starts.
    * @param size the number of bytes to be read.
    *
-   * @return number of bytes actually read.
+   * @return number of bytes actually read. -1 if the file is closed.
    * @throws IOException if there is an error reading from the file.
    *
    * @throws IllegalArgumentException if buffer is null.
    * @throws IllegalArgumentException if offset is outside of the range [0, file size].
    * @throws IllegalArgumentException if size is zero or negative.
    */
+  @Retries.RetryTranslated
   public int read(ByteBuffer buffer, long offset, int size) throws IOException {
     Validate.checkNotNull(buffer, "buffer");
     Validate.checkWithinRange(offset, "offset", 0, this.remoteObject.size());
@@ -103,6 +106,21 @@ public void close() {
     this.closed = true;
   }
 
+  /**
+   * Reads one block from S3.
+   *
+   * There are no retries on base EOFExceptions.
+   * {@link HttpChannelEOFException} will be retried.
+   * {@link RangeNotSatisfiableEOFException} will be downgraded to a
+   * partial read, so data may be returned.
+   * @param buffer destination.
+   * @param offset object offset.
+   * @param size size to retrieve.
+   * @return bytes read.
+   * @throws EOFException if an EOF was raised and not downgraded.
+   * @throws IOException IO failure.
+   */
+  @Retries.RetryTranslated
   private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
       throws IOException {
 
@@ -110,7 +128,7 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
     Invoker invoker = this.remoteObject.getReadInvoker();
 
     final String path = this.remoteObject.getPath();
-    int invokerResponse =
+    EOFException invokerResponse =
         invoker.retry(String.format("read %s [%d-%d]", path, offset, size),
             path, true,
             trackDurationOfOperation(streamStatistics,
@@ -118,16 +136,17 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
           try {
             this.readOneBlock(buffer, offset, size);
           } catch (HttpChannelEOFException e) {
+            // EOF subclasses which are rethrown as errors, hence retried.
            this.remoteObject.getStatistics().readException();
            throw e;
          } catch (EOFException e) {
            // the base implementation swallows EOFs.
-            return -1;
+            return e;
          } catch (IOException e) {
            this.remoteObject.getStatistics().readException();
            throw e;
          }
-          return 0;
+          return null;
        }));
 
     int numBytesRead = buffer.position();
@@ -139,13 +158,28 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
     this.remoteObject.getStatistics()
         .readOperationCompleted(size, numBytesRead);
 
-    if (invokerResponse < 0) {
-      return invokerResponse;
-    } else {
-      return numBytesRead;
+    if (invokerResponse != null) {
+      if (invokerResponse instanceof RangeNotSatisfiableEOFException) {
+        // the range wasn't satisfiable, but some data may have been read.
+        return numBytesRead;
+      } else {
+        throw invokerResponse;
+      }
     }
+
+    // how much was read?
+    return numBytesRead;
   }
 
+  /**
+   * GET one block from S3.
+   * @param buffer buffer to fill up.
+   * @param offset offset within the object.
+   * @param size size to retrieve.
+   * @throws IOException IO failure.
+   * @throws HttpChannelEOFException if the channel is closed during the read.
+   */
+  @Retries.OnceTranslated
   private void readOneBlock(ByteBuffer buffer, long offset, int size)
       throws IOException {
     int readSize = Math.min(size, buffer.remaining());
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index 5bbcb7563dbe58..5deafc78fd5bfc 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -41,8 +41,8 @@
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.Statistic;
-import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
 import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.io.IOUtils;
 
 import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
@@ -154,20 +154,26 @@ public void testOpenFileWithStatusOfOtherFS() throws Throwable {
         new Path("gopher:///localhost/" + testFile.getName()));
 
     // no IO in open
-    FSDataInputStream in = verifyMetrics(() ->
-        fs.openFile(testFile)
-            .withFileStatus(st2)
-            .build()
-            .get(),
-        always(NO_HEAD_OR_LIST),
-        with(STREAM_READ_OPENED, 0));
-
-    // the stream gets opened during read
-    long readLen = verifyMetrics(() ->
-        readStream(in),
-        always(NO_HEAD_OR_LIST),
-        with(STREAM_READ_OPENED, 1));
-    assertEquals("bytes read from file", fileLength, readLen);
+    FSDataInputStream in = null;
+    try {
+      in = verifyMetrics(() ->
+          fs.openFile(testFile)
+              .withFileStatus(st2)
+              .build()
+              .get(),
+          always(NO_HEAD_OR_LIST),
+          with(STREAM_READ_OPENED, 0));
+
+      // the stream gets opened during read
+      final FSDataInputStream s = in;
+      long readLen = verifyMetrics(() ->
+          readStream(s),
+          always(NO_HEAD_OR_LIST),
+          with(STREAM_READ_OPENED, 1));
+      assertEquals("bytes read from file", fileLength, readLen);
+    } finally {
+      IOUtils.closeStream(in);
+    }
   }
 
   @Test
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestInMemoryInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestInMemoryInputStream.java
index 82bd203e146fa4..13035c8357e6d4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestInMemoryInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestInMemoryInputStream.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a.prefetch;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,7 +56,7 @@
 
 /**
  * Test the prefetching input stream, validates that the
- * S3AInMemoryInputStream is working as expected.
+ * {@link S3AInMemoryInputStream} is working as expected.
  */
 public class ITestInMemoryInputStream extends AbstractS3ACostTest {
 
@@ -215,8 +216,9 @@ public void testStatusProbesAfterClosingStream() throws Throwable {
     assertEquals("Stream stats retrieved through stream before and after closing should match",
         inputStreamStatistics, newInputStreamStatistics);
 
-    assertFalse("seekToNewSource() not supported with prefetch", in.seekToNewSource(10));
-
+    Assertions.assertThat(in.seekToNewSource(10))
+        .describedAs("seekToNewSource() not supported with prefetch: %s", in)
+        .isFalse();
   }
 }
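The control flow added to readOneBlockWithRetries() is compact but subtle: the retried closure traps a plain EOFException and hands it back as a return value, so the retry machinery never sees (and never retries) an end-of-file condition, while HttpChannelEOFException is rethrown and therefore retried; the caller then downgrades RangeNotSatisfiableEOFException to a partial read and rethrows anything else. A self-contained sketch of just that pattern follows; the Operation, retry(), and RangeNotSatisfiableEOF types are simplified stand-ins, not the real org.apache.hadoop.fs.s3a classes.

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

public class EofMappingSketch {

  /** Stand-in for RangeNotSatisfiableEOFException (HTTP 416). */
  static class RangeNotSatisfiableEOF extends EOFException {
  }

  /** Minimal operation type; the real code uses CallableRaisingIOE. */
  interface Operation<T> {
    T apply() throws IOException;
  }

  /** Naive stand-in for Invoker.retry(): one blind retry on IOException. */
  static <T> T retry(Operation<T> op) throws IOException {
    try {
      return op.apply();
    } catch (IOException e) {
      return op.apply();
    }
  }

  /**
   * The pattern from readOneBlockWithRetries(): the closure returns the
   * trapped EOFException, or null on success, so plain EOF is never retried.
   * (The real code first rethrows HttpChannelEOFException so it IS retried.)
   */
  static int readWithEofMapping(Operation<Void> rawRead, ByteBuffer buffer)
      throws IOException {
    EOFException outcome = retry(() -> {
      try {
        rawRead.apply();
        return null;          // success: nothing to report
      } catch (EOFException e) {
        return e;             // EOF becomes a value, not a retryable failure
      }
    });
    int numBytesRead = buffer.position();
    if (outcome != null) {
      if (outcome instanceof RangeNotSatisfiableEOF) {
        return numBytesRead;  // downgrade to a partial (possibly empty) read
      }
      throw outcome;          // any other EOF escalates to the caller
    }
    return numBytesRead;
  }
}

Returning the exception as a value keeps the decision about which EOFs are retryable inside the closure, while the caller retains enough information to distinguish a non-satisfiable range from a genuinely broken stream.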