Skip to content

Commit

Permalink
HADOOP-18792. s3a prefetching to use split start/end options to limit…
Browse files Browse the repository at this point in the history
… prefetch range

This passes the values down but doesn't interpret them; future work

Change-Id: I523b26e5a5a43fbf6ba5d2b6e44614c7e4fc70b7
  • Loading branch information
steveloughran committed Jul 12, 2023
1 parent d77338f commit 65fd9ba
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1718,7 +1718,7 @@ protected S3AReadOpContext createReadContext(
prefetchBlockCount)
.withAuditSpan(auditSpan);
openFileHelper.applyDefaultOptions(roc);
return roc.build();
return roc;
}

/**
Expand Down Expand Up @@ -2247,8 +2247,8 @@ public S3ObjectAttributes createObjectAttributes(

@Override
public S3AReadOpContext createReadContext(final FileStatus fileStatus) {
return S3AFileSystem.this.createReadContext(fileStatus,
auditSpan);
return S3AFileSystem.this.createReadContext(fileStatus, auditSpan)
.build();
}

@Override
Expand Down Expand Up @@ -5222,6 +5222,7 @@ private FSDataInputStream select(final Path source,
fileStatus,
auditSpan);
fileInformation.applyOptions(readContext);
readContext.build();

if (changePolicy.getSource() != ChangeDetectionPolicy.Source.None
&& fileStatus.getEtag() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.fs.s3a;

import java.util.Optional;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -32,6 +34,7 @@
import org.apache.hadoop.util.Preconditions;

import static java.util.Objects.requireNonNull;
import static java.util.Optional.empty;

/**
* Read-specific operation context struct.
Expand Down Expand Up @@ -84,6 +87,16 @@ public class S3AReadOpContext extends S3AOpContext {
// Size of prefetch queue (in number of blocks).
private final int prefetchBlockCount;

/**
* Where does the read start from, if known.
*/
private Optional<Long> splitStart = empty();

/**
* What is the split end, if known?
*/
private Optional<Long> splitEnd = empty();

/**
* Instantiate.
* @param path path of read
Expand Down Expand Up @@ -283,6 +296,40 @@ public int getPrefetchBlockCount() {
return this.prefetchBlockCount;
}

/**
* Where does the read start from, if known.
*/
public Optional<Long> getSplitStart() {
return splitStart;
}

/**
* Set split start.
* @param value new value -must not be null
* @return the builder
*/
public S3AReadOpContext withSplitStart(final Optional<Long> value) {
splitStart = requireNonNull(value);
return this;
}

/**
* Set split end.
* @param value new value -must not be null
* @return the builder
*/
public S3AReadOpContext withSplitEnd(final Optional<Long> value) {
splitEnd = requireNonNull(value);
return this;
}

