diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 999186f8cd5ae5..73d94c8691db09 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1718,7 +1718,7 @@ protected S3AReadOpContext createReadContext(
         prefetchBlockCount)
         .withAuditSpan(auditSpan);
     openFileHelper.applyDefaultOptions(roc);
-    return roc.build();
+    return roc;
   }
 
   /**
@@ -2247,8 +2247,8 @@ public S3ObjectAttributes createObjectAttributes(
 
     @Override
     public S3AReadOpContext createReadContext(final FileStatus fileStatus) {
-      return S3AFileSystem.this.createReadContext(fileStatus,
-          auditSpan);
+      return S3AFileSystem.this.createReadContext(fileStatus, auditSpan)
+          .build();
     }
 
     @Override
@@ -5222,6 +5222,7 @@ private FSDataInputStream select(final Path source,
         fileStatus,
         auditSpan);
     fileInformation.applyOptions(readContext);
+    readContext.build();
 
     if (changePolicy.getSource() != ChangeDetectionPolicy.Source.None
         && fileStatus.getEtag() != null) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
index 55351f0c81396f..7eec44500938a0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import java.util.Optional;
+
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,6 +34,7 @@
 import org.apache.hadoop.util.Preconditions;
 
 import static java.util.Objects.requireNonNull;
+import static java.util.Optional.empty;
 
 /**
  * Read-specific operation context struct.
@@ -84,6 +87,16 @@ public class S3AReadOpContext extends S3AOpContext {
   // Size of prefetch queue (in number of blocks).
   private final int prefetchBlockCount;
 
+  /**
+   * Where does the read start from, if known.
+   */
+  private Optional<Long> splitStart = empty();
+
+  /**
+   * What is the split end, if known?
+   */
+  private Optional<Long> splitEnd = empty();
+
   /**
    * Instantiate.
    * @param path path of read
@@ -283,6 +296,40 @@ public int getPrefetchBlockCount() {
     return this.prefetchBlockCount;
   }
 
+  /**
+   * Where does the read start from, if known.
+   */
+  public Optional<Long> getSplitStart() {
+    return splitStart;
+  }
+
+  /**
+   * Set split start.
+   * @param value new value; must not be null
+   * @return the builder
+   */
+  public S3AReadOpContext withSplitStart(final Optional<Long> value) {
+    splitStart = requireNonNull(value);
+    return this;
+  }
+
+  /**
+   * Set split end.
+   * @param value new value; must not be null
+   * @return the builder
+   */
+  public S3AReadOpContext withSplitEnd(final Optional<Long> value) {
+    splitEnd = requireNonNull(value);
+    return this;
+  }
+
+  /**
+   * What is the split end, if known?
+   */
+  public Optional<Long> getSplitEnd() {
+    return splitEnd;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java
index 4703d635672457..488bfb24b400de 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java
@@ -21,6 +21,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.Set;
 
 import org.slf4j.Logger;
@@ -38,6 +39,8 @@
 import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
 import org.apache.hadoop.fs.s3a.select.SelectConstants;
 
+import static java.util.Objects.requireNonNull;
+import static java.util.Optional.empty;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
@@ -249,16 +252,16 @@ public OpenFileInformation prepareToOpenFile(
     }
     FSBuilderSupport builderSupport = new FSBuilderSupport(options);
     // determine start and end of file.
-    long splitStart = builderSupport.getPositiveLong(FS_OPTION_OPENFILE_SPLIT_START, 0);
+    Optional<Long> splitStart = getOptionalLong(options, FS_OPTION_OPENFILE_SPLIT_START);
 
     // split end
-    long splitEnd = builderSupport.getLong(
-        FS_OPTION_OPENFILE_SPLIT_END, LENGTH_UNKNOWN);
-
-    if (splitStart > 0 && splitStart > splitEnd) {
-      LOG.warn("Split start {} is greater than split end {}, resetting",
+    Optional<Long> splitEnd = getOptionalLong(options, FS_OPTION_OPENFILE_SPLIT_END);
+    // if there's a mismatch between start and end, set both to empty
+    if (splitEnd.isPresent() && splitEnd.get() < splitStart.orElse(0L)) {
+      LOG.debug("Split start {} is greater than split end {}, resetting",
           splitStart, splitEnd);
-      splitStart = 0;
+      splitStart = empty();
+      splitEnd = empty();
     }
 
     // read end is the open file value
@@ -336,8 +339,8 @@ public OpenFileInformation openSimpleFile(final int bufferSize) {
         .withFileLength(LENGTH_UNKNOWN)
         .withInputPolicy(defaultInputPolicy)
         .withReadAheadRange(defaultReadAhead)
-        .withSplitStart(0)
-        .withSplitEnd(LENGTH_UNKNOWN)
+        .withSplitStart(empty())
+        .withSplitEnd(empty())
         .build();
   }
 
@@ -352,6 +355,25 @@ public String toString() {
         '}';
   }
 
+
+  /**
+   * Get a long value with resilience to unparseable values.
+   * @param options configuration to parse
+   * @param key key to look up
+   * @return long value or empty()
+   */
+  public Optional<Long> getOptionalLong(final Configuration options, String key) {
+    final String v = options.getTrimmed(key, "");
+    if (v.isEmpty()) {
+      return empty();
+    }
+    try {
+      return Optional.of(Long.parseLong(v));
+    } catch (NumberFormatException e) {
+      return empty();
+    }
+  }
+
   /**
    * The information on a file needed to open it.
    */
@@ -379,15 +401,14 @@ public static final class OpenFileInformation {
     private int bufferSize;
 
     /**
-     * Where does the read start from. 0 unless known.
+     * Where does the read start from, if known.
      */
-    private long splitStart;
+    private Optional<Long> splitStart = empty();
 
     /**
-     * What is the split end?
-     * Negative if not known.
+     * What is the split end, if known?
      */
-    private long splitEnd = -1;
+    private Optional<Long> splitEnd = empty();
 
     /**
      * What is the file length?
@@ -443,11 +464,17 @@ public int getBufferSize() {
       return bufferSize;
     }
 
-    public long getSplitStart() {
+    /**
+     * Where does the read start from, if known.
+     */
+    public Optional<Long> getSplitStart() {
       return splitStart;
     }
 
-    public long getSplitEnd() {
+    /**
+     * What is the split end, if known?
+     */
+    public Optional<Long> getSplitEnd() {
       return splitEnd;
     }
 
@@ -546,25 +573,26 @@ public OpenFileInformation withBufferSize(final int value) {
     }
 
     /**
-     * Set builder value.
-     * @param value new value
+     * Set split start.
+     * @param value new value; must not be null
      * @return the builder
      */
-    public OpenFileInformation withSplitStart(final long value) {
-      splitStart = value;
+    public OpenFileInformation withSplitStart(final Optional<Long> value) {
+      splitStart = requireNonNull(value);
       return this;
     }
 
     /**
-     * Set builder value.
-     * @param value new value
+     * Set split end.
+     * @param value new value; must not be null
      * @return the builder
      */
-    public OpenFileInformation withSplitEnd(final long value) {
-      splitEnd = value;
+    public OpenFileInformation withSplitEnd(final Optional<Long> value) {
+      splitEnd = requireNonNull(value);
       return this;
     }
 
+
     /**
      * Set builder value.
      * @param value new value
@@ -596,7 +624,9 @@ public S3AReadOpContext applyOptions(S3AReadOpContext roc) {
           .withInputPolicy(inputPolicy)
           .withChangeDetectionPolicy(changePolicy)
           .withAsyncDrainThreshold(asyncDrainThreshold)
-          .withReadahead(readAheadRange);
+          .withReadahead(readAheadRange)
+          .withSplitStart(splitStart)
+          .withSplitEnd(splitEnd);
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
index c62bf5daca3a4a..f15ef6e934f832 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
@@ -105,7 +105,8 @@ private S3AInputStream getMockedS3AInputStream() {
 
     S3AReadOpContext s3AReadOpContext = fs.createReadContext(
         s3AFileStatus,
-        NoopSpan.INSTANCE);
+        NoopSpan.INSTANCE)
+        .build();
 
     return new S3AInputStream(
         s3AReadOpContext,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java
index 17f210dd586e89..0f066a814824f8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java
@@ -358,7 +358,7 @@ public void testFileLength() throws Throwable {
   public void testSplitEndSetsLength() throws Throwable {
     long bigFile = 2L ^ 34;
     assertOpenFile(FS_OPTION_OPENFILE_SPLIT_END, Long.toString(bigFile))
-        .matches(p -> p.getSplitEnd() == bigFile, "split end")
+        .matches(p -> p.getSplitEnd().get() == bigFile, "split end")
         .matches(p -> p.getFileLength() == -1, "file length")
        .matches(p -> p.getStatus() == null, "status");
   }
@@ -385,8 +385,8 @@ public void testSplitEndAndLength() throws Throwable {
         new OpenFileParameters()
             .withMandatoryKeys(s)
             .withOptions(conf)))
-        .matches(p -> p.getSplitStart() == 0, "split start")
-        .matches(p -> p.getSplitEnd() == splitEnd, "split end")
+        .matches(p -> !p.getSplitStart().isPresent(), "split start")
+        .matches(p -> !p.getSplitEnd().isPresent(), "split end")
         .matches(p -> p.getStatus().getLen() == len, "file length");
   }
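Reviewer note: the behavioural change in this patch is that split start/end become tri-state (absent, or a parsed value), that unparseable option values downgrade to absent instead of throwing `NumberFormatException`, and that an inconsistent start/end pair is discarded rather than partially reset. A minimal standalone sketch of that logic follows, using a plain `Map<String, String>` in place of Hadoop's `Configuration`; the class name `SplitRangeDemo` and method `parseOptionalLong` are illustrative stand-ins, not the patch's API, and the literal keys mirror the `FS_OPTION_OPENFILE_SPLIT_*` constants:

```java
import java.util.Map;
import java.util.Optional;

import static java.util.Optional.empty;

public final class SplitRangeDemo {

  /** Parse a long option, returning empty() on missing or unparseable values. */
  static Optional<Long> parseOptionalLong(Map<String, String> options, String key) {
    String v = options.getOrDefault(key, "").trim();
    if (v.isEmpty()) {
      return empty();
    }
    try {
      return Optional.of(Long.parseLong(v));
    } catch (NumberFormatException e) {
      return empty();  // tolerate junk such as "garbage" rather than failing the open
    }
  }

  public static void main(String[] args) {
    // split end below split start: an inconsistent pair
    Map<String, String> options = Map.of(
        "fs.option.openfile.split.start", "1024",
        "fs.option.openfile.split.end", "512");

    Optional<Long> splitStart = parseOptionalLong(options, "fs.option.openfile.split.start");
    Optional<Long> splitEnd = parseOptionalLong(options, "fs.option.openfile.split.end");

    // mirrors the patch's reconciliation: if the end is below the effective start,
    // discard both values rather than guessing a range
    if (splitEnd.isPresent() && splitEnd.get() < splitStart.orElse(0L)) {
      splitStart = empty();
      splitEnd = empty();
    }

    // prints: splitStart=Optional.empty, splitEnd=Optional.empty
    System.out.println("splitStart=" + splitStart + ", splitEnd=" + splitEnd);
  }
}
```

Dropping both values on a mismatch keeps downstream consumers (read policy selection, prefetch range hints) from acting on a half-valid range, which is why the updated tests assert `!isPresent()` for both rather than a zero start.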