Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19043. S3A: Regression: ITestS3AOpenCost fails on prefetch test runs #6465

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,14 @@ protected OperationCostValidator.ExpectedProbe always(
return expect(true, cost);
}

/**
 * Always run a metrics operation.
 * Delegates to {@code OperationCostValidator.always()}: a probe which
 * is enabled in every configuration and carries no cost expectations,
 * so it only guarantees the metrics evaluation takes place.
 * @return a probe which is always enabled.
 */
protected OperationCostValidator.ExpectedProbe always() {
  return OperationCostValidator.always();
}

/**
* A metric diff which must hold when the fs is keeping markers.
* @param cost expected cost
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
Expand All @@ -60,10 +62,12 @@
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST;
import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
Expand All @@ -84,6 +88,11 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {

private int fileLength;

/**
* Is prefetching enabled?
*/
private boolean prefetching;

/**
 * Create the test case.
 * NOTE(review): the boolean passed to the superclass is defined by
 * {@code AbstractS3ACostTest}; its meaning is not visible here — confirm
 * against that class before relying on it.
 */
public ITestS3AOpenCost() {
  super(true);
}
Expand Down Expand Up @@ -111,6 +120,7 @@ public void setup() throws Exception {
writeTextFile(fs, testFile, TEXT, true);
testFileStatus = fs.getFileStatus(testFile);
fileLength = (int)testFileStatus.getLen();
prefetching = prefetching();
}

/**
Expand Down Expand Up @@ -161,7 +171,11 @@ public void testOpenFileWithStatusOfOtherFS() throws Throwable {
@Test
public void testStreamIsNotChecksummed() throws Throwable {
describe("Verify that an opened stream is not checksummed");

// if prefetching is enabled, skip this test
assumeNoPrefetching();
S3AFileSystem fs = getFileSystem();

steveloughran marked this conversation as resolved.
Show resolved Hide resolved
// open the file
try (FSDataInputStream in = verifyMetrics(() ->
fs.openFile(testFile)
Expand All @@ -173,12 +187,6 @@ public void testStreamIsNotChecksummed() throws Throwable {
always(NO_HEAD_OR_LIST),
with(STREAM_READ_OPENED, 0))) {

// if prefetching is enabled, skip this test
final InputStream wrapped = in.getWrappedStream();
if (!(wrapped instanceof S3AInputStream)) {
skip("Not an S3AInputStream: " + wrapped);
}

// open the stream.
in.read();
// now examine the innermost stream and make sure it doesn't have a checksum
Expand Down Expand Up @@ -239,16 +247,20 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
try (FSDataInputStream in = openFile(longLen,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
byte[] out = new byte[(int) (longLen)];
intercept(EOFException.class, () -> in.readFully(0, out));
intercept(EOFException.class, () -> {
in.readFully(0, out);
return in;
});
in.seek(longLen - 1);
assertEquals("read past real EOF on " + in, -1, in.read());
return in.toString();
}
},
always(),
// two GET calls were made, one for readFully,
// the second on the read() past the EOF
// the operation has got as far as S3
with(STREAM_READ_OPENED, 1 + 1));
probe(!prefetching(), STREAM_READ_OPENED, 1 + 1));

// now on a new stream, try a full read from after the EOF
verifyMetrics(() -> {
Expand Down Expand Up @@ -293,15 +305,19 @@ private FSDataInputStream openFile(final long longLen, String policy)
public void testReadPastEOF() throws Throwable {

// set a length past the actual file length
describe("read() up to the end of the real file");
assumeNoPrefetching();

final int extra = 10;
int longLen = fileLength + extra;
try (FSDataInputStream in = openFile(longLen,
FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
for (int i = 0; i < fileLength; i++) {
Assertions.assertThat(in.read())
.describedAs("read() at %d", i)
.describedAs("read() at %d from stream %s", i, in)
.isEqualTo(TEXT.charAt(i));
}
LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
}

// now open and read after the EOF; this is
Expand All @@ -323,10 +339,12 @@ public void testReadPastEOF() throws Throwable {
.describedAs("read() at %d", p)
.isEqualTo(-1);
}
LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
return in.toString();
}
},
with(Statistic.ACTION_HTTP_GET_REQUEST, extra));
always(),
probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra));
}

/**
Expand All @@ -353,10 +371,12 @@ public void testPositionedReadableReadFullyPastEOF() throws Throwable {
return in;
});
assertS3StreamClosed(in);
return "readFully past EOF";
return "readFully past EOF with statistics"
+ ioStatisticsToPrettyString(in.getIOStatistics());
}
},
with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
always(),
probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
}

/**
Expand All @@ -370,6 +390,7 @@ public void testPositionedReadableReadPastEOF() throws Throwable {
int longLen = fileLength + extra;

describe("PositionedReadable.read() past the end of the file");
assumeNoPrefetching();

verifyMetrics(() -> {
try (FSDataInputStream in =
Expand All @@ -388,10 +409,11 @@ public void testPositionedReadableReadPastEOF() throws Throwable {
// stream is closed as part of this failure
assertS3StreamClosed(in);

return "PositionedReadable.read()) past EOF";
return "PositionedReadable.read()) past EOF with " + in;
}
},
with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
always(),
probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
}

/**
Expand All @@ -405,7 +427,8 @@ public void testVectorReadPastEOF() throws Throwable {
final int extra = 10;
int longLen = fileLength + extra;

describe("Vector read past the end of the file");
describe("Vector read past the end of the file, expecting an EOFException");

verifyMetrics(() -> {
try (FSDataInputStream in =
openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
Expand All @@ -420,31 +443,56 @@ public void testVectorReadPastEOF() throws Throwable {
TimeUnit.SECONDS,
range.getData());
assertS3StreamClosed(in);
return "vector read past EOF";
return "vector read past EOF with " + in;
}
},
with(Statistic.ACTION_HTTP_GET_REQUEST, 1));
always(),
probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1));
}

/**
 * Query the filesystem configuration to determine whether
 * stream prefetching is active.
 * @return true iff the FS configuration enables prefetching.
 */
