HADOOP-19043. S3A: Regression: ITestS3AOpenCost fails on prefetch test runs

This is actually trickier than it seems, as it requires going deep into the
implementation of caching.

Specifically: the prefetcher knows the declared file length, and if the actual
file is shorter than that (but still under one block long), the block read is
considered a failure and the whole block is skipped, so read() of the
nominally in-range data returns -1.
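
As a sketch of how that mismatch arises (not the actual test code: the class,
helper name and lengths here are hypothetical; FS_OPTION_OPENFILE_LENGTH is
the standard openFile() option for declaring a length up front):

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;

    // Hedged sketch: open a file while declaring more bytes than it holds.
    public final class LengthMismatchSketch {
      static int readTruncated(FileSystem fs, Path path, long actualLen)
          throws Exception {
        try (FSDataInputStream in = fs.openFile(path)
            .opt(FS_OPTION_OPENFILE_LENGTH, actualLen + 1024)
            .build().get()) {
          // With prefetching enabled, the prefetcher sizes the block from the
          // declared length; the truncated GET fails the whole block, so even
          // bytes which do exist come back as -1 here.
          return in.read();
        }
      }
    }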

This fix has to be considered a PoC and should be combined with the other
big prefetching PR, apache#5832, as that is where the changes should go.

For now, this patch contains only test tuning and some differentiation of
channel problems from other EOFs.

Change-Id: Icdf7e2fb10ca77b6ca427eb207472fad277130d7
steveloughran committed May 21, 2024
1 parent a011839 commit fb2fab2
Showing 5 changed files with 32 additions and 10 deletions.

@@ -5475,7 +5475,11 @@ public boolean hasPathCapability(final Path path, final String capability)
     case CommonPathCapabilities.ETAGS_AVAILABLE:
       return true;
 
+    // Is prefetching enabled?
+    case PREFETCH_ENABLED_KEY:
+      return prefetchEnabled;
+
     /*
      * Marker policy capabilities are handed off.
      */
     case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
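
With this in place, a client can probe a live S3A filesystem instance for
prefetch support as an ordinary path capability. A minimal sketch, using only
the standard hasPathCapability() API; the probe string is the configuration
key PREFETCH_ENABLED_KEY ("fs.s3a.prefetch.enabled"):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;

    // Hedged sketch: probe the FS instance rather than its Configuration,
    // which can disagree with per-bucket overrides.
    public final class PrefetchProbe {
      static boolean isPrefetching(FileSystem fs) throws IOException {
        return fs.hasPathCapability(new Path("/"), PREFETCH_ENABLED_KEY);
      }
    }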

@@ -42,6 +42,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2;
 import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
 import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE;

@@ -289,7 +290,9 @@ private InternalConstants() {
       STORE_CAPABILITY_S3_EXPRESS_STORAGE,
       FS_S3A_CREATE_PERFORMANCE_ENABLED,
       DIRECTORY_OPERATIONS_PURGE_UPLOADS,
-      ENABLE_MULTI_DELETE));
+      ENABLE_MULTI_DELETE,
+      PREFETCH_ENABLED_KEY
+      ));
 
   /**
    * AWS V4 Auth Scheme to use when creating signers: {@value}.

@@ -290,12 +290,18 @@ public void seek(long pos) throws IOException {
   public int read() throws IOException {
     throwIfClosed();
 
-    if (remoteObject.size() == 0
-        || nextReadPos >= remoteObject.size()) {
+    if (remoteObject.size() == 0) {
+      LOG.debug("Rejecting read on empty file");
+      return -1;
+    }
+
+    if (nextReadPos >= remoteObject.size()) {
+      LOG.debug("Rejecting read past EOF");
       return -1;
     }
 
     if (!ensureCurrentBuffer()) {
+      LOG.debug("Empty buffer in cache");
       return -1;
     }
 

@@ -338,12 +344,18 @@ public int read(byte[] buffer, int offset, int len) throws IOException {
       return 0;
     }
 
-    if (remoteObject.size() == 0
-        || nextReadPos >= remoteObject.size()) {
+    if (remoteObject.size() == 0) {
+      LOG.debug("Rejecting read on empty file");
+      return -1;
+    }
+
+    if (nextReadPos >= remoteObject.size()) {
+      LOG.debug("Rejecting read past EOF");
       return -1;
     }
 
     if (!ensureCurrentBuffer()) {
+      LOG.debug("Empty buffer in cache");
       return -1;
     }
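
The observable contract here is unchanged: all three rejection paths still
return -1, as the java.io.InputStream contract requires at end of stream; the
split only makes the DEBUG log say which case rejected the read. A minimal
sketch of that contract, with a hypothetical fs and empty file:

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hedged sketch: an empty file and a read past EOF both yield -1;
    // only the DEBUG message now distinguishes them.
    public final class ReadContractSketch {
      static void demo(FileSystem fs, Path emptyFile) throws Exception {
        try (FSDataInputStream in = fs.open(emptyFile)) {
          assert in.read() == -1;   // logs "Rejecting read on empty file"
        }
      }
    }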

@@ -22,13 +22,13 @@
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.impl.prefetch.Validate;
+import org.apache.hadoop.fs.s3a.HttpChannelEOFException;
 import org.apache.hadoop.fs.s3a.Invoker;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;

@@ -117,11 +117,12 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
         STREAM_READ_REMOTE_BLOCK_READ, () -> {
           try {
             this.readOneBlock(buffer, offset, size);
+          } catch (HttpChannelEOFException e) {
+            this.remoteObject.getStatistics().readException();
+            throw e;
           } catch (EOFException e) {
             // the base implementation swallows EOFs.
             return -1;
-          } catch (SocketTimeoutException e) {
-            throw e;
           } catch (IOException e) {
             this.remoteObject.getStatistics().readException();
             throw e;

@@ -168,7 +169,7 @@ private void readOneBlock(ByteBuffer buffer, long offset, int size)
       String message = String.format(
           "Unexpected end of stream: buffer[%d], readSize = %d, numRemainingBytes = %d",
           buffer.capacity(), readSize, numRemainingBytes);
-      throw new EOFException(message);
+      throw new HttpChannelEOFException(remoteObject.getPath(), message, null);
     }
 
     if (numBytes > 0) {
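
Placing the new catch clause ahead of the EOFException one only compiles
because HttpChannelEOFException subclasses EOFException (as the catch ordering
above implies), which is also what keeps existing callers working: anything
already catching EOFException still catches the channel variant. A sketch of
the caller-side pattern under that assumption (the reader interface here is
hypothetical):

    import java.io.EOFException;

    import org.apache.hadoop.fs.s3a.HttpChannelEOFException;

    // Hedged sketch: the more specific exception must be caught first;
    // channel failures propagate (and so can be retried), genuine EOFs
    // map to -1 per the stream contract.
    public final class EofHandlingSketch {
      interface BlockReader {
        int read() throws EOFException;
      }

      static int readOrEof(BlockReader reader) throws HttpChannelEOFException {
        try {
          return reader.read();
        } catch (HttpChannelEOFException e) {
          throw e;         // channel dropped mid-read: surface it
        } catch (EOFException e) {
          return -1;       // real end of file: swallow, as the base code does
        }
      }
    }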

@@ -20,6 +20,7 @@
 
 
 import java.io.EOFException;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;

@@ -40,6 +41,7 @@
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
