Commit f38bbe2

HADOOP-18175. fix test failures with prefetching s3a input stream (#4212)

Contributed by Monthon Klongklaew
monthonk authored May 4, 2022
1 parent f4d016f commit f38bbe2
Showing 6 changed files with 31 additions and 22 deletions.
@@ -164,6 +164,7 @@ public InputStream openForRead(long offset, int size) throws IOException {
Validate.checkLessOrEqual(offset, "offset", size(), "size()");
Validate.checkLessOrEqual(size, "size", size() - offset, "size() - offset");

+ streamStatistics.streamOpened();
final GetObjectRequest request = client.newGetRequest(this.s3Attributes.getKey())
.withRange(offset, offset + size - 1);
this.changeTracker.maybeApplyConstraint(request);
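
The hunk above adds a streamOpened() call on the code path that builds the GetObjectRequest, so the STREAM_READ_OPENED counter now reflects the GET requests made by the prefetching code (the ITestS3ARequesterPays change further down relies on this). A minimal sketch, with a hypothetical s3a path, of reading that counter from a stream's IOStatistics:

// Sketch only, not part of this commit; the bucket/object names are hypothetical.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;

public class StreamOpenedCounterSketch {
  public static void main(String[] args) throws Exception {
    Path path = new Path("s3a://example-bucket/example-object");
    FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
    try (FSDataInputStream in = fs.open(path)) {
      in.read();
      // STREAM_READ_OPENED counts how many times an object/range GET was opened.
      Long opened = in.getIOStatistics().counters()
          .get(StreamStatisticNames.STREAM_READ_OPENED);
      System.out.println("stream_read_opened = " + opened);
    }
  }
}
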
@@ -254,6 +254,10 @@ public void seek(long pos) throws IOException {
public int read() throws IOException {
this.throwIfClosed();

+ if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
+   return -1;
+ }
+
if (!ensureCurrentBuffer()) {
return -1;
}
@@ -296,6 +300,10 @@ public int read(byte[] buffer, int offset, int len) throws IOException {
return 0;
}

+ if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
+   return -1;
+ }
+
if (!ensureCurrentBuffer()) {
return -1;
}
@@ -427,18 +435,8 @@ protected void throwIfClosed() throws IOException {
}

protected void throwIfInvalidSeek(long pos) throws EOFException {
- long fileSize = this.s3File.size();
if (pos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
- } else {
-   if (fileSize == 0 && pos == 0) {
-     // Do nothing. Valid combination.
-     return;
-   }
-
-   if (pos >= fileSize) {
-     throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
-   }
}
}

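
Taken together, these hunks relax the prefetching input stream's EOF handling: throwIfInvalidSeek() now rejects only negative positions, so seeking to or beyond the end of the object no longer throws, and the new checks at the top of read() and read(byte[], int, int) make such a read return -1 (the matching assertion removal in TestS3InputStream appears at the end of this diff). A minimal sketch of that contract, assuming a hypothetical s3a path:

// Sketch only, not part of this commit; path is hypothetical.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SeekPastEofSketch {
  public static void main(String[] args) throws Exception {
    Path path = new Path("s3a://example-bucket/data.bin");
    FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
    long len = fs.getFileStatus(path).getLen();
    try (FSDataInputStream in = fs.open(path)) {
      in.seek(len + 1);        // past EOF: no longer throws with this change
      int b = in.read();       // returns -1 rather than reading anything
      System.out.println("read past EOF returned " + b);
      // in.seek(-1);          // a negative seek still throws EOFException
    }
  }
}
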
@@ -103,8 +103,7 @@ public synchronized int available() throws IOException {
*/
@Override
public synchronized long getPos() throws IOException {
- this.throwIfClosed();
- return this.inputStream.getPos();
+ return this.isClosed() ? 0 : this.inputStream.getPos();
}

/**
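
The getPos() hunk above makes the wrapper stream report position 0 once it is closed rather than throwing, so callers that inspect a stream after closing it (for example when logging or snapshotting statistics) keep working. A minimal sketch of the resulting behaviour, with a hypothetical path:

// Sketch only, not part of this commit; path is hypothetical.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GetPosAfterCloseSketch {
  public static void main(String[] args) throws Exception {
    Path path = new Path("s3a://example-bucket/data.bin");
    FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
    FSDataInputStream in = fs.open(path);
    in.read();
    long posWhileOpen = in.getPos();   // current offset within the object
    in.close();
    long posAfterClose = in.getPos();  // 0 after close on the prefetching stream
    System.out.println(posWhileOpen + " -> " + posAfterClose);
  }
}
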
@@ -30,6 +30,7 @@
import org.apache.hadoop.fs.statistics.StreamStatisticNames;

import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
+ import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

@@ -71,11 +72,16 @@ public void testRequesterPaysOptionSuccess() throws Throwable {
inputStream.seek(0);
inputStream.readByte();

- // Verify > 1 call was made, so we're sure it is correctly configured for each request
- IOStatisticAssertions
-     .assertThatStatisticCounter(inputStream.getIOStatistics(),
-         StreamStatisticNames.STREAM_READ_OPENED)
-     .isGreaterThan(1);
+ if (conf.getBoolean(PREFETCH_ENABLED_KEY, true)) {
+   // For S3PrefetchingInputStream, verify a call was made
+   IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
+       StreamStatisticNames.STREAM_READ_OPENED).isEqualTo(1);
+ } else {
+   // For S3AInputStream, verify > 1 call was made,
+   // so we're sure it is correctly configured for each request
+   IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
+       StreamStatisticNames.STREAM_READ_OPENED).isGreaterThan(1);
+ }

// Check list calls work without error
fs.listFiles(requesterPaysPath.getParent(), false);
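
The test hunk above branches on PREFETCH_ENABLED_KEY: with prefetching enabled it expects exactly one STREAM_READ_OPENED event, and with the classic S3AInputStream it still expects more than one. A minimal sketch, with a hypothetical URI, of flipping that switch in configuration:

// Sketch only, not part of this commit. PREFETCH_ENABLED_KEY is the constant
// imported in the test above; the URI and settings here are illustrative.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;

public class PrefetchToggleSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // false selects the classic S3AInputStream; true selects the prefetching stream.
    conf.setBoolean(PREFETCH_ENABLED_KEY, false);
    FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);
    // ... open and read files; stream-level IOStatistics will differ accordingly.
    fs.close();
  }
}
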
@@ -20,6 +20,7 @@

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
@@ -33,6 +34,7 @@

import java.io.IOException;

+ import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_TOTAL_BYTES;
@@ -72,6 +74,7 @@ public void testUnbuffer() throws IOException {
IOStatisticsSnapshot iostats = new IOStatisticsSnapshot();
// Open file, read half the data, and then call unbuffer
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+ skipIfCannotUnbuffer(inputStream);
assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
int bytesToRead = 8;
readAndAssertBytesRead(inputStream, bytesToRead);
@@ -138,6 +141,7 @@ public void testUnbufferStreamStatistics() throws IOException {
Object streamStatsStr;
try {
inputStream = fs.open(dest);
+ skipIfCannotUnbuffer(inputStream);
streamStatsStr = demandStringifyIOStatisticsSource(inputStream);

LOG.info("initial stream statistics {}", streamStatsStr);
@@ -192,6 +196,12 @@ private boolean isObjectStreamOpen(FSDataInputStream inputStream) {
return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen();
}

+ private void skipIfCannotUnbuffer(FSDataInputStream inputStream) {
+   if (!inputStream.hasCapability(StreamCapabilities.UNBUFFER)) {
+     skip("input stream does not support unbuffer");
+   }
+ }
+
/**
* Read the specified number of bytes from the given
* {@link FSDataInputStream} and assert that
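
The unbuffer-test hunks above add a capability probe so the tests skip when the stream in use does not advertise unbuffer support, which is what was failing with the prefetching stream. The same probe works for any caller; a minimal sketch, with a hypothetical path:

// Sketch only, not part of this commit; path is hypothetical.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

public class UnbufferGuardSketch {
  public static void main(String[] args) throws Exception {
    Path path = new Path("s3a://example-bucket/data.bin");
    FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
    try (FSDataInputStream in = fs.open(path)) {
      in.read();
      if (in.hasCapability(StreamCapabilities.UNBUFFER)) {
        in.unbuffer();   // release buffers/connection until the next read
      } else {
        System.out.println("stream does not support unbuffer; skipping");
      }
    }
  }
}
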
@@ -169,11 +169,6 @@ private void testSeekHelper(S3InputStream inputStream, int bufferSize, int fileS
EOFException.class,
FSExceptionMessages.NEGATIVE_SEEK,
() -> inputStream.seek(-1));
-
- ExceptionAsserts.assertThrows(
-     EOFException.class,
-     FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
-     () -> inputStream.seek(fileSize + 1));
}

@Test
