HADOOP-19043. S3A: Regression: ITestS3AOpenCost fails on prefetch test runs

* Adds EOF logic deep into the prefetching code
* Tests still failing.
* All conflicts with hadoop trunk resolved

Change-Id: I9b23b01d010d8a1a680ce849d26a0aebab2389e2
steveloughran committed May 31, 2024
1 parent 5aa8231 commit d39deb8
Showing 4 changed files with 71 additions and 31 deletions.
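
In outline, the patch makes the prefetching read path distinguish three EOF cases: HttpChannelEOFException is rethrown so that the retry invoker can retry the GET, RangeNotSatisfiableEOFException is downgraded to a partial read, and any other EOFException is captured and rethrown to the caller. A condensed, self-contained sketch of that dispatch (the class and helper names below are stand-ins, not the committed code):

    import java.io.EOFException;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    /** Sketch of the EOF dispatch added to the prefetching read path. */
    final class EofDispatchSketch {

      // Stand-ins for the s3a exception types imported in the diff below.
      static class HttpChannelEOFException extends EOFException { }
      static class RangeNotSatisfiableEOFException extends EOFException { }

      interface BlockRead {
        void run() throws IOException;
      }

      static int readWithEofDispatch(BlockRead read, ByteBuffer buffer)
          throws IOException {
        EOFException eof;
        try {
          read.run();
          eof = null;                  // clean read
        } catch (HttpChannelEOFException e) {
          throw e;                     // channel failure: left to the retry invoker
        } catch (EOFException e) {
          eof = e;                     // swallowed here, inspected below
        }
        if (eof == null || eof instanceof RangeNotSatisfiableEOFException) {
          return buffer.position();    // full or partial read: bytes copied so far
        }
        throw eof;                     // any other EOF is rethrown unchanged
      }
    }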
S3ARemoteObject.java
@@ -29,14 +29,14 @@

import org.apache.hadoop.fs.impl.prefetch.Validate;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;

import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration;

@@ -161,8 +161,6 @@ public long size() {

/**
* Opens a section of the file for reading.
* The s3 object response is cached in {@link #s3Objects} so that
* GC does not close the input stream.
*
* @param offset Start offset (0 based) of the section to read.
* @param size Size of the section to read.
@@ -173,6 +171,7 @@ public long size() {
* @throws IllegalArgumentException if offset is greater than or equal to file size.
* @throws IllegalArgumentException if size is greater than the remaining bytes.
*/
@Retries.OnceTranslated
public ResponseInputStream<GetObjectResponse> openForRead(long offset, int size)
throws IOException {
Validate.checkNotNegative(offset, "offset");
@@ -188,8 +187,7 @@ public ResponseInputStream<GetObjectResponse> openForRead(long offset, int size)

String operation = String.format(
"%s %s at %d size %d", S3AInputStream.OPERATION_OPEN, uri, offset, size);
DurationTracker tracker = streamStatistics.initiateGetRequest();
ResponseInputStream<GetObjectResponse> object = null;
ResponseInputStream<GetObjectResponse> object;

// initiate the GET. This completes once the request returns the response headers;
// the data is read later.
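(Context for readers outside the s3a codebase: the @Retries.* annotations document retry policy rather than change runtime behaviour. OnceTranslated marks a call that is attempted once, with SDK exceptions translated into IOExceptions; RetryTranslated marks a call that is additionally wrapped in the s3a retry invoker. Tagging openForRead as OnceTranslated makes readOneBlockWithRetries, in the next file, the layer responsible for retries.)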
S3ARemoteObjectReader.java
@@ -30,6 +30,8 @@
import org.apache.hadoop.fs.impl.prefetch.Validate;
import org.apache.hadoop.fs.s3a.HttpChannelEOFException;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.RangeNotSatisfiableEOFException;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;

import software.amazon.awssdk.core.ResponseInputStream;
@@ -78,13 +80,14 @@ public S3ARemoteObjectReader(S3ARemoteObject remoteObject) {
* @param offset the absolute offset into the underlying file where reading starts.
* @param size the number of bytes to be read.
*
* @return number of bytes actually read.
* @return number of bytes actually read. -1 if the file is closed.
* @throws IOException if there is an error reading from the file.
*
* @throws IllegalArgumentException if buffer is null.
* @throws IllegalArgumentException if offset is outside of the range [0, file size].
* @throws IllegalArgumentException if size is zero or negative.
*/
@Retries.RetryTranslated
public int read(ByteBuffer buffer, long offset, int size) throws IOException {
Validate.checkNotNull(buffer, "buffer");
Validate.checkWithinRange(offset, "offset", 0, this.remoteObject.size());
@@ -103,31 +106,47 @@ public void close() {
this.closed = true;
}

/**
* Reads one block from S3.
* <p>
* There are no retries on base EOFExceptions.
* {@link HttpChannelEOFException} will be retried.
* {@link RangeNotSatisfiableEOFException} will be downgraded to
* partial read, so data may be returned.
* @param buffer destination.
* @param offset object offset
* @param size size to retrieve.
* @return bytes read.
* @throws EOFException if one was raised and not downgraded to a partial read.
* @throws IOException IO failure.
*/
@Retries.RetryTranslated
private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
throws IOException {

this.streamStatistics.readOperationStarted(offset, size);
Invoker invoker = this.remoteObject.getReadInvoker();

final String path = this.remoteObject.getPath();
int invokerResponse =
EOFException invokerResponse =
invoker.retry(String.format("read %s [%d-%d]", path, offset, size),
path, true,
trackDurationOfOperation(streamStatistics,
STREAM_READ_REMOTE_BLOCK_READ, () -> {
try {
this.readOneBlock(buffer, offset, size);
} catch (HttpChannelEOFException e) {
// EOF subclasses which are rethrown as errors.
this.remoteObject.getStatistics().readException();
throw e;
} catch (EOFException e) {
// the base implementation swallows EOFs.
return -1;
return e;
} catch (IOException e) {
this.remoteObject.getStatistics().readException();
throw e;
}
return 0;
return null;
}));

int numBytesRead = buffer.position();
@@ -139,13 +158,28 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
this.remoteObject.getStatistics()
.readOperationCompleted(size, numBytesRead);

if (invokerResponse < 0) {
return invokerResponse;
} else {
return numBytesRead;
if (invokerResponse != null) {
if (invokerResponse instanceof RangeNotSatisfiableEOFException) {
// the range wasn't satisfiable, but some data may have been read.
return numBytesRead;
} else {
throw invokerResponse;
}
}

// how much was read?
return numBytesRead;
}

/**
* GET one block from S3.
* @param buffer buffer to fill up.
* @param offset offset within the object.
* @param size size to retrieve.
* @throws IOException IO failure.
* @throws HttpChannelEOFException if the channel is closed during the read.
*/
@Retries.OnceTranslated
private void readOneBlock(ByteBuffer buffer, long offset, int size)
throws IOException {
int readSize = Math.min(size, buffer.remaining());
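The visible contract of read() after this change: -1 if the file has been closed, otherwise the number of bytes copied into the buffer, which may be less than requested when the range was not satisfiable. A minimal caller-side sketch (readBlock and its parameters are hypothetical, not part of the patch):

    // Assumes a class alongside the reader in org.apache.hadoop.fs.s3a.prefetch,
    // with java.io.IOException and java.nio.ByteBuffer imported.
    static int readBlock(S3ARemoteObjectReader reader, long blockOffset, int blockSize)
        throws IOException {
      ByteBuffer buffer = ByteBuffer.allocate(blockSize);
      int bytesRead = reader.read(buffer, blockOffset, blockSize);
      if (bytesRead < 0) {
        // file already closed: nothing was fetched
      } else if (bytesRead < blockSize) {
        // partial read, e.g. the range extended past the end of the object
      }
      return bytesRead;
    }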
ITestS3AOpenCost.java
@@ -41,8 +41,8 @@
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.io.IOUtils;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
@@ -153,20 +153,26 @@ public void testOpenFileWithStatusOfOtherFS() throws Throwable {
new Path("gopher:///localhost/" + testFile.getName()));

// no IO in open
FSDataInputStream in = verifyMetrics(() ->
fs.openFile(testFile)
.withFileStatus(st2)
.build()
.get(),
always(NO_HEAD_OR_LIST),
with(STREAM_READ_OPENED, 0));

// the stream gets opened during read
long readLen = verifyMetrics(() ->
readStream(in),
always(NO_HEAD_OR_LIST),
with(STREAM_READ_OPENED, 1));
assertEquals("bytes read from file", fileLength, readLen);
FSDataInputStream in = null;
try {
in = verifyMetrics(() ->
fs.openFile(testFile)
.withFileStatus(st2)
.build()
.get(),
always(NO_HEAD_OR_LIST),
with(STREAM_READ_OPENED, 0));

// the stream gets opened during read
final FSDataInputStream s = in;
long readLen = verifyMetrics(() ->
readStream(s),
always(NO_HEAD_OR_LIST),
with(STREAM_READ_OPENED, 1));
assertEquals("bytes read from file", fileLength, readLen);
} finally {
IOUtils.closeStream(in);
}
}
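
The extra final FSDataInputStream s = in; copy is needed because the lambda passed to verifyMetrics can capture only effectively final locals, and in is reassigned inside the try block. An illustration of the language rule (plain Java, not s3a code):

    class EffectivelyFinalDemo {
      java.util.concurrent.Callable<String> demo() {
        String in = null;
        in = "reassigned";          // 'in' is no longer effectively final
        final String s = in;        // 's' is assigned exactly once
        return () -> s;             // compiles; "() -> in" would be rejected
      }
    }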

@Test
ITestInMemoryInputStream.java
@@ -18,6 +18,7 @@

package org.apache.hadoop.fs.s3a.prefetch;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +56,7 @@

/**
* Test the prefetching input stream, validates that the
* S3AInMemoryInputStream is working as expected.
* {@link S3AInMemoryInputStream} is working as expected.
*/
public class ITestInMemoryInputStream extends AbstractS3ACostTest {

@@ -215,8 +216,9 @@ public void testStatusProbesAfterClosingStream() throws Throwable {
assertEquals("Stream stats retrieved through stream before and after closing should match",
inputStreamStatistics, newInputStreamStatistics);

assertFalse("seekToNewSource() not supported with prefetch", in.seekToNewSource(10));

Assertions.assertThat(in.seekToNewSource(10))
.describedAs("seekToNewSource() not supported with prefetch: %s", in)
.isFalse();
}

}
