
Commit

HADOOP-19043. S3A: Regression: ITestS3AOpenCost fails on prefetch test runs

This is actually trickier than it seems as we will need to go deep into the
implementation of caching.

Specifically: the prefetcher knows the declared file length, and if the file
actually opened is shorter than that, but less than one block long, the block
read is considered a failure and the whole block is skipped, so read() of the
nominally in-range data returns -1.

This fix has to be considered a PoC and should be combined with the other
big PR for prefetching, apache#5832, as that is where the changes should go.

What's here is just test tuning, plus some differentiation of channel
problems from other EOFs.
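
To make the failure mode concrete: a minimal sketch of the kind of probe the
test below performs. The bucket, path and variable names are illustrative
only; the builder options are the standard fs.option.openfile.* ones, and the
declared length is the value the prefetcher trusts.

    // hypothetical reproduction, not part of this commit
    Path path = new Path("s3a://bucket/smallfile.txt"); // any file under one prefetch block
    long realLen = fs.getFileStatus(path).getLen();
    long declaredLen = realLen + 10;                    // declared length past the real EOF
    try (FSDataInputStream in = fs.openFile(path)
        .opt("fs.option.openfile.length", declaredLen)  // prefetcher believes this value
        .opt("fs.option.openfile.read.policy", "random")
        .build().get()) {
      // with prefetching enabled, the short GET was treated as a failed block,
      // so this read of genuinely in-range data returned -1 before this change
      int b = in.read();
    }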

Change-Id: Icdf7e2fb10ca77b6ca427eb207472fad277130d7
steveloughran committed Jan 17, 2024
1 parent eeb657e commit 904513f
Showing 5 changed files with 84 additions and 30 deletions.
S3AFileSystem.java
@@ -5500,7 +5500,11 @@ public boolean hasPathCapability(final Path path, final String capability)
     case CommonPathCapabilities.ETAGS_AVAILABLE:
       return true;
 
-    /*
+    // Is prefetching enabled?
+    case PREFETCH_ENABLED_KEY:
+      return prefetchEnabled;
+
+    /*
      * Marker policy capabilities are handed off.
      */
     case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
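
With the new capability, clients can ask an S3A filesystem instance whether
prefetching is active before making assumptions about stream behaviour; a
minimal sketch, mirroring the prefetching() helper added to the test below
(PREFETCH_ENABLED_KEY is "fs.s3a.prefetch.enabled"):

    // probe the store for the new path capability
    boolean prefetching = fs.hasPathCapability(new Path("/"),
        PREFETCH_ENABLED_KEY);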
InternalConstants.java
@@ -41,6 +41,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2;
 import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
 import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE;
@@ -287,7 +288,9 @@ private InternalConstants() {
       STORE_CAPABILITY_S3_EXPRESS_STORAGE,
       FS_S3A_CREATE_PERFORMANCE_ENABLED,
       DIRECTORY_OPERATIONS_PURGE_UPLOADS,
-      ENABLE_MULTI_DELETE));
+      ENABLE_MULTI_DELETE,
+      PREFETCH_ENABLED_KEY
+      ));
 
   /**
    * AWS V4 Auth Scheme to use when creating signers: {@value}.
S3ARemoteInputStream.java
@@ -265,12 +265,18 @@ public void seek(long pos) throws IOException {
   public int read() throws IOException {
     throwIfClosed();
 
-    if (remoteObject.size() == 0
-        || nextReadPos >= remoteObject.size()) {
+    if (remoteObject.size() == 0) {
+      LOG.debug("Rejecting read on empty file");
+      return -1;
+    }
+
+    if (nextReadPos >= remoteObject.size()) {
+      LOG.debug("Rejecting read past EOF");
       return -1;
     }
 
     if (!ensureCurrentBuffer()) {
+      LOG.debug("Empty buffer in cache");
       return -1;
     }
 
@@ -313,12 +319,18 @@ public int read(byte[] buffer, int offset, int len) throws IOException {
       return 0;
     }
 
-    if (remoteObject.size() == 0
-        || nextReadPos >= remoteObject.size()) {
+    if (remoteObject.size() == 0) {
+      LOG.debug("Rejecting read on empty file");
+      return -1;
+    }
+
+    if (nextReadPos >= remoteObject.size()) {
+      LOG.debug("Rejecting read past EOF");
       return -1;
     }
 
     if (!ensureCurrentBuffer()) {
+      LOG.debug("Empty buffer in cache");
       return -1;
     }
 
S3ARemoteObjectReader.java
@@ -22,13 +22,13 @@
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.impl.prefetch.Validate;
+import org.apache.hadoop.fs.s3a.HttpChannelEOFException;
 import org.apache.hadoop.fs.s3a.Invoker;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 
@@ -115,11 +115,12 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
         STREAM_READ_REMOTE_BLOCK_READ, () -> {
           try {
             this.readOneBlock(buffer, offset, size);
+          } catch (HttpChannelEOFException e) {
+            this.remoteObject.getStatistics().readException();
+            throw e;
           } catch (EOFException e) {
+            // the base implementation swallows EOFs.
             return -1;
-          } catch (SocketTimeoutException e) {
-            throw e;
           } catch (IOException e) {
             this.remoteObject.getStatistics().readException();
             throw e;
@@ -162,7 +163,7 @@ private void readOneBlock(ByteBuffer buffer, long offset, int size)
       String message = String.format(
           "Unexpected end of stream: buffer[%d], readSize = %d, numRemainingBytes = %d",
           buffer.capacity(), readSize, numRemainingBytes);
-      throw new EOFException(message);
+      throw new HttpChannelEOFException(remoteObject.getPath(), message, null);
     }
 
     if (numBytes > 0) {
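
The catch ordering above is the "differentiation of channel problems from
other EOFs": HttpChannelEOFException, a subclass of EOFException raised when
the HTTP channel breaks mid-read, is counted and rethrown as a failure before
the generic EOFException clause maps a logical end-of-data to -1. A reduced
sketch of the pattern; "statistics" stands in for
this.remoteObject.getStatistics():

    try {
      readOneBlock(buffer, offset, size);
    } catch (HttpChannelEOFException e) {
      // channel truncation: count as a read failure and rethrow
      statistics.readException();
      throw e;
    } catch (EOFException e) {
      // logical end of the object: not an error, report "no more data"
      return -1;
    }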
ITestS3AOpenCost.java
@@ -20,6 +20,7 @@
 
 
 import java.io.EOFException;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -40,6 +41,7 @@
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 
 import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY;
@@ -52,6 +54,7 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
 import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
@@ -60,10 +63,12 @@
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
 import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST;
+import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
@@ -84,6 +89,11 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
 
   private int fileLength;
 
+  /**
+   * Is prefetching enabled?
+   */
+  private boolean prefetching;
+
   public ITestS3AOpenCost() {
     super(true);
   }
@@ -111,6 +121,7 @@ public void setup() throws Exception {
     writeTextFile(fs, testFile, TEXT, true);
     testFileStatus = fs.getFileStatus(testFile);
     fileLength = (int)testFileStatus.getLen();
+    prefetching = prefetching();
   }
 
   /**
@@ -239,7 +250,10 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
     try (FSDataInputStream in = openFile(longLen,
         FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
       byte[] out = new byte[(int) (longLen)];
-      intercept(EOFException.class, () -> in.readFully(0, out));
+      intercept(EOFException.class, () -> {
+        in.readFully(0, out);
+        return in;
+      });
       in.seek(longLen - 1);
       assertEquals("read past real EOF on " + in, -1, in.read());
       return in.toString();
@@ -248,7 +262,7 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
         // two GET calls were made, one for readFully,
         // the second on the read() past the EOF
         // the operation has got as far as S3
-        with(STREAM_READ_OPENED, 1 + 1));
+        probe(!prefetching(), STREAM_READ_OPENED, 1 + 1));
 
     // now on a new stream, try a full read from after the EOF
     verifyMetrics(() -> {
@@ -293,15 +307,17 @@ private FSDataInputStream openFile(final long longLen, String policy)
   public void testReadPastEOF() throws Throwable {
 
     // set a length past the actual file length
     describe("read() up to the end of the real file");
     final int extra = 10;
     int longLen = fileLength + extra;
     try (FSDataInputStream in = openFile(longLen,
         FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
       for (int i = 0; i < fileLength; i++) {
         Assertions.assertThat(in.read())
-            .describedAs("read() at %d", i)
+            .describedAs("read() at %d from stream %s", i, in)
             .isEqualTo(TEXT.charAt(i));
       }
+      LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
     }
 
     // now open and read after the EOF; this is
@@ -323,10 +339,11 @@ public void testReadPastEOF() throws Throwable {
             .describedAs("read() at %d", p)
             .isEqualTo(-1);
       }
+      LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
       return in.toString();
     }
   },
-        with(Statistic.ACTION_HTTP_GET_REQUEST, extra));
+        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra));
   }
 
   /**
@@ -353,10 +370,11 @@ public void testPositionedReadableReadFullyPastEOF() throws Throwable {
         return in;
       });
       assertS3StreamClosed(in);
-      return "readFully past EOF";
+      return "readFully past EOF with statistics"
+          + ioStatisticsToPrettyString(in.getIOStatistics());
     }
   },
-      with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+      probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
   }
 
   /**
@@ -370,7 +388,6 @@ public void testPositionedReadableReadPastEOF() throws Throwable {
     int longLen = fileLength + extra;
 
     describe("PositionedReadable.read() past the end of the file");
-
     verifyMetrics(() -> {
       try (FSDataInputStream in =
           openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
@@ -388,10 +405,10 @@ public void testPositionedReadableReadPastEOF() throws Throwable {
       // stream is closed as part of this failure
      assertS3StreamClosed(in);
 
-      return "PositionedReadable.read()) past EOF";
+      return "PositionedReadable.read()) past EOF with " + in;
     }
   },
-      with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+      probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
   }
 
   /**
@@ -405,7 +422,8 @@ public void testVectorReadPastEOF() throws Throwable {
     final int extra = 10;
     int longLen = fileLength + extra;
 
-    describe("Vector read past the end of the file");
+    describe("Vector read past the end of the file, expecting an EOFException");
+
     verifyMetrics(() -> {
       try (FSDataInputStream in =
           openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
@@ -420,31 +438,47 @@ public void testVectorReadPastEOF() throws Throwable {
           TimeUnit.SECONDS,
           range.getData());
       assertS3StreamClosed(in);
-      return "vector read past EOF";
+      return "vector read past EOF with " + in;
     }
   },
-      with(Statistic.ACTION_HTTP_GET_REQUEST, 1));
+      probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1));
   }
 
+  /**
+   * Probe the FS for supporting prefetching.
+   * @return true if the fs has prefetching enabled.
+   * @throws IOException IO problem.
+   */
+  private boolean prefetching() throws IOException {
+    return getFileSystem().hasPathCapability(new Path("/"),
+        PREFETCH_ENABLED_KEY);
+  }
+
   /**
    * Assert that the inner S3 Stream is closed.
    * @param in input stream
    */
   private static void assertS3StreamClosed(final FSDataInputStream in) {
-    S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream();
-    Assertions.assertThat(s3ain.isObjectStreamOpen())
-        .describedAs("stream is open")
-        .isFalse();
+    final InputStream wrapped = in.getWrappedStream();
+    if (wrapped instanceof S3AInputStream) {
+      S3AInputStream s3ain = (S3AInputStream) wrapped;
+      Assertions.assertThat(s3ain.isObjectStreamOpen())
+          .describedAs("stream is open")
+          .isFalse();
+    }
   }
 
   /**
    * Assert that the inner S3 Stream is open.
    * @param in input stream
   */
   private static void assertS3StreamOpen(final FSDataInputStream in) {
-    S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream();
-    Assertions.assertThat(s3ain.isObjectStreamOpen())
-        .describedAs("stream is closed")
-        .isTrue();
+    final InputStream wrapped = in.getWrappedStream();
+    if (wrapped instanceof S3AInputStream) {
+      S3AInputStream s3ain = (S3AInputStream) wrapped;
+      Assertions.assertThat(s3ain.isObjectStreamOpen())
+          .describedAs("stream is closed")
+          .isTrue();
+    }
   }
 }
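
On the test side, the key change is with() becoming probe(): with(statistic,
value) asserts a metric unconditionally, while probe(guard, statistic, value)
only enforces the expectation when the guard is true. That is what lets the
GET-count assertions be skipped on prefetch test runs, where block-oriented
reads change the counts. A reduced sketch; doTheRead() is a placeholder for
the test body:

    // only check the GET count on the classic input stream;
    // the prefetching stream issues block-sized GETs instead
    verifyMetrics(() -> doTheRead(),
        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1));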
