From 269a068039ff1e50cc1a5052933c378ad9ff56d3 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 21 Jan 2020 22:31:51 +0000 Subject: [PATCH] HADOOP-16759. Filesystem openFile() builder to take a FileStatus param (#1761). Contributed by Steve Loughran * Enhanced builder + FS spec * s3a FS to use this to skip HEAD on open * and to use version/etag when opening the file works with S3AFileStatus FS and S3ALocatedFileStatus --- .../apache/hadoop/fs/AbstractFileSystem.java | 16 +- .../apache/hadoop/fs/ChecksumFileSystem.java | 13 +- .../hadoop/fs/DelegateToFileSystem.java | 17 +-- .../org/apache/hadoop/fs/FileContext.java | 12 +- .../java/org/apache/hadoop/fs/FileSystem.java | 41 ++--- .../apache/hadoop/fs/FilterFileSystem.java | 15 +- .../java/org/apache/hadoop/fs/FilterFs.java | 9 +- .../fs/FutureDataInputStreamBuilder.java | 11 ++ .../FutureDataInputStreamBuilderImpl.java | 33 +++- .../hadoop/fs/impl/OpenFileParameters.java | 94 ++++++++++++ .../site/markdown/filesystem/filesystem.md | 23 ++- .../filesystem/fsdatainputstreambuilder.md | 41 +++++ .../fs/contract/AbstractContractOpenTest.java | 9 ++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 120 +++++++++++---- .../fs/s3a/ITestS3ARemoteFileChanged.java | 141 ++++++++++++++++-- .../s3a/ITestS3GuardOutOfBandOperations.java | 56 ++++--- .../apache/hadoop/fs/s3a/S3ATestUtils.java | 27 ++++ .../hadoop/fs/s3a/select/ITestS3Select.java | 7 +- 18 files changed, 544 insertions(+), 141 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java index 0453ca14537c3..1df68b647c99a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java @@ -31,7 +31,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Set; import java.util.StringTokenizer; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -45,6 +44,7 @@ import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -1355,22 +1355,20 @@ public boolean equals(Object other) { * setting up the expectation that the {@code get()} call * is needed to evaluate the result. * @param path path to the file - * @param mandatoryKeys set of options declared as mandatory. - * @param options options set during the build sequence. - * @param bufferSize buffer size + * @param parameters open file parameters from the builder. * @return a future which will evaluate to the opened file. * @throws IOException failure to resolve the link. * @throws IllegalArgumentException unknown mandatory key */ public CompletableFuture openFileWithOptions(Path path, - Set mandatoryKeys, - Configuration options, - int bufferSize) throws IOException { - AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys, + final OpenFileParameters parameters) throws IOException { + AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( + parameters.getMandatoryKeys(), Collections.emptySet(), "for " + path); return LambdaUtils.eval( - new CompletableFuture<>(), () -> open(path, bufferSize)); + new CompletableFuture<>(), () -> + open(path, parameters.getBufferSize())); } public boolean hasPathCapability(final Path path, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index 5e5d29a28bfce..cc9c284c9fa55 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -27,8 +27,6 @@ import java.util.Collections; import java.util.EnumSet; import java.util.List; -import java.util.Locale; -import java.util.Set; import java.util.concurrent.CompletableFuture; import com.google.common.base.Preconditions; @@ -37,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.DataChecksum; @@ -845,14 +844,14 @@ public FutureDataInputStreamBuilder openFile(final Path path) @Override protected CompletableFuture openFileWithOptions( final Path path, - final Set mandatoryKeys, - final Configuration options, - final int bufferSize) throws IOException { - AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys, + final OpenFileParameters parameters) throws IOException { + AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( + parameters.getMandatoryKeys(), Collections.emptySet(), "for " + path); return LambdaUtils.eval( - new CompletableFuture<>(), () -> open(path, bufferSize)); + new CompletableFuture<>(), + () -> open(path, parameters.getBufferSize())); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java index a8f294f379158..3a139781e0372 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java @@ -24,13 +24,13 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.List; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Options.ChecksumOpt; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; @@ -266,20 +266,17 @@ public List> getDelegationTokens(String renewer) throws IOException { /** * Open a file by delegating to - * {@link FileSystem#openFileWithOptions(Path, Set, Configuration, int)}. + * {@link FileSystem#openFileWithOptions(Path, org.apache.hadoop.fs.impl.OpenFileParameters)}. * @param path path to the file - * @param mandatoryKeys set of options declared as mandatory. - * @param options options set during the build sequence. - * @param bufferSize buffer size - * @return a future which will evaluate to the opened file. + * @param parameters open file parameters from the builder. + * + * @return a future which will evaluate to the opened file.ControlAlpha * @throws IOException failure to resolve the link. * @throws IllegalArgumentException unknown mandatory key */ public CompletableFuture openFileWithOptions(Path path, - Set mandatoryKeys, - Configuration options, - int bufferSize) throws IOException { - return fsImpl.openFileWithOptions(path, mandatoryKeys, options, bufferSize); + final OpenFileParameters parameters) throws IOException { + return fsImpl.openFileWithOptions(path, parameters); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java index b2c1369a9c1fe..df93e89750ee0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java @@ -47,7 +47,7 @@ import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; import org.apache.hadoop.fs.impl.FsLinkResolution; -import org.apache.hadoop.fs.impl.PathCapabilitiesSupport; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -2924,16 +2924,18 @@ protected FSDataInputStreamBuilder( @Override public CompletableFuture build() throws IOException { final Path absF = fixRelativePart(getPath()); + OpenFileParameters parameters = new OpenFileParameters() + .withMandatoryKeys(getMandatoryKeys()) + .withOptions(getOptions()) + .withBufferSize(getBufferSize()) + .withStatus(getStatus()); return new FSLinkResolver>() { @Override public CompletableFuture next( final AbstractFileSystem fs, final Path p) throws IOException { - return fs.openFileWithOptions(p, - getMandatoryKeys(), - getOptions(), - getBufferSize()); + return fs.openFileWithOptions(p, parameters); } }.resolve(FileContext.this, absF); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 46f885f27f5c7..5d34428de04fb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -58,6 +58,7 @@ import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -4485,43 +4486,39 @@ public FutureDataInputStreamBuilder openFile(PathHandle pathHandle) * the action of opening the file should begin. * * The base implementation performs a blocking - * call to {@link #open(Path, int)}in this call; + * call to {@link #open(Path, int)} in this call; * the actual outcome is in the returned {@code CompletableFuture}. * This avoids having to create some thread pool, while still * setting up the expectation that the {@code get()} call * is needed to evaluate the result. * @param path path to the file - * @param mandatoryKeys set of options declared as mandatory. - * @param options options set during the build sequence. - * @param bufferSize buffer size + * @param parameters open file parameters from the builder. * @return a future which will evaluate to the opened file. * @throws IOException failure to resolve the link. * @throws IllegalArgumentException unknown mandatory key */ protected CompletableFuture openFileWithOptions( final Path path, - final Set mandatoryKeys, - final Configuration options, - final int bufferSize) throws IOException { - AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys, + final OpenFileParameters parameters) throws IOException { + AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( + parameters.getMandatoryKeys(), Collections.emptySet(), "for " + path); return LambdaUtils.eval( - new CompletableFuture<>(), () -> open(path, bufferSize)); + new CompletableFuture<>(), () -> + open(path, parameters.getBufferSize())); } /** * Execute the actual open file operation. * The base implementation performs a blocking - * call to {@link #open(Path, int)}in this call; + * call to {@link #open(Path, int)} in this call; * the actual outcome is in the returned {@code CompletableFuture}. * This avoids having to create some thread pool, while still * setting up the expectation that the {@code get()} call * is needed to evaluate the result. * @param pathHandle path to the file - * @param mandatoryKeys set of options declared as mandatory. - * @param options options set during the build sequence. - * @param bufferSize buffer size + * @param parameters open file parameters from the builder. * @return a future which will evaluate to the opened file. * @throws IOException failure to resolve the link. * @throws IllegalArgumentException unknown mandatory key @@ -4530,14 +4527,13 @@ protected CompletableFuture openFileWithOptions( */ protected CompletableFuture openFileWithOptions( final PathHandle pathHandle, - final Set mandatoryKeys, - final Configuration options, - final int bufferSize) throws IOException { - AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys, + final OpenFileParameters parameters) throws IOException { + AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( + parameters.getMandatoryKeys(), Collections.emptySet(), ""); CompletableFuture result = new CompletableFuture<>(); try { - result.complete(open(pathHandle, bufferSize)); + result.complete(open(pathHandle, parameters.getBufferSize())); } catch (UnsupportedOperationException tx) { // fail fast here throw tx; @@ -4639,12 +4635,17 @@ protected FSDataInputStreamBuilder( @Override public CompletableFuture build() throws IOException { Optional optionalPath = getOptionalPath(); + OpenFileParameters parameters = new OpenFileParameters() + .withMandatoryKeys(getMandatoryKeys()) + .withOptions(getOptions()) + .withBufferSize(getBufferSize()) + .withStatus(super.getStatus()); // explicit to avoid IDE warnings if(optionalPath.isPresent()) { return getFS().openFileWithOptions(optionalPath.get(), - getMandatoryKeys(), getOptions(), getBufferSize()); + parameters); } else { return getFS().openFileWithOptions(getPathHandle(), - getMandatoryKeys(), getOptions(), getBufferSize()); + parameters); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index 3bc3cb2e9b07a..cf12ea3898a7f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -25,12 +25,12 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -714,20 +714,15 @@ public FutureDataInputStreamBuilder openFile(final PathHandle pathHandle) @Override protected CompletableFuture openFileWithOptions( final Path path, - final Set mandatoryKeys, - final Configuration options, - final int bufferSize) throws IOException { - return fs.openFileWithOptions(path, mandatoryKeys, options, bufferSize); + final OpenFileParameters parameters) throws IOException { + return fs.openFileWithOptions(path, parameters); } @Override protected CompletableFuture openFileWithOptions( final PathHandle pathHandle, - final Set mandatoryKeys, - final Configuration options, - final int bufferSize) throws IOException { - return fs.openFileWithOptions(pathHandle, mandatoryKeys, options, - bufferSize); + final OpenFileParameters parameters) throws IOException { + return fs.openFileWithOptions(pathHandle, parameters); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java index 731a52a7b4137..e197506edc88b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java @@ -26,13 +26,12 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -440,10 +439,8 @@ public Collection getAllStoragePolicies() @Override public CompletableFuture openFileWithOptions( final Path path, - final Set mandatoryKeys, - final Configuration options, - final int bufferSize) throws IOException { - return myFs.openFileWithOptions(path, mandatoryKeys, options, bufferSize); + final OpenFileParameters parameters) throws IOException { + return myFs.openFileWithOptions(path, parameters); } public boolean hasPathCapability(final Path path, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java index 774d30927df2c..27a522e593001 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FutureDataInputStreamBuilder.java @@ -47,4 +47,15 @@ public interface FutureDataInputStreamBuilder CompletableFuture build() throws IllegalArgumentException, UnsupportedOperationException, IOException; + + /** + * A FileStatus may be provided to the open request. + * It is up to the implementation whether to use this or not. + * @param status status. + * @return the builder. + */ + default FutureDataInputStreamBuilder withFileStatus(FileStatus status) { + return this; + } + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java index 2aa4a5d95fcc7..24a8d49747fe6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureDataInputStreamBuilderImpl.java @@ -26,12 +26,13 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathHandle; -import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; @@ -60,6 +61,12 @@ public abstract class FutureDataInputStreamBuilderImpl private int bufferSize; + /** + * File status passed in through a {@link #withFileStatus(FileStatus)} + * call; null otherwise. + */ + private FileStatus status; + /** * Construct from a {@link FileContext}. * @@ -69,8 +76,8 @@ public abstract class FutureDataInputStreamBuilderImpl */ protected FutureDataInputStreamBuilderImpl(@Nonnull FileContext fc, @Nonnull Path path) throws IOException { - super(checkNotNull(path)); - checkNotNull(fc); + super(requireNonNull(path, "path")); + requireNonNull(fc, "file context"); this.fileSystem = null; bufferSize = IO_FILE_BUFFER_SIZE_DEFAULT; } @@ -82,8 +89,8 @@ protected FutureDataInputStreamBuilderImpl(@Nonnull FileContext fc, */ protected FutureDataInputStreamBuilderImpl(@Nonnull FileSystem fileSystem, @Nonnull Path path) { - super(checkNotNull(path)); - this.fileSystem = checkNotNull(fileSystem); + super(requireNonNull(path, "path")); + this.fileSystem = requireNonNull(fileSystem, "fileSystem"); initFromFS(); } @@ -108,7 +115,7 @@ private void initFromFS() { } protected FileSystem getFS() { - checkNotNull(fileSystem); + requireNonNull(fileSystem, "fileSystem"); return fileSystem; } @@ -138,4 +145,18 @@ public FutureDataInputStreamBuilder builder() { public FutureDataInputStreamBuilder getThisBuilder() { return this; } + + @Override + public FutureDataInputStreamBuilder withFileStatus(FileStatus st) { + this.status = requireNonNull(st, "status"); + return this; + } + + /** + * Get any status set in {@link #withFileStatus(FileStatus)}. + * @return a status value or null. + */ + protected FileStatus getStatus() { + return status; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java new file mode 100644 index 0000000000000..77b4ff52696a3 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/OpenFileParameters.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; + +import static java.util.Objects.requireNonNull; + +/** + * All the parameters from the openFile builder for the + * {@code openFileWithOptions} commands. + * + * If/when new attributes added to the builder, this class will be extended. + */ +public class OpenFileParameters { + + /** + * Set of options declared as mandatory. + */ + private Set mandatoryKeys; + + /** + * Options set during the build sequence. + */ + private Configuration options; + + /** + * Buffer size. + */ + private int bufferSize; + + /** + * Optional file status. + */ + private FileStatus status; + + public OpenFileParameters() { + } + + public OpenFileParameters withMandatoryKeys(final Set keys) { + this.mandatoryKeys = requireNonNull(keys); + return this; + } + + public OpenFileParameters withOptions(final Configuration opts) { + this.options = requireNonNull(opts); + return this; + } + + public OpenFileParameters withBufferSize(final int size) { + this.bufferSize = size; + return this; + } + + public OpenFileParameters withStatus(final FileStatus st) { + this.status = st; + return this; + } + + public Set getMandatoryKeys() { + return mandatoryKeys; + } + + public Configuration getOptions() { + return options; + } + + public int getBufferSize() { + return bufferSize; + } + + public FileStatus getStatus() { + return status; + } +} diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md index abbd2293fb2e9..07a48f9049f71 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md @@ -718,24 +718,29 @@ exists in the metadata, but no copies of any its blocks can be located; Creates a [`FSDataInputStreamBuilder`](fsdatainputstreambuilder.html) to construct a operation to open the file at `path` for reading. - When `build()` is invoked on the returned `FSDataInputStreamBuilder` instance, the builder parameters are verified and -`openFileWithOptions(Path, Set, Configuration, int)` invoked. +`openFileWithOptions(Path, OpenFileParameters)` invoked. This (protected) operation returns a `CompletableFuture` which, when its `get()` method is called, either returns an input stream of the contents of opened file, or raises an exception. -The base implementation of the `openFileWithOptions(PathHandle, Set, Configuration, int)` +The base implementation of the `openFileWithOptions(PathHandle, OpenFileParameters)` ultimately invokes `open(Path, int)`. Thus the chain `openFile(path).build().get()` has the same preconditions and postconditions as `open(Path p, int bufferSize)` +However, there is one difference which implementations are free to +take advantage of: + +The returned stream MAY implement a lazy open where file non-existence or +access permission failures may not surface until the first `read()` of the +actual data. -The `openFile()` operation may check the state of the filesystem during this -call, but as the state of the filesystem may change betwen this call and +The `openFile()` operation may check the state of the filesystem during its +invocation, but as the state of the filesystem may change betwen this call and the actual `build()` and `get()` operations, this file-specific preconditions (file exists, file is readable, etc) MUST NOT be checked here. @@ -766,6 +771,10 @@ It SHOULD be possible to always open a file without specifying any options, so as to present a consistent model to users. However, an implementation MAY opt to require one or more mandatory options to be set. +The returned stream may perform "lazy" evaluation of file access. This is +relevant for object stores where the probes for existence are expensive, and, +even with an asynchronous open, may be considered needless. + ### `FSDataInputStreamBuilder openFile(PathHandle)` Creates a `FSDataInputStreamBuilder` to build an operation to open a file. @@ -774,13 +783,13 @@ to construct a operation to open the file identified by the given `PathHandle` f When `build()` is invoked on the returned `FSDataInputStreamBuilder` instance, the builder parameters are verified and -`openFileWithOptions(PathHandle, Set, Configuration, int)` invoked. +`openFileWithOptions(PathHandle, OpenFileParameters)` invoked. This (protected) operation returns a `CompletableFuture` which, when its `get()` method is called, either returns an input stream of the contents of opened file, or raises an exception. -The base implementation of the `openFileWithOptions(Path,PathHandle, Set, Configuration, int)` method +The base implementation of the `openFileWithOptions(PathHandle, OpenFileParameters)` method returns a future which invokes `open(Path, int)`. Thus the chain `openFile(pathhandle).build().get()` has the same preconditions diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md index a7c393d9a41c1..eadba174fc1a6 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md @@ -43,6 +43,31 @@ path validation. Set the size of the buffer to be used. +### `FSDataInputStreamBuilder withFileStatus(FileStatus status)` + +A `FileStatus` instance which refers to the file being opened. + +This MAY be used by implementations to short-circuit checks for the file, +So potentially saving on remote calls especially to object stores. + +Requirements: + +* `status != null` +* `status.getPath()` == the resolved path of the file being opened. + +The path validation MUST take place if the store uses the `FileStatus` when +it opens files, and MAY be performed otherwise. The validation +SHOULD be postponed until the `build()` operation. + +This operation should be considered a hint to the filesystem. + +If a filesystem implementation extends the `FileStatus` returned in its +implementation MAY use this information when opening the file. + +This is relevant with those stores which return version/etag information, +including the S3A and ABFS connectors -they MAY use this to guarantee that +the file they opened is exactly the one returned in the listing. + ### Set optional or mandatory parameters FSDataInputStreamBuilder opt(String key, ...) @@ -56,6 +81,7 @@ of `FileSystem`. out = fs.openFile(path) .opt("fs.s3a.experimental.input.fadvise", "random") .must("fs.s3a.readahead.range", 256 * 1024) + .withFileStatus(statusFromListing) .build() .get(); ``` @@ -76,6 +102,21 @@ builder methods (i.e., `bufferSize()`) and `opt()`/`must()` is as follows: > The last option specified defines the value and its optional/mandatory state. +If the `FileStatus` option passed in `withFileStatus()` is used, implementations +MUST accept all subclasses of `FileStatus`, including `LocatedFileStatus`, +rather than just any FS-specific subclass implemented by the implementation +(e.g `S3AFileStatus`). They MAY simply ignore those which are not the +custom subclasses. + +This is critical to ensure safe use of the feature: directory listing/ +status serialization/deserialization can result result in the `withFileStatus()` +argumennt not being the custom subclass returned by the Filesystem instance's +own `getFileStatus()`, `listFiles()`, `listLocatedStatus()` calls, etc. + +In such a situation the implementations must: + +1. Validate the path (always). +1. Use the status/convert to the custom type, *or* simply discard it. ## Builder interface diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java index b6e94a664165e..a43053180fbf8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java @@ -281,6 +281,7 @@ public void testOpenFileApplyRead() throws Throwable { createFile(fs, path, true, dataset(len, 0x40, 0x80)); CompletableFuture readAllBytes = fs.openFile(path) + .withFileStatus(fs.getFileStatus(path)) .build() .thenApply(ContractTestUtils::readStream); assertEquals("Wrong number of bytes read value", @@ -302,4 +303,12 @@ public void testOpenFileApplyAsyncRead() throws Throwable { accepted.get()); } + @Test + public void testOpenFileNullStatus() throws Throwable { + describe("use openFile() with a null status"); + Path path = path("testOpenFileNullStatus"); + intercept(NullPointerException.class, + () -> getFileSystem().openFile(path).withFileStatus(null)); + } + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 615be2cfbdb95..7b6117283bc2f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -95,6 +95,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Globber; +import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.s3a.auth.SignerManager; import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations; import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider; @@ -978,27 +979,30 @@ protected URI canonicalizeUri(URI rawUri) { @Retries.RetryTranslated public FSDataInputStream open(Path f, int bufferSize) throws IOException { - return open(f, Optional.empty()); + return open(f, Optional.empty(), Optional.empty()); } /** * Opens an FSDataInputStream at the indicated Path. - * @param path the file to open + * if status contains an S3AFileStatus reference, it is used + * and so a HEAD request to the store is avoided. + * + * @param file the file to open * @param options configuration options if opened with the builder API. + * @param providedStatus optional file status. * @throws IOException IO failure. */ @Retries.RetryTranslated private FSDataInputStream open( - final Path path, - final Optional options) + final Path file, + final Optional options, + final Optional providedStatus) throws IOException { entryPoint(INVOCATION_OPEN); - final S3AFileStatus fileStatus = (S3AFileStatus) getFileStatus(path); - if (fileStatus.isDirectory()) { - throw new FileNotFoundException("Can't open " + path - + " because it is a directory"); - } + final Path path = qualify(file); + S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, + providedStatus); S3AReadOpContext readContext; if (options.isPresent()) { @@ -4303,22 +4307,21 @@ protected void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) { * @param source path to source data * @param expression select expression * @param options request configuration from the builder. + * @param providedStatus any passed in status * @return the stream of the results * @throws IOException IO failure */ @Retries.RetryTranslated private FSDataInputStream select(final Path source, final String expression, - final Configuration options) + final Configuration options, + final Optional providedStatus) throws IOException { entryPoint(OBJECT_SELECT_REQUESTS); requireSelectSupport(source); final Path path = makeQualified(source); - // call getFileStatus(), which will look at S3Guard first, - // so the operation will fail if it is not there or S3Guard believes it has - // been deleted. - // validation of the file status are delegated to the binding. - final S3AFileStatus fileStatus = (S3AFileStatus) getFileStatus(path); + final S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, + providedStatus); // readahead range can be dynamically set long ra = options.getLong(READAHEAD_RANGE, readAhead); @@ -4326,10 +4329,16 @@ private FSDataInputStream select(final Path source, S3AReadOpContext readContext = createReadContext(fileStatus, inputPolicy, changeDetectionPolicy, ra); - if (!fileStatus.isDirectory()) { + if (changeDetectionPolicy.getSource() != ChangeDetectionPolicy.Source.None + && fileStatus.getETag() != null) { + // if there is change detection, and the status includes at least an + // etag, // check that the object metadata lines up with what is expected // based on the object attributes (which may contain an eTag or - // versionId) from S3Guard + // versionId). + // This is because the select API doesn't offer this. + // (note: this is trouble for version checking as cannot force the old + // version in the final read; nor can we check the etag match) ChangeTracker changeTracker = new ChangeTracker(uri.toString(), changeDetectionPolicy, @@ -4364,12 +4373,42 @@ private void requireSelectSupport(final Path source) throws } } + /** + * Extract the status from the optional parameter, querying + * S3Guard/s3 if it is absent. + * @param path path of the status + * @param optStatus optional status + * @return a file status + * @throws FileNotFoundException if there is no normal file at that path + * @throws IOException IO failure + */ + private S3AFileStatus extractOrFetchSimpleFileStatus( + final Path path, final Optional optStatus) + throws IOException { + S3AFileStatus fileStatus; + if (optStatus.isPresent()) { + fileStatus = optStatus.get(); + } else { + // this looks at S3guard and gets any type of status back, + // if it falls back to S3 it does a HEAD only. + // therefore: if there is no S3Guard and there is a dir, this + // will raise a FileNotFoundException + fileStatus = innerGetFileStatus(path, false, + StatusProbeEnum.HEAD_ONLY); + } + // we check here for the passed in status or the S3Guard value + // for being a directory + if (fileStatus.isDirectory()) { + throw new FileNotFoundException(path.toString() + " is a directory"); + } + return fileStatus; + } + /** * Initiate the open or select operation. * This is invoked from both the FileSystem and FileContext APIs - * @param path path to the file - * @param mandatoryKeys set of options declared as mandatory. - * @param options options set during the build sequence. + * @param rawPath path to the file + * @param parameters open file parameters from the builder. * @return a future which will evaluate to the opened/selected file. * @throws IOException failure to resolve the link. * @throws PathIOException operation is a select request but S3 select is @@ -4379,10 +4418,11 @@ private void requireSelectSupport(final Path source) throws @Override @Retries.RetryTranslated public CompletableFuture openFileWithOptions( - final Path path, - final Set mandatoryKeys, - final Configuration options, - final int bufferSize) throws IOException { + final Path rawPath, + final OpenFileParameters parameters) throws IOException { + final Path path = qualify(rawPath); + Configuration options = parameters.getOptions(); + Set mandatoryKeys = parameters.getMandatoryKeys(); String sql = options.get(SelectConstants.SELECT_SQL, null); boolean isSelect = sql != null; // choice of keys depends on open type @@ -4397,20 +4437,46 @@ public CompletableFuture openFileWithOptions( InternalConstants.STANDARD_OPENFILE_KEYS, "for " + path + " in non-select file I/O"); } + FileStatus providedStatus = parameters.getStatus(); + S3AFileStatus fileStatus; + if (providedStatus != null) { + Preconditions.checkArgument(path.equals(providedStatus.getPath()), + "FileStatus parameter is not for the path %s: %s", + path, providedStatus); + if (providedStatus instanceof S3AFileStatus) { + // can use this status to skip our own probes, + // including etag and version. + LOG.debug("File was opened with a supplied S3AFileStatus;" + + " skipping getFileStatus call in open() operation: {}", + providedStatus); + fileStatus = (S3AFileStatus) providedStatus; + } else if (providedStatus instanceof S3ALocatedFileStatus) { + LOG.debug("File was opened with a supplied S3ALocatedFileStatus;" + + " skipping getFileStatus call in open() operation: {}", + providedStatus); + fileStatus = ((S3ALocatedFileStatus) providedStatus).toS3AFileStatus(); + } else { + LOG.debug("Ignoring file status {}", providedStatus); + fileStatus = null; + } + } else { + fileStatus = null; + } + Optional ost = Optional.ofNullable(fileStatus); CompletableFuture result = new CompletableFuture<>(); if (!isSelect) { // normal path. unboundedThreadPool.submit(() -> LambdaUtils.eval(result, - () -> open(path, Optional.of(options)))); + () -> open(path, Optional.of(options), ost))); } else { // it is a select statement. - // fail fast if the method is not present + // fail fast if the operation is not available requireSelectSupport(path); // submit the query unboundedThreadPool.submit(() -> LambdaUtils.eval(result, - () -> select(path, sql, options))); + () -> select(path, sql, options, ost))); } return result; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java index 20f25f26c5979..3fd70be931997 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java @@ -20,7 +20,7 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; import java.util.Optional; @@ -432,6 +432,106 @@ public void testReadFileChangedOutOfSyncMetadata() throws Throwable { } } + /** + * Verifies that when the openFile builder is passed in a status, + * then that is used to eliminate the getFileStatus call in open(); + * thus the version and etag passed down are still used. + */ + @Test + public void testOpenFileWithStatus() throws Throwable { + final Path testpath = path("testOpenFileWithStatus.dat"); + final byte[] dataset = TEST_DATA_BYTES; + S3AFileStatus originalStatus = + writeFile(testpath, dataset, dataset.length, true); + + // forge a file status with a different etag + // no attempt is made to change the versionID as it will + // get rejected by S3 as an invalid version + S3AFileStatus forgedStatus = + S3AFileStatus.fromFileStatus(originalStatus, Tristate.FALSE, + originalStatus.getETag() + "-fake", + originalStatus.getVersionId() + ""); + fs.getMetadataStore().put( + new PathMetadata(forgedStatus, Tristate.FALSE, false)); + + // verify the bad etag gets picked up. + LOG.info("Opening stream with s3guard's (invalid) status."); + try (FSDataInputStream instream = fs.openFile(testpath) + .build() + .get()) { + try { + instream.read(); + // No exception only if we don't enforce change detection as exception + assertTrue( + "Read did not raise an exception even though the change detection " + + "mode was " + changeDetectionMode + + " and the inserted file status was invalid", + changeDetectionMode.equals(CHANGE_DETECT_MODE_NONE) + || changeDetectionMode.equals(CHANGE_DETECT_MODE_WARN) + || changeDetectionSource.equals(CHANGE_DETECT_SOURCE_VERSION_ID)); + } catch (RemoteFileChangedException ignored) { + // Ignored. + } + } + + // By passing in the status open() doesn't need to check s3guard + // And hence the existing file is opened + LOG.info("Opening stream with the original status."); + try (FSDataInputStream instream = fs.openFile(testpath) + .withFileStatus(originalStatus) + .build() + .get()) { + instream.read(); + } + + // and this holds for S3A Located Status + LOG.info("Opening stream with S3ALocatedFileStatus."); + try (FSDataInputStream instream = fs.openFile(testpath) + .withFileStatus(new S3ALocatedFileStatus(originalStatus, null)) + .build() + .get()) { + instream.read(); + } + + // if you pass in a status of a dir, it will be rejected + S3AFileStatus s2 = new S3AFileStatus(true, testpath, "alice"); + assertTrue("not a directory " + s2, s2.isDirectory()); + LOG.info("Open with directory status"); + interceptFuture(FileNotFoundException.class, "", + fs.openFile(testpath) + .withFileStatus(s2) + .build()); + + // now, we delete the file from the store and s3guard + // when we pass in the status, there's no HEAD request, so it's only + // in the read call where the 404 surfaces. + // and there, when versionID is passed to the GET, the data is returned + LOG.info("Testing opening a deleted file"); + fs.delete(testpath, false); + try (FSDataInputStream instream = fs.openFile(testpath) + .withFileStatus(originalStatus) + .build() + .get()) { + if (changeDetectionSource.equals(CHANGE_DETECT_SOURCE_VERSION_ID) + && changeDetectionMode.equals(CHANGE_DETECT_MODE_SERVER)) { + // the deleted file is still there if you know the version ID + // and the check is server-side + instream.read(); + } else { + // all other cases, the read will return 404. + intercept(FileNotFoundException.class, + () -> instream.read()); + } + + } + + // whereas without that status, you fail in the get() when a HEAD is + // issued + interceptFuture(FileNotFoundException.class, "", + fs.openFile(testpath).build()); + + } + /** * Ensures a file can be read when there is no version metadata * (ETag, versionId). @@ -524,9 +624,11 @@ public void testSelectWithNoVersionMetadata() throws Throwable { writeFileWithNoVersionMetadata("selectnoversion.dat"); try (FSDataInputStream instream = fs.openFile(testpath) - .must(SELECT_SQL, "SELECT * FROM S3OBJECT").build().get()) { + .must(SELECT_SQL, "SELECT * FROM S3OBJECT") + .build() + .get()) { assertEquals(QUOTED_TEST_DATA, - IOUtils.toString(instream, Charset.forName("UTF-8")).trim()); + IOUtils.toString(instream, StandardCharsets.UTF_8).trim()); } } @@ -902,15 +1004,12 @@ public void testEventuallyConsistentReadOnReopen() throws Throwable { private Path writeOutOfSyncFileVersion(String filename) throws IOException { final Path testpath = path(filename); final byte[] dataset = TEST_DATA_BYTES; - writeDataset(fs, testpath, dataset, dataset.length, - 1024, false); - S3AFileStatus originalStatus = (S3AFileStatus) fs.getFileStatus(testpath); + S3AFileStatus originalStatus = + writeFile(testpath, dataset, dataset.length, false); // overwrite with half the content - writeDataset(fs, testpath, dataset, dataset.length / 2, - 1024, true); - - S3AFileStatus newStatus = (S3AFileStatus) fs.getFileStatus(testpath); + S3AFileStatus newStatus = writeFile(testpath, dataset, dataset.length / 2, + true); // put back the original etag, versionId S3AFileStatus forgedStatus = @@ -922,6 +1021,23 @@ private Path writeOutOfSyncFileVersion(String filename) throws IOException { return testpath; } + /** + * Write data to a file; return the status from the filesystem. + * @param path file path + * @param dataset dataset to write from + * @param length number of bytes from the dataset to write. + * @param overwrite overwrite flag + * @return the retrieved file status. + */ + private S3AFileStatus writeFile(final Path path, + final byte[] dataset, + final int length, + final boolean overwrite) throws IOException { + writeDataset(fs, path, dataset, length, + 1024, overwrite); + return (S3AFileStatus) fs.getFileStatus(path); + } + /** * Writes {@link #TEST_DATA} to a file where the file will be inconsistent * in S3 for a set of operations. @@ -1208,9 +1324,8 @@ public ObjectMetadata answer(InvocationOnMock invocation) private Path writeFileWithNoVersionMetadata(String filename) throws IOException { final Path testpath = path(filename); - writeDataset(fs, testpath, TEST_DATA_BYTES, TEST_DATA_BYTES.length, - 1024, false); - S3AFileStatus originalStatus = (S3AFileStatus) fs.getFileStatus(testpath); + S3AFileStatus originalStatus = writeFile(testpath, TEST_DATA_BYTES, + TEST_DATA_BYTES.length, false); // remove ETag and versionId S3AFileStatus newStatus = S3AFileStatus.fromFileStatus(originalStatus, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java index 96f913cdb0901..3cdb4e6eeee3b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java @@ -28,13 +28,13 @@ import java.util.stream.Collectors; import org.assertj.core.api.Assertions; -import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.conf.Configuration; @@ -42,7 +42,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source; import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata; import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; @@ -60,15 +59,17 @@ import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL; import static org.apache.hadoop.fs.s3a.Constants.RETRY_INTERVAL; import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_INTERVAL; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT; import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL; import static org.apache.hadoop.fs.s3a.S3ATestUtils.PROBE_INTERVAL_MILLIS; import static org.apache.hadoop.fs.s3a.S3ATestUtils.STABILIZATION_TIME; import static org.apache.hadoop.fs.s3a.S3ATestUtils.TIMESTAMP_SLEEP; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.awaitDeletedFileDisappearance; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.awaitFileStatus; import static org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingContainsPath; import static org.apache.hadoop.fs.s3a.S3ATestUtils.checkListingDoesNotContainPath; import static org.apache.hadoop.fs.s3a.S3ATestUtils.metadataStorePersistsAuthoritativeBit; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.read; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.readWithStatus; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.test.LambdaTestUtils.eventually; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -163,9 +164,14 @@ protected Configuration createConfiguration() { // speeding up the tests removeBaseAndBucketOverrides(conf, RETRY_LIMIT, - RETRY_INTERVAL); + RETRY_INTERVAL, + S3GUARD_CONSISTENCY_RETRY_INTERVAL, + S3GUARD_CONSISTENCY_RETRY_LIMIT); conf.setInt(RETRY_LIMIT, 3); - conf.set(RETRY_INTERVAL, "10ms"); + conf.setInt(S3GUARD_CONSISTENCY_RETRY_LIMIT, 3); + final String delay = "10ms"; + conf.set(RETRY_INTERVAL, delay); + conf.set(S3GUARD_CONSISTENCY_RETRY_INTERVAL, delay); return conf; } @@ -284,11 +290,6 @@ public void testLongerLengthOverwrite() throws Exception { @Test public void testOutOfBandDeletes() throws Exception { - ChangeDetectionPolicy changeDetectionPolicy = - ((S3AFileSystem) getFileSystem()).getChangeDetectionPolicy(); - Assume.assumeFalse("FNF not expected when using a bucket with" - + " object versioning", - changeDetectionPolicy.getSource() == Source.VersionId); Path testFileName = path("OutOfBandDelete-" + UUID.randomUUID()); outOfBandDeletes(testFileName, authoritative); @@ -658,8 +659,22 @@ private void outOfBandDeletes( FileStatus status = guardedFs.getFileStatus(testFilePath); LOG.info("Authoritative: {} status path: {}", allowAuthoritative, status.getPath()); - expectExceptionWhenReading(testFilePath, text); - expectExceptionWhenReadingOpenFileAPI(testFilePath, text); + final boolean versionedChangeDetection = + getFileSystem().getChangeDetectionPolicy().getSource() + == Source.VersionId; + if (!versionedChangeDetection) { + expectExceptionWhenReading(testFilePath, text); + expectExceptionWhenReadingOpenFileAPI(testFilePath, text, null); + expectExceptionWhenReadingOpenFileAPI(testFilePath, text, status); + } else { + // FNFE not expected when using a bucket with object versioning + final String read1 = read(guardedFs, testFilePath); + assertEquals("File read from the auth FS", text, read1); + // and when the status is passed in, even the raw FS will ask for it + // via the versionId in the status + final String read2 = readWithStatus(rawFS, status); + assertEquals("File read from the raw FS", text, read2); + } } finally { guardedFs.delete(testFilePath, true); } @@ -957,7 +972,8 @@ private void deleteFileInListing() FileStatus status = guardedFs.getFileStatus(testFilePath); LOG.info("authoritative: {} status: {}", allowAuthoritative, status); expectExceptionWhenReading(testFilePath, text); - expectExceptionWhenReadingOpenFileAPI(testFilePath, text); + expectExceptionWhenReadingOpenFileAPI(testFilePath, text, null); + expectExceptionWhenReadingOpenFileAPI(testFilePath, text, status); } finally { guardedFs.delete(testDirPath, true); } @@ -983,14 +999,18 @@ private void expectExceptionWhenReading(Path testFilePath, String text) * We expect the read to fail with an FNFE: open will be happy. * @param testFilePath path of the test file * @param text the context in the file. + * @param status optional status for the withFileStatus operation. * @throws Exception failure other than the FNFE */ private void expectExceptionWhenReadingOpenFileAPI( - Path testFilePath, String text) + Path testFilePath, String text, FileStatus status) throws Exception { - try ( - FSDataInputStream in = guardedFs.openFile(testFilePath).build().get() - ) { + final FutureDataInputStreamBuilder builder + = guardedFs.openFile(testFilePath); + if (status != null) { + builder.withFileStatus(status); + } + try (FSDataInputStream in = builder.build().get()) { intercept(FileNotFoundException.class, () -> { byte[] bytes = new byte[text.length()]; return in.read(bytes, 0, bytes.length); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index db871a5f9177a..f75e37ecaaf37 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -66,12 +66,14 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH; import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; +import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture; import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*; import static org.apache.hadoop.fs.s3a.S3ATestConstants.*; import static org.apache.hadoop.fs.s3a.Constants.*; @@ -1184,6 +1186,31 @@ public static String read(FileSystem fs, } } + /** + * Read in a file and convert to an ascii string, using the openFile + * builder API and the file status. + * If the status is an S3A FileStatus, any etag or versionId used + * will be picked up. + * @param fs filesystem + * @param status file status, including path + * @return the bytes read and converted to a string + * @throws IOException IO problems + */ + public static String readWithStatus( + final FileSystem fs, + final FileStatus status) throws IOException { + final CompletableFuture future = + fs.openFile(status.getPath()) + .withFileStatus(status) + .build(); + + try (FSDataInputStream in = awaitFuture(future)) { + byte[] buf = new byte[(int) status.getLen()]; + in.readFully(0, buf); + return new String(buf); + } + } + /** * List a directory/directory tree. * @param fileSystem FS diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java index d6058d19521be..64974db5a466c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/select/ITestS3Select.java @@ -256,6 +256,7 @@ public void testSelectEmptyFile() throws Throwable { ContractTestUtils.touch(fs, path); parseToLines(fs.openFile(path) .must(SELECT_SQL, SELECT_EVERYTHING) + .withFileStatus(fs.getFileStatus(path)) .build() .get(), 0); @@ -548,14 +549,14 @@ public void testSelectDirectoryFails() throws Throwable { FutureDataInputStreamBuilder builder = getFileSystem().openFile(dir) .must(SELECT_SQL, SELECT_ODD_ENTRIES); - interceptFuture(PathIOException.class, + interceptFuture(FileNotFoundException.class, "", builder.build()); // try the parent builder = getFileSystem().openFile(dir.getParent()) .must(SELECT_SQL, SELECT_ODD_ENTRIES); - interceptFuture(PathIOException.class, + interceptFuture(FileNotFoundException.class, "", builder.build()); } @@ -565,7 +566,7 @@ public void testSelectRootFails() throws Throwable { FutureDataInputStreamBuilder builder = getFileSystem().openFile(path("/")) .must(SELECT_SQL, SELECT_ODD_ENTRIES); - interceptFuture(PathIOException.class, + interceptFuture(FileNotFoundException.class, "", builder.build()); }