Skip to content

Commit

Permalink
HADOOP-16759 address Mingliang's comments
Browse files Browse the repository at this point in the history
* OpenFileParameters gets an approximation of a builder API (chained with*() setters)

* S3AFS moves to Optional<S3AFileStatus> where appropriate,
which includes the select() operation too. There's a common method
to do the extraction.

* Switch to Objects.requireNonNull in the stream builder / OpenFileParameters,
  plus error text (to make NPEs easier to debug)

Extra tests
 * verify pickup of invalid status when none is supplied
 * rejection of a status declaring the source is a directory
 * after an object is deleted, the 404 isn't picked up until the read() call
   initiates the GET.
 * and even then, if you use server-side versionID checks, *you still get the file*
 * plus a contract test to verify that withFileStatus(null) is rejected

Change-Id: I326f68538940f245e1af56d3b6055015fd3e1bfe
  • Loading branch information
steveloughran committed Jan 10, 2020
1 parent 19c9d4a commit 6a1a951
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4612,7 +4612,7 @@ public CompletableFuture<FSDataInputStream> build() throws IOException {
.withMandatoryKeys(getMandatoryKeys())
.withOptions(getOptions())
.withBufferSize(getBufferSize())
.withStatus(getStatus());
.withStatus(super.getStatus()); // explicit so as to avoid IDE warnings
if(optionalPath.isPresent()) {
return getFS().openFileWithOptions(optionalPath.get(),
parameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;

Expand Down Expand Up @@ -76,8 +76,8 @@ public abstract class FutureDataInputStreamBuilderImpl
*/
protected FutureDataInputStreamBuilderImpl(@Nonnull FileContext fc,
@Nonnull Path path) throws IOException {
super(checkNotNull(path));
checkNotNull(fc);
super(requireNonNull(path, "path"));
requireNonNull(fc, "file context");
this.fileSystem = null;
bufferSize = IO_FILE_BUFFER_SIZE_DEFAULT;
}
Expand All @@ -89,8 +89,8 @@ protected FutureDataInputStreamBuilderImpl(@Nonnull FileContext fc,
*/
protected FutureDataInputStreamBuilderImpl(@Nonnull FileSystem fileSystem,
@Nonnull Path path) {
super(checkNotNull(path));
this.fileSystem = checkNotNull(fileSystem);
super(requireNonNull(path, "path"));
this.fileSystem = requireNonNull(fileSystem, "fileSystem");
initFromFS();
}

Expand All @@ -115,7 +115,7 @@ private void initFromFS() {
}

protected FileSystem getFS() {
checkNotNull(fileSystem);
requireNonNull(fileSystem, "fileSystem");
return fileSystem;
}

Expand Down Expand Up @@ -148,7 +148,7 @@ public FutureDataInputStreamBuilder getThisBuilder() {

@Override
public FutureDataInputStreamBuilder withFileStatus(FileStatus status) {
this.status = checkNotNull(status);
this.status = requireNonNull(status, "status");
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;

import static java.util.Objects.requireNonNull;

/**
* All the parameters from the openFile builder for the {@code openFileWithOptions} commands.
* All the parameters from the openFile builder for the
* {@code openFileWithOptions} commands.
*
* If/when new attributes added to the builder, this class will be extended.
*/
Expand Down Expand Up @@ -54,12 +57,12 @@ public OpenFileParameters() {
}

public OpenFileParameters withMandatoryKeys(final Set<String> mandatoryKeys) {
this.mandatoryKeys = mandatoryKeys;
this.mandatoryKeys = requireNonNull(mandatoryKeys);
return this;
}

public OpenFileParameters withOptions(final Configuration options) {
this.options = options;
this.options = requireNonNull(options);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4372,23 +4372,35 @@ private void requireSelectSupport(final Path source) throws
}

/**
* Extract the status from the optional parameter, falling
* back to a HEAD for the file only (not the directory).
* Extract the status from the optional parameter, querying
* S3Guard/s3 if it is absent.
* @param path path of the status
* @param status optional status
* @param optStatus optional status
* @return a file status
* @throws FileNotFoundException if there is no normal file at that path
* @throws IOException IO failure
*/
private S3AFileStatus extractOrFetchSimpleFileStatus(
final Path path, final Optional<S3AFileStatus> status)
final Path path, final Optional<S3AFileStatus> optStatus)
throws IOException {
if (status.isPresent()) {
return status.get();
S3AFileStatus fileStatus;
if (optStatus.isPresent()) {
fileStatus = optStatus.get();
} else {
return innerGetFileStatus(path, false,
// this looks at s3guard and gets any type of status back,
// if it falls back to S3 it does a HEAD only.
// therefore: if there is no S3Guard and there is a dir, this
// will fail
fileStatus = innerGetFileStatus(path, false,
StatusProbeEnum.HEAD_ONLY);
}
// we check here for the passed in status or the S3Guard value
// for being a directory
if (fileStatus.isDirectory()) {
throw new FileNotFoundException("Can't open " + path
+ " because it is a directory");
}
return fileStatus;
}

/**
Expand All @@ -4408,7 +4420,6 @@ public CompletableFuture<FSDataInputStream> openFileWithOptions(
final Path rawPath,
final OpenFileParameters parameters) throws IOException {
final Path path = qualify(rawPath);

Configuration options = parameters.getOptions();
Set<String> mandatoryKeys = parameters.getMandatoryKeys();
String sql = options.get(SelectConstants.SELECT_SQL, null);
Expand All @@ -4426,7 +4437,7 @@ public CompletableFuture<FSDataInputStream> openFileWithOptions(
"for " + path + " in non-select file I/O");
}
FileStatus providedStatus = parameters.getStatus();
S3AFileStatus fileStatus = null;
S3AFileStatus fileStatus;
if (providedStatus != null) {
Preconditions.checkArgument(path.equals(providedStatus.getPath()),
"FileStatus parameter is not for the path %s: %s",
Expand All @@ -4445,9 +4456,12 @@ public CompletableFuture<FSDataInputStream> openFileWithOptions(
fileStatus = ((S3ALocatedFileStatus) providedStatus).toS3AFileStatus();
} else {
LOG.debug("Ignoring file status {}", providedStatus);
fileStatus = null;
}
} else {
fileStatus = null;
}
Optional<S3AFileStatus> ost = Optional.of(fileStatus);
Optional<S3AFileStatus> ost = Optional.ofNullable(fileStatus);
CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
if (!isSelect) {
// normal path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
Expand Down Expand Up @@ -444,39 +444,93 @@ public void testOpenFileWithStatus() throws Throwable {
S3AFileStatus originalStatus =
writeFile(testpath, dataset, dataset.length, true);

// forge a file status with a different tag
// forge a file status with a different etag
// no attempt is made to change the versionID as it will
// get rejected by S3 as an invalid version
S3AFileStatus forgedStatus =
S3AFileStatus.fromFileStatus(originalStatus, Tristate.FALSE,
originalStatus.getETag() + "-fake",
originalStatus.getVersionId() + "");
fs.getMetadataStore().put(
new PathMetadata(forgedStatus, Tristate.FALSE, false));

// verify the bad etag gets picked up...which it does for etag but not
// version ID
LOG.info("Opening stream with s3guard's (invalid) status.");
try (FSDataInputStream instream = fs.openFile(testpath)
.build()
.get()) {
try {
instream.read();
// No exception only if we don't enforce change detection as exception
assertTrue(
"Read did not raise an exception even though the change detection "
+ "mode was " + changeDetectionMode
+ " and the inserted file status was invalid",
changeDetectionMode.equals(CHANGE_DETECT_MODE_NONE) ||
changeDetectionMode.equals(CHANGE_DETECT_MODE_WARN)
|| changeDetectionSource.equals(CHANGE_DETECT_SOURCE_VERSION_ID));
} catch (Exception ignored) {
// Ignored.
}
}

// By passing in the status open() doesn't need to check s3guard
// And hence the existing file is opened
LOG.info("Opening stream with the original status.");
try (FSDataInputStream instream = fs.openFile(testpath)
.withFileStatus(originalStatus)
.build().get()) {
.build()
.get()) {
instream.read();
}

// and this holds for S3A Located Status
LOG.info("Opening stream with S3ALocatedFileStatus.");
try (FSDataInputStream instream = fs.openFile(testpath)
.withFileStatus(new S3ALocatedFileStatus(originalStatus, null))
.build().get()) {
.build()
.get()) {
instream.read();
}

// if you pass in a status of a dir, it will be rejected
S3AFileStatus s2 = new S3AFileStatus(true, testpath, "alice");
assertTrue("not a directory " + s2, s2.isDirectory());
LOG.info("Open with directory status");
interceptFuture(FileNotFoundException.class, "",
fs.openFile(testpath)
.withFileStatus(s2)
.build());

// now, we delete the file from the store and s3guard
// when we pass in the status, there's no HEAD request, so it's only
// in the read call where the 404 surfaces.
// and there, when versionID is passed to the GET, the data is returned
LOG.info("Testing opening a deleted file");
fs.delete(testpath, false);
try (FSDataInputStream instream = fs.openFile(testpath)
.build().get()) {
try {
instream.read();
// No exception only if we don't enforce change detection as exception
assertTrue(changeDetectionMode.equals(CHANGE_DETECT_MODE_NONE) ||
changeDetectionMode.equals(CHANGE_DETECT_MODE_WARN));
} catch (Exception ignored) {
// Ignored.
.withFileStatus(originalStatus)
.build()
.get()) {
if (changeDetectionSource.equals(CHANGE_DETECT_SOURCE_VERSION_ID)
&& changeDetectionMode.equals(CHANGE_DETECT_MODE_SERVER)) {
// the deleted file is still there if you know the version ID
// and the check is server-side
instream.read();
} else {
// all other cases, the read will return 404.
intercept(FileNotFoundException.class,
() -> instream.read());
}

}

// whereas without that status, you fail in the get() when a HEAD is
// issued
interceptFuture(FileNotFoundException.class, "",
fs.openFile(testpath).build());

}

/**
Expand Down Expand Up @@ -571,9 +625,11 @@ public void testSelectWithNoVersionMetadata() throws Throwable {
writeFileWithNoVersionMetadata("selectnoversion.dat");

try (FSDataInputStream instream = fs.openFile(testpath)
.must(SELECT_SQL, "SELECT * FROM S3OBJECT").build().get()) {
.must(SELECT_SQL, "SELECT * FROM S3OBJECT")
.build()
.get()) {
assertEquals(QUOTED_TEST_DATA,
IOUtils.toString(instream, Charset.forName("UTF-8")).trim());
IOUtils.toString(instream, StandardCharsets.UTF_8).trim());
}
}

Expand Down

0 comments on commit 6a1a951

Please sign in to comment.