diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
index 501186a25492c..88854b87c81a0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
@@ -164,6 +164,7 @@ public InputStream openForRead(long offset, int size) throws IOException {
     Validate.checkLessOrEqual(offset, "offset", size(), "size()");
     Validate.checkLessOrEqual(size, "size", size() - offset, "size() - offset");
 
+    streamStatistics.streamOpened();
     final GetObjectRequest request = client.newGetRequest(this.s3Attributes.getKey())
         .withRange(offset, offset + size - 1);
     this.changeTracker.maybeApplyConstraint(request);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
index 0fa6e33200b7b..00d5fbc367d1b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
@@ -254,6 +254,10 @@ public void seek(long pos) throws IOException {
   public int read() throws IOException {
     this.throwIfClosed();
 
+    if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
+      return -1;
+    }
+
     if (!ensureCurrentBuffer()) {
       return -1;
     }
@@ -296,6 +300,10 @@ public int read(byte[] buffer, int offset, int len) throws IOException {
       return 0;
     }
 
+    if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
+      return -1;
+    }
+
     if (!ensureCurrentBuffer()) {
       return -1;
     }
@@ -427,18 +435,8 @@ protected void throwIfClosed() throws IOException {
   }
 
   protected void throwIfInvalidSeek(long pos) throws EOFException {
-    long fileSize = this.s3File.size();
     if (pos < 0) {
       throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
-    } else {
-      if (fileSize == 0 && pos == 0) {
-        // Do nothing. Valid combination.
-        return;
-      }
-
-      if (pos >= fileSize) {
-        throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
-      }
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java
index c874a8c37b829..0f5834da4cc8c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java
@@ -103,8 +103,7 @@ public synchronized int available() throws IOException {
    */
   @Override
   public synchronized long getPos() throws IOException {
-    this.throwIfClosed();
-    return this.inputStream.getPos();
+    return this.isClosed() ? 0 : this.inputStream.getPos();
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
index c2e7684cad6da..9b9461c420a96 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 
 import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
@@ -71,11 +72,16 @@ public void testRequesterPaysOptionSuccess() throws Throwable {
       inputStream.seek(0);
       inputStream.readByte();
 
-      // Verify > 1 call was made, so we're sure it is correctly configured for each request
-      IOStatisticAssertions
-          .assertThatStatisticCounter(inputStream.getIOStatistics(),
-              StreamStatisticNames.STREAM_READ_OPENED)
-          .isGreaterThan(1);
+      if (conf.getBoolean(PREFETCH_ENABLED_KEY, true)) {
+        // For S3PrefetchingInputStream, verify a call was made
+        IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
+            StreamStatisticNames.STREAM_READ_OPENED).isEqualTo(1);
+      } else {
+        // For S3AInputStream, verify > 1 call was made,
+        // so we're sure it is correctly configured for each request
+        IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
+            StreamStatisticNames.STREAM_READ_OPENED).isGreaterThan(1);
+      }
 
       // Check list calls work without error
       fs.listFiles(requesterPaysPath.getParent(), false);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
index 3d7ee0882efa4..3a2d1b1b09a49 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
@@ -33,6 +34,7 @@
 
 import java.io.IOException;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_TOTAL_BYTES;
@@ -72,6 +74,7 @@ public void testUnbuffer() throws IOException {
     IOStatisticsSnapshot iostats = new IOStatisticsSnapshot();
     // Open file, read half the data, and then call unbuffer
     try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      skipIfCannotUnbuffer(inputStream);
       assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
       int bytesToRead = 8;
       readAndAssertBytesRead(inputStream, bytesToRead);
@@ -138,6 +141,7 @@ public void testUnbufferStreamStatistics() throws IOException {
     Object streamStatsStr;
     try {
       inputStream = fs.open(dest);
+      skipIfCannotUnbuffer(inputStream);
       streamStatsStr = demandStringifyIOStatisticsSource(inputStream);
       LOG.info("initial stream statistics {}", streamStatsStr);
 
@@ -192,6 +196,12 @@ private boolean isObjectStreamOpen(FSDataInputStream inputStream) {
     return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen();
  }
 
+  private void skipIfCannotUnbuffer(FSDataInputStream inputStream) {
+    if (!inputStream.hasCapability(StreamCapabilities.UNBUFFER)) {
+      skip("input stream does not support unbuffer");
+    }
+  }
+
   /**
    * Read the specified number of bytes from the given
    * {@link FSDataInputStream} and assert that
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
index 010bc1c30b634..e3c6c002bff67 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
@@ -169,11 +169,6 @@ private void testSeekHelper(S3InputStream inputStream, int bufferSize, int fileS
         EOFException.class,
         FSExceptionMessages.NEGATIVE_SEEK,
         () -> inputStream.seek(-1));
-
-    ExceptionAsserts.assertThrows(
-        EOFException.class,
-        FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
-        () -> inputStream.seek(fileSize + 1));
   }
 
   @Test