private boolean prefetching() {
  return getFileSystem().getConf()
      .getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
}

/**
 * Skip the active test case when prefetching is enabled;
 * no-op otherwise.
 */
private void assumeNoPrefetching() {
  if (!prefetching) {
    return;
  }
  skip("Prefetching is enabled");
}

/**
 * Assert that the inner S3 object stream is closed.
 * If the wrapped stream is not an {@link S3AInputStream} (for example,
 * when prefetching is enabled and a different stream type is returned),
 * the probe is skipped: there is no single object stream to examine.
 * @param in input stream
 */
private static void assertS3StreamClosed(final FSDataInputStream in) {
  final InputStream wrapped = in.getWrappedStream();
  if (wrapped instanceof S3AInputStream) {
    S3AInputStream s3ain = (S3AInputStream) wrapped;
    Assertions.assertThat(s3ain.isObjectStreamOpen())
        .describedAs("stream is open: %s", s3ain)
        .isFalse();
  }
}

/**
 * Assert that the inner S3 object stream is open.
 * (The previous javadoc said "closed", which contradicted the
 * {@code isTrue()} assertion below.)
 * If the wrapped stream is not an {@link S3AInputStream} (for example,
 * when prefetching is enabled and a different stream type is returned),
 * the probe is skipped: there is no single object stream to examine.
 * @param in input stream
 */
private static void assertS3StreamOpen(final FSDataInputStream in) {
  final InputStream wrapped = in.getWrappedStream();
  if (wrapped instanceof S3AInputStream) {
    S3AInputStream s3ain = (S3AInputStream) wrapped;
    Assertions.assertThat(s3ain.isObjectStreamOpen())
        .describedAs("stream is closed: %s", s3ain)
        .isTrue();
  }
}
}