Skip to content

Commit

Permalink
HADOOP-16759. Filesystem openFile() builder to take a FileStatus param (
Browse files Browse the repository at this point in the history
apache#1761). Contributed by Steve Loughran

* Enhanced builder + FS spec
* s3a FS to use this to skip HEAD on open
* and to use version/etag when opening the file

works with S3AFileStatus FS and S3ALocatedFileStatus
  • Loading branch information
steveloughran authored and RogPodge committed Mar 25, 2020
1 parent a95c8f4 commit 269a068
Show file tree
Hide file tree
Showing 18 changed files with 544 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -45,6 +44,7 @@
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
Expand Down Expand Up @@ -1355,22 +1355,20 @@ public boolean equals(Object other) {
* setting up the expectation that the {@code get()} call
* is needed to evaluate the result.
* @param path path to the file
* @param mandatoryKeys set of options declared as mandatory.
* @param options options set during the build sequence.
* @param bufferSize buffer size
* @param parameters open file parameters from the builder.
* @return a future which will evaluate to the opened file.
* @throws IOException failure to resolve the link.
* @throws IllegalArgumentException unknown mandatory key
*/
public CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
Set<String> mandatoryKeys,
Configuration options,
int bufferSize) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys,
final OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
Collections.emptySet(),
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(), () -> open(path, bufferSize));
new CompletableFuture<>(), () ->
open(path, parameters.getBufferSize()));
}

