Skip to content

Commit

Permalink
HADOOP-18184. openFile lets you tune prefetching
Browse files Browse the repository at this point in the history
that's for testing; we can tweak policy while reusing the same
fs.

Change-Id: I1832bafc1a656259e3b53d18473d5a0480a8153f
  • Loading branch information
steveloughran committed Aug 9, 2023
1 parent ec5ae7a commit 7e40bd9
Show file tree
Hide file tree
Showing 23 changed files with 232 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
buffer.rewind();
cachePut(blockNumber, buffer);
data.setDone();
} catch (IOException e) {
} catch (Exception e) {
numCachingErrors.incrementAndGet();
LOG.info("error adding block to cache: {}. {}", data, e.getMessage());
LOG.debug("error adding block to cache: {}", data, e);
Expand Down Expand Up @@ -718,6 +718,9 @@ public String toString() {
sb.append("pool: ");
sb.append(bufferPool.toString());

sb.append("; numReadErrors: ").append(numReadErrors.get());
sb.append("; numCachingErrors: ").append(numCachingErrors.get());

return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,12 @@ The S3A Connector supports custom options for readahead and seek policy.
| `fs.s3a.readahead.range` | `long` | readahead range in bytes |
| `fs.s3a.experimental.input.fadvise` | `String` | seek policy. Superceded by `fs.option.openfile.read.policy` |
| `fs.s3a.input.async.drain.threshold` | `long` | threshold to switch to asynchronous draining of the stream. (Since 3.3.5) |
| `fs.s3a.prefetch.block.size` | `int` | Block size in bytes for prefetching. |
| `fs.s3a.prefetch.block.count` | `int` | Number of blocks for prefetching. |

Irrespective of the value of `fs.s3a.prefetch.block.count`, the maximum number of active
prefetches in a single filesystem instance is limited to the value of the same option
in the configuration used to create the filesystem instance; callers can only choose smaller values.

If the option set contains a SQL statement in the `fs.s3a.select.sql` statement,
then the file is opened as an S3 Select query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1245,13 +1245,13 @@ private Constants() {
// 64 MB of heap space (8 blocks x 8 MB each).

/**
* The size of a single prefetched block in number of bytes.
* The size of a single prefetched block in number of bytes: {@value}.
*/
public static final String PREFETCH_BLOCK_SIZE_KEY = "fs.s3a.prefetch.block.size";
public static final int PREFETCH_BLOCK_DEFAULT_SIZE = 8 * 1024 * 1024;

/**
* Maximum number of blocks prefetched at any given time.
* Maximum number of blocks prefetched at any given time: {@value}.
*/
public static final String PREFETCH_BLOCK_COUNT_KEY = "fs.s3a.prefetch.block.count";
public static final int PREFETCH_BLOCK_DEFAULT_COUNT = 8;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,15 @@ public class S3AReadOpContext extends S3AOpContext {
// S3 reads are prefetched asynchronously using this future pool.
private ExecutorServiceFuturePool futurePool;

// Size in bytes of a single prefetch block.
private final int prefetchBlockSize;
/**
* Size in bytes of a single prefetch block.
*/
private int prefetchBlockSize;

// Size of prefetch queue (in number of blocks).
private final int prefetchBlockCount;
/**
* Size of prefetch queue (in number of blocks).
*/
private int prefetchBlockCount;

/**
* Where does the read start from, if known.
Expand Down Expand Up @@ -324,6 +328,26 @@ public S3AReadOpContext withSplitEnd(final Optional<Long> value) {
return this;
}

/**
* Set builder value.
* @param value new value
* @return the builder
*/
public S3AReadOpContext withPrefetchBlockSize(final int value) {
prefetchBlockSize = value;
return this;
}

/**
* Set builder value.
* @param value new value
* @return the builder
*/
public S3AReadOpContext withPrefetchBlockCount(final int value) {
prefetchBlockCount = value;
return this;
}

/**
* What is the split end, if known?
* @return split end.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ private InternalConstants() {
Set<String> keys = Stream.of(
Constants.ASYNC_DRAIN_THRESHOLD,
Constants.INPUT_FADVISE,
Constants.PREFETCH_BLOCK_COUNT_KEY,
Constants.PREFETCH_BLOCK_SIZE_KEY,
Constants.READAHEAD_RANGE)
.collect(Collectors.toSet());
keys.addAll(FS_OPTION_OPENFILE_STANDARD_OPTIONS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys;
import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_COUNT_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_COUNT;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
import static org.apache.hadoop.util.Preconditions.checkArgument;

Expand Down Expand Up @@ -263,6 +266,9 @@ public OpenFileInformation prepareToOpenFile(
splitStart = empty();
splitEnd = empty();
}
Optional<Integer> prefetchBlockSize = getOptionalInteger(options, PREFETCH_BLOCK_SIZE_KEY);
Optional<Integer> prefetchBlockCount = getOptionalInteger(options, PREFETCH_BLOCK_COUNT_KEY);


// read end is the open file value
fileLength = builderSupport.getPositiveLong(FS_OPTION_OPENFILE_LENGTH, fileLength);
Expand Down Expand Up @@ -298,6 +304,8 @@ public OpenFileInformation prepareToOpenFile(
S3AInputPolicy.getFirstSupportedPolicy(policies, defaultInputPolicy))
.withReadAheadRange(
builderSupport.getPositiveLong(READAHEAD_RANGE, defaultReadAhead))
.withPrefetchBlockCount(prefetchBlockCount)
.withPrefetchBlockSize(prefetchBlockSize)
.withSplitStart(splitStart)
.withSplitEnd(splitEnd)
.withStatus(fileStatus)
Expand Down Expand Up @@ -374,6 +382,17 @@ public Optional<Long> getOptionalLong(final Configuration options, String key) {
}
}

/**
* Get an int value with resilience to unparseable values.
* @param options configuration to parse
* @param key key to log
* @return long value or empty()
*/
public Optional<Integer> getOptionalInteger(final Configuration options, String key) {
return getOptionalLong(options, key)
.map(l -> l.intValue());
}

/**
* The information on a file needed to open it.
*/
Expand Down Expand Up @@ -410,6 +429,16 @@ public static final class OpenFileInformation {
*/
private Optional<Long> splitEnd = empty();

/**
* Prefetch block size.
*/
private Optional<Integer> prefetchBlockSize = empty();

/**
* Prefetch block count.
*/
private Optional<Integer> prefetchBlockCount = empty();

/**
* What is the file length?
* Negative if not known.
Expand Down Expand Up @@ -464,6 +493,14 @@ public int getBufferSize() {
return bufferSize;
}

public Optional<Integer> getPrefetchBlockSize() {
return prefetchBlockSize;
}

public Optional<Integer> getPrefetchBlockCount() {
return prefetchBlockCount;
}

/**
* Where does the read start from, if known.
* @return split start.
Expand All @@ -489,6 +526,8 @@ public String toString() {
", inputPolicy=" + inputPolicy +
", changePolicy=" + changePolicy +
", readAheadRange=" + readAheadRange +
", prefetchBlockSize=" + prefetchBlockSize +
", prefetchBlockCount=" + prefetchBlockCount +
", splitStart=" + splitStart +
", splitEnd=" + splitEnd +
", bufferSize=" + bufferSize +
Expand Down Expand Up @@ -574,6 +613,26 @@ public OpenFileInformation withBufferSize(final int value) {
return this;
}

/**
* Set builder value.
* @param value new value
* @return the builder
*/
public OpenFileInformation withPrefetchBlockSize(final Optional<Integer> value) {
prefetchBlockSize = value;
return this;
}

/**
* Set builder value.
* @param value new value
* @return the builder
*/
public OpenFileInformation withPrefetchBlockCount(final Optional<Integer> value) {
prefetchBlockCount = value;
return this;
}

/**
* Set split start.
* @param value new value -must not be null
Expand Down Expand Up @@ -622,13 +681,16 @@ public OpenFileInformation withAsyncDrainThreshold(final long value) {
* @return the context
*/
public S3AReadOpContext applyOptions(S3AReadOpContext roc) {
return roc
.withInputPolicy(inputPolicy)
roc.withInputPolicy(inputPolicy)
.withChangeDetectionPolicy(changePolicy)
.withAsyncDrainThreshold(asyncDrainThreshold)
.withReadahead(readAheadRange)
.withSplitStart(splitStart)
.withSplitEnd(splitEnd);
prefetchBlockCount.map(roc::withPrefetchBlockCount);
prefetchBlockSize.map(roc::withPrefetchBlockSize);
return roc;

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
S3ACachingInputStream.class);

/**
* Number of blocks queued for prefching.
* Number of blocks queued for prefetching.
*/
private final int numBlocksToPrefetch;

Expand Down Expand Up @@ -211,13 +211,15 @@ protected boolean ensureCurrentBuffer() throws IOException {

@Override
public String toString() {
if (isClosed()) {
return "closed";
}

StringBuilder sb = new StringBuilder();
sb.append(String.format("%s%n", super.toString()));
sb.append(" block manager: ").append(blockManager);
if (isClosed()) {
sb.append("closed");
} else {
sb.append("file position: ").append(getFilePosition());
// block manager may be null.
sb.append("; block manager: ").append(blockManager);
}
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public long getPos() throws IOException {
* @param pos the absolute position to seek to.
* @throws IOException if there an error during this operation.
*
* @throws IllegalArgumentException if pos is outside of the range [0, file size].
* @throws IllegalArgumentException if pos is steveoutside of the range [0, file size].
*/
public void seek(long pos) throws IOException {
throwIfClosed();
Expand Down Expand Up @@ -471,10 +471,9 @@ protected boolean closeStream(final boolean unbuffer) {
return false;
}

if (unbuffer) {
// release all the blocks
blockData = null;
}
// release all the blocks
blockData = null;

reader.close();
reader = null;
// trigger GC.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.PREFETCH_OPTIONS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPrefetchOption;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.prepareTestConfiguration;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

Expand Down Expand Up @@ -77,7 +77,7 @@ protected AbstractFSContract createContract(Configuration conf) {
protected Configuration createConfiguration() {
final Configuration conf = prepareTestConfiguration(super.createConfiguration());
disableFilesystemCaching(conf);
return enablePrefetch(conf, prefetch);
return setPrefetchOption(conf, prefetch);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPrefetchOption;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.prepareTestConfiguration;
import static org.apache.hadoop.util.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
Expand Down Expand Up @@ -123,7 +123,7 @@ public ITestS3AContractSeek(final String seekPolicy,
*/
@Override
protected Configuration createConfiguration() {
Configuration conf = enablePrefetch(
Configuration conf = setPrefetchOption(
prepareTestConfiguration(super.createConfiguration()), prefetch);
// purge any per-bucket overrides.
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import static org.apache.hadoop.fs.s3a.S3ATestUtils.PREFETCH_OPTIONS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPrefetchOption;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.prepareTestConfiguration;

@RunWith(Parameterized.class)
Expand Down Expand Up @@ -60,7 +60,7 @@ public ITestS3AContractUnbuffer(final boolean prefetch) {
protected Configuration createConfiguration() {
final Configuration conf = prepareTestConfiguration(super.createConfiguration());
disableFilesystemCaching(conf);
return enablePrefetch(conf, prefetch);
return setPrefetchOption(conf, prefetch);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.io.IOException;

import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPrefetchOption;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getInputStreamStatistics;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
Expand All @@ -63,7 +63,7 @@ public class ITestS3AUnbuffer extends AbstractS3ATestBase {

@Override
protected Configuration createConfiguration() {
return enablePrefetch(super.createConfiguration(), false);
return setPrefetchOption(super.createConfiguration(), false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ public static Configuration prepareTestConfiguration(final Configuration conf) {

boolean prefetchEnabled =
getTestPropertyBool(conf, PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
enablePrefetch(conf, prefetchEnabled);
setPrefetchOption(conf, prefetchEnabled);

return conf;
}
Expand All @@ -625,7 +625,8 @@ public static Configuration prepareTestConfiguration(final Configuration conf) {
* @param prefetch prefetch option
* @return the modified configuration.
*/
public static Configuration enablePrefetch(final Configuration conf, boolean prefetch) {
public static Configuration setPrefetchOption(final Configuration conf,
final boolean prefetch) {
removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
conf.setBoolean(PREFETCH_ENABLED_KEY, prefetch);
return conf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.hadoop.fs.statistics.IOStatisticsContext;

import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPrefetchOption;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
Expand Down Expand Up @@ -74,7 +74,7 @@ public class ITestInMemoryInputStream extends AbstractS3ACostTest {
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
enablePrefetch(conf, true);
setPrefetchOption(conf, true);
return conf;
}

Expand Down
Loading

0 comments on commit 7e40bd9

Please sign in to comment.