/**
* What is the split end, if known?
*/
public Optional<Long> getSplitEnd() {
return splitEnd;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;

import org.slf4j.Logger;
Expand All @@ -38,6 +39,8 @@
import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
import org.apache.hadoop.fs.s3a.select.SelectConstants;

import static java.util.Objects.requireNonNull;
import static java.util.Optional.empty;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
Expand Down Expand Up @@ -249,16 +252,16 @@ public OpenFileInformation prepareToOpenFile(
}
FSBuilderSupport builderSupport = new FSBuilderSupport(options);
// determine start and end of file.
long splitStart = builderSupport.getPositiveLong(FS_OPTION_OPENFILE_SPLIT_START, 0);
Optional<Long> splitStart = getOptionalLong(options,FS_OPTION_OPENFILE_SPLIT_START);

// split end
long splitEnd = builderSupport.getLong(
FS_OPTION_OPENFILE_SPLIT_END, LENGTH_UNKNOWN);

if (splitStart > 0 && splitStart > splitEnd) {
LOG.warn("Split start {} is greater than split end {}, resetting",
Optional<Long> splitEnd = getOptionalLong(options, FS_OPTION_OPENFILE_SPLIT_END);
// if there's a mismatch between start and end, set both to empty
if (splitEnd.isPresent() && splitEnd.get() < splitStart.orElse(0L)) {
LOG.debug("Split start {} is greater than split end {}, resetting",
splitStart, splitEnd);
splitStart = 0;
splitStart = empty();
splitEnd = empty();
}

// read end is the open file value
Expand Down Expand Up @@ -336,8 +339,8 @@ public OpenFileInformation openSimpleFile(final int bufferSize) {
.withFileLength(LENGTH_UNKNOWN)
.withInputPolicy(defaultInputPolicy)
.withReadAheadRange(defaultReadAhead)
.withSplitStart(0)
.withSplitEnd(LENGTH_UNKNOWN)
.withSplitStart(empty())
.withSplitEnd(empty())
.build();
}

Expand All @@ -352,6 +355,25 @@ public String toString() {
'}';
}


/**
* Get a long value with resilience to unparseable values.
* @param options configuration to parse
* @param key key to log
* @return long value or empty()
*/
public Optional<Long> getOptionalLong(final Configuration options, String key) {
final String v = options.getTrimmed(key, "");
if (v.isEmpty()) {
return empty();
}
try {
return Optional.of(Long.parseLong(v));
} catch (NumberFormatException e) {
return empty();
}
}

/**
* The information on a file needed to open it.
*/
Expand Down Expand Up @@ -379,15 +401,14 @@ public static final class OpenFileInformation {
private int bufferSize;

/**
* Where does the read start from. 0 unless known.
* Where does the read start from, if known.
*/
private long splitStart;
private Optional<Long> splitStart = empty();

/**
* What is the split end?
* Negative if not known.
* What is the split end, if known?
*/
private long splitEnd = -1;
private Optional<Long> splitEnd = empty();

/**
* What is the file length?
Expand Down Expand Up @@ -443,11 +464,17 @@ public int getBufferSize() {
return bufferSize;
}

public long getSplitStart() {
/**
* Where does the read start from, if known.
*/
public Optional<Long> getSplitStart() {
return splitStart;
}

public long getSplitEnd() {
/**
* What is the split end, if known?
*/
public Optional<Long> getSplitEnd() {
return splitEnd;
}

Expand Down Expand Up @@ -546,25 +573,26 @@ public OpenFileInformation withBufferSize(final int value) {
}

/**
* Set builder value.
* @param value new value
* Set split start.
* @param value new value -must not be null
* @return the builder
*/
public OpenFileInformation withSplitStart(final long value) {
splitStart = value;
public OpenFileInformation withSplitStart(final Optional<Long> value) {
splitStart = requireNonNull(value);
return this;
}

/**
* Set builder value.
* @param value new value
* Set split end.
* @param value new value -must not be null
* @return the builder
*/
public OpenFileInformation withSplitEnd(final long value) {
splitEnd = value;
public OpenFileInformation withSplitEnd(final Optional<Long> value) {
splitEnd = requireNonNull(value);
return this;
}


/**
* Set builder value.
* @param value new value
Expand Down Expand Up @@ -596,7 +624,9 @@ public S3AReadOpContext applyOptions(S3AReadOpContext roc) {
.withInputPolicy(inputPolicy)
.withChangeDetectionPolicy(changePolicy)
.withAsyncDrainThreshold(asyncDrainThreshold)
.withReadahead(readAheadRange);
.withReadahead(readAheadRange)
.withSplitStart(splitStart)
.withSplitEnd(splitEnd);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ private S3AInputStream getMockedS3AInputStream() {

S3AReadOpContext s3AReadOpContext = fs.createReadContext(
s3AFileStatus,
NoopSpan.INSTANCE);
NoopSpan.INSTANCE)
.build();

return new S3AInputStream(
s3AReadOpContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public void testFileLength() throws Throwable {
public void testSplitEndSetsLength() throws Throwable {
long bigFile = 2L ^ 34;
assertOpenFile(FS_OPTION_OPENFILE_SPLIT_END, Long.toString(bigFile))
.matches(p -> p.getSplitEnd() == bigFile, "split end")
.matches(p -> p.getSplitEnd().get() == bigFile, "split end")
.matches(p -> p.getFileLength() == -1, "file length")
.matches(p -> p.getStatus() == null, "status");
}
Expand All @@ -385,8 +385,8 @@ public void testSplitEndAndLength() throws Throwable {
new OpenFileParameters()
.withMandatoryKeys(s)
.withOptions(conf)))
.matches(p -> p.getSplitStart() == 0, "split start")
.matches(p -> p.getSplitEnd() == splitEnd, "split end")
.matches(p -> !p.getSplitStart().isPresent(), "split start")
.matches(p -> !p.getSplitEnd().isPresent(), "split end")
.matches(p -> p.getStatus().getLen() == len, "file length");
}

Expand Down

0 comments on commit 65fd9ba

Please sign in to comment.