public boolean hasPathCapability(final Path path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import com.google.common.base.Preconditions;
Expand All @@ -37,6 +35,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.DataChecksum;
Expand Down Expand Up @@ -845,14 +844,14 @@ public FutureDataInputStreamBuilder openFile(final Path path)
@Override
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
final Path path,
final Set<String> mandatoryKeys,
final Configuration options,
final int bufferSize) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys,
final OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
Collections.emptySet(),
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(), () -> open(path, bufferSize));
new CompletableFuture<>(),
() -> open(path, parameters.getBufferSize()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
Expand Down Expand Up @@ -266,20 +266,17 @@ public List<Token<?>> getDelegationTokens(String renewer) throws IOException {

/**
* Open a file by delegating to
* {@link FileSystem#openFileWithOptions(Path, Set, Configuration, int)}.
* {@link FileSystem#openFileWithOptions(Path, org.apache.hadoop.fs.impl.OpenFileParameters)}.
* @param path path to the file
* @param mandatoryKeys set of options declared as mandatory.
* @param options options set during the build sequence.
* @param bufferSize buffer size
* @return a future which will evaluate to the opened file.
* @param parameters open file parameters from the builder.
*
* @return a future which will evaluate to the opened file.ControlAlpha
* @throws IOException failure to resolve the link.
* @throws IllegalArgumentException unknown mandatory key
*/
public CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
Set<String> mandatoryKeys,
Configuration options,
int bufferSize) throws IOException {
return fsImpl.openFileWithOptions(path, mandatoryKeys, options, bufferSize);
final OpenFileParameters parameters) throws IOException {
return fsImpl.openFileWithOptions(path, parameters);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.hadoop.fs.impl.FsLinkResolution;
import org.apache.hadoop.fs.impl.PathCapabilitiesSupport;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
Expand Down Expand Up @@ -2924,16 +2924,18 @@ protected FSDataInputStreamBuilder(
@Override
public CompletableFuture<FSDataInputStream> build() throws IOException {
final Path absF = fixRelativePart(getPath());
OpenFileParameters parameters = new OpenFileParameters()
.withMandatoryKeys(getMandatoryKeys())
.withOptions(getOptions())
.withBufferSize(getBufferSize())
.withStatus(getStatus());
return new FSLinkResolver<CompletableFuture<FSDataInputStream>>() {
@Override
public CompletableFuture<FSDataInputStream> next(
final AbstractFileSystem fs,
final Path p)
throws IOException {
return fs.openFileWithOptions(p,
getMandatoryKeys(),
getOptions(),
getBufferSize());
return fs.openFileWithOptions(p, parameters);
}
}.resolve(FileContext.this, absF);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
Expand Down Expand Up @@ -4485,43 +4486,39 @@ public FutureDataInputStreamBuilder openFile(PathHandle pathHandle)
* the action of opening the file should begin.
*
* The base implementation performs a blocking
* call to {@link #open(Path, int)}in this call;
* call to {@link #open(Path, int)} in this call;
* the actual outcome is in the returned {@code CompletableFuture}.
* This avoids having to create some thread pool, while still
* setting up the expectation that the {@code get()} call
* is needed to evaluate the result.
* @param path path to the file
* @param mandatoryKeys set of options declared as mandatory.
* @param options options set during the build sequence.
* @param bufferSize buffer size
* @param parameters open file parameters from the builder.
* @return a future which will evaluate to the opened file.
* @throws IOException failure to resolve the link.
* @throws IllegalArgumentException unknown mandatory key
*/
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
final Path path,
final Set<String> mandatoryKeys,
final Configuration options,
final int bufferSize) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys,
final OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
Collections.emptySet(),
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(), () -> open(path, bufferSize));
new CompletableFuture<>(), () ->
open(path, parameters.getBufferSize()));
}

/**
* Execute the actual open file operation.
* The base implementation performs a blocking
* call to {@link #open(Path, int)}in this call;
* call to {@link #open(Path, int)} in this call;
* the actual outcome is in the returned {@code CompletableFuture}.
* This avoids having to create some thread pool, while still
* setting up the expectation that the {@code get()} call
* is needed to evaluate the result.
* @param pathHandle path to the file
* @param mandatoryKeys set of options declared as mandatory.
* @param options options set during the build sequence.
* @param bufferSize buffer size
* @param parameters open file parameters from the builder.
* @return a future which will evaluate to the opened file.
* @throws IOException failure to resolve the link.
* @throws IllegalArgumentException unknown mandatory key
Expand All @@ -4530,14 +4527,13 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
*/
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
final PathHandle pathHandle,
final Set<String> mandatoryKeys,
final Configuration options,
final int bufferSize) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys,
final OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
Collections.emptySet(), "");
CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
try {
result.complete(open(pathHandle, bufferSize));
result.complete(open(pathHandle, parameters.getBufferSize()));
} catch (UnsupportedOperationException tx) {
// fail fast here
throw tx;
Expand Down Expand Up @@ -4639,12 +4635,17 @@ protected FSDataInputStreamBuilder(
@Override
public CompletableFuture<FSDataInputStream> build() throws IOException {
Optional<Path> optionalPath = getOptionalPath();
OpenFileParameters parameters = new OpenFileParameters()
.withMandatoryKeys(getMandatoryKeys())
.withOptions(getOptions())
.withBufferSize(getBufferSize())
.withStatus(super.getStatus()); // explicit to avoid IDE warnings
if(optionalPath.isPresent()) {
return getFS().openFileWithOptions(optionalPath.get(),
getMandatoryKeys(), getOptions(), getBufferSize());
parameters);
} else {
return getFS().openFileWithOptions(getPathHandle(),
getMandatoryKeys(), getOptions(), getBufferSize());
parameters);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
Expand Down Expand Up @@ -714,20 +714,15 @@ public FutureDataInputStreamBuilder openFile(final PathHandle pathHandle)
@Override
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
final Path path,
final Set<String> mandatoryKeys,
final Configuration options,
final int bufferSize) throws IOException {
return fs.openFileWithOptions(path, mandatoryKeys, options, bufferSize);
final OpenFileParameters parameters) throws IOException {
return fs.openFileWithOptions(path, parameters);
}

@Override
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
final PathHandle pathHandle,
final Set<String> mandatoryKeys,
final Configuration options,
final int bufferSize) throws IOException {
return fs.openFileWithOptions(pathHandle, mandatoryKeys, options,
bufferSize);
final OpenFileParameters parameters) throws IOException {
return fs.openFileWithOptions(pathHandle, parameters);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
Expand Down Expand Up @@ -440,10 +439,8 @@ public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
@Override
public CompletableFuture<FSDataInputStream> openFileWithOptions(
final Path path,
final Set<String> mandatoryKeys,
final Configuration options,
final int bufferSize) throws IOException {
return myFs.openFileWithOptions(path, mandatoryKeys, options, bufferSize);
final OpenFileParameters parameters) throws IOException {
return myFs.openFileWithOptions(path, parameters);
}

public boolean hasPathCapability(final Path path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,15 @@ public interface FutureDataInputStreamBuilder
CompletableFuture<FSDataInputStream> build()
throws IllegalArgumentException, UnsupportedOperationException,
IOException;

/**
* A FileStatus may be provided to the open request.
* It is up to the implementation whether to use this or not.
* @param status status.
* @return the builder.
*/
default FutureDataInputStreamBuilder withFileStatus(FileStatus status) {
return this;
}

}
Loading

0 comments on commit 269a068

Please sign in to comment.