Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Hive DirectoryLister to TrinoFileSystem #17323

Merged
merged 5 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*/
public final class Location
{
private static final Splitter SCHEME_SPLITTER = Splitter.on("://").limit(2);
private static final Splitter SCHEME_SPLITTER = Splitter.on(":").limit(2);
private static final Splitter USER_INFO_SPLITTER = Splitter.on('@').limit(2);
private static final Splitter AUTHORITY_SPLITTER = Splitter.on('/').limit(2);
private static final Splitter HOST_AND_PORT_SPLITTER = Splitter.on(':').limit(2);
Expand All @@ -70,38 +70,39 @@ public static Location of(String location)
return new Location(location, Optional.empty(), Optional.empty(), Optional.empty(), OptionalInt.empty(), location.substring(1));
}

// local file system location
if (location.startsWith("file:/") && ((location.length() == 6) || (location.charAt(6) != '/'))) {
return new Location(location, Optional.of("file"), Optional.empty(), Optional.empty(), OptionalInt.empty(), location.substring(6));
}

List<String> schemeSplit = SCHEME_SPLITTER.splitToList(location);
checkArgument(schemeSplit.size() == 2, "No scheme for file system location: %s", location);
String scheme = schemeSplit.get(0);

String afterScheme = schemeSplit.get(1);
if (afterScheme.startsWith("//")) {
// Locations with an authority must begin with a double slash
afterScheme = afterScheme.substring(2);
List<String> userInfoSplit = USER_INFO_SPLITTER.splitToList(afterScheme);
Optional<String> userInfo = userInfoSplit.size() == 2 ? Optional.of(userInfoSplit.get(0)) : Optional.empty();

List<String> authoritySplit = AUTHORITY_SPLITTER.splitToList(Iterables.getLast(userInfoSplit));
List<String> hostAndPortSplit = HOST_AND_PORT_SPLITTER.splitToList(authoritySplit.get(0));

Optional<String> host = Optional.of(hostAndPortSplit.get(0)).filter(not(String::isEmpty));

OptionalInt port = OptionalInt.empty();
if (hostAndPortSplit.size() == 2) {
try {
port = OptionalInt.of(parseInt(hostAndPortSplit.get(1)));
}
catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid port in file system location: " + location, e);
}
}

List<String> userInfoSplit = USER_INFO_SPLITTER.splitToList(afterScheme);
Optional<String> userInfo = userInfoSplit.size() == 2 ? Optional.of(userInfoSplit.get(0)) : Optional.empty();

List<String> authoritySplit = AUTHORITY_SPLITTER.splitToList(Iterables.getLast(userInfoSplit));
List<String> hostAndPortSplit = HOST_AND_PORT_SPLITTER.splitToList(authoritySplit.get(0));

Optional<String> host = Optional.of(hostAndPortSplit.get(0)).filter(not(String::isEmpty));
String path = (authoritySplit.size() == 2) ? authoritySplit.get(1) : "";

OptionalInt port = OptionalInt.empty();
if (hostAndPortSplit.size() == 2) {
try {
port = OptionalInt.of(parseInt(hostAndPortSplit.get(1)));
}
catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid port in file system location: " + location, e);
}
return new Location(location, Optional.of(scheme), userInfo, host, port, path);
}

String path = (authoritySplit.size() == 2) ? authoritySplit.get(1) : "";

return new Location(location, Optional.of(scheme), userInfo, host, port, path);
checkArgument(afterScheme.startsWith("/"), "Path must begin with a '/' when no authority is present");
return new Location(location, Optional.of(scheme), Optional.empty(), Optional.empty(), OptionalInt.empty(), afterScheme.substring(1));
}

private Location(String location, Optional<String> scheme, Optional<String> userInfo, Optional<String> host, OptionalInt port, String path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Optional;

/**
* TrinoFileSystem is the main abstraction for Trino to interact with data in cloud-like storage
Expand Down Expand Up @@ -150,4 +151,14 @@ void renameFile(Location source, Location target)
*/
FileIterator listFiles(Location location)
throws IOException;

/**
* Checks if a directory exists at the specified location. For non-hierarchical file systems
* an empty Optional is returned.
*
* @param location the location to check for a directory
* @throws IOException if the location is not valid for this file system
*/
Optional<Boolean> directoryExists(Location location)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ public FileIterator listFiles(Location location)
return new LocalFileIterator(location, rootPath, toDirectoryPath(location));
}

@Override
public Optional<Boolean> directoryExists(Location location)
throws IOException
{
return Optional.of(Files.isDirectory(toFilePath(location)));
}

private Path toFilePath(Location location)
{
validateLocalLocation(location);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ public FileEntry next()
};
}

@Override
public Optional<Boolean> directoryExists(Location location)
{
return Optional.empty();
}

private static String toBlobKey(Location location)
{
validateMemoryLocation(location);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,14 @@ public FileIterator listFiles(Location location)
.startSpan();
return withTracing(span, () -> delegate.listFiles(location));
}

@Override
public Optional<Boolean> directoryExists(Location location)
throws IOException
{
Span span = tracer.spanBuilder("FileSystem.directoryExists")
.setAttribute(FileSystemAttributes.FILE_LOCATION, location.toString())
.startSpan();
return withTracing(span, () -> delegate.directoryExists(location));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,29 @@ void testListFiles()
}
}

@Test
public void testDirectoryExists()
throws IOException
{
try (Closer closer = Closer.create()) {
String directoryName = "testDirectoryExistsDir";
String fileName = "file.csv";
createBlob(closer, createLocation(directoryName).appendPath(fileName).path());
TrinoFileSystem fileSystem = getFileSystem();

if (isHierarchical()) {
assertThat(fileSystem.directoryExists(createLocation(directoryName))).contains(true);
assertThat(fileSystem.directoryExists(createLocation(UUID.randomUUID().toString()))).contains(false);
assertThat(fileSystem.directoryExists(createLocation(directoryName).appendPath(fileName))).contains(false);
}
else {
assertThat(fileSystem.directoryExists(createLocation(directoryName))).isEmpty();
assertThat(fileSystem.directoryExists(createLocation(UUID.randomUUID().toString()))).isEmpty();
assertThat(fileSystem.directoryExists(createLocation(directoryName).appendPath(fileName))).isEmpty();
}
}
}

private List<Location> listPath(String path)
throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ void testParse()
assertLocation("/abc/xyz", "abc/xyz");
assertLocation("/foo://host:port/path", "foo://host:port/path");

// special handling for file URIs without hostnames
// special handling for Locations without hostnames
assertLocation("file:/", "file", "");
assertLocation("file:/hello.txt", "file", "hello.txt");
assertLocation("file:/some/path", "file", "some/path");
assertLocation("file:/some@what/path", "file", "some@what/path");
assertLocation("hdfs:/a/hadoop/path.csv", "hdfs", "a/hadoop/path.csv");

// invalid locations
assertThatThrownBy(() -> Location.of(null))
Expand All @@ -96,9 +97,6 @@ void testParse()
assertThatThrownBy(() -> Location.of("scheme://host:invalid/path"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("port");
assertThatThrownBy(() -> Location.of("scheme:/path"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("scheme");

// fragment is not allowed
assertThatThrownBy(() -> Location.of("scheme://userInfo@host/some/path#fragement"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
Expand Down Expand Up @@ -162,6 +163,13 @@ public FileIterator listFiles(Location location)
{
return delegate.listFiles(location);
}

@Override
public Optional<Boolean> directoryExists(Location location)
throws IOException
{
return delegate.directoryExists(location);
}
}

private static class TrackingInputFile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.hdfs.TrinoHdfsFileSystemStats;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.UUID;

import static io.trino.filesystem.hdfs.HadoopPaths.hadoopPath;
import static io.trino.hdfs.FileSystemUtils.getRawFileSystem;
Expand All @@ -48,6 +52,8 @@ class HdfsFileSystem
private final HdfsContext context;
private final TrinoHdfsFileSystemStats stats;

private final Map<FileSystem, Boolean> hierarchicalFileSystemCache = new IdentityHashMap<>();

public HdfsFileSystem(HdfsEnvironment environment, HdfsContext context, TrinoHdfsFileSystemStats stats)
{
this.environment = requireNonNull(environment, "environment is null");
Expand Down Expand Up @@ -188,4 +194,54 @@ public FileIterator listFiles(Location location)
}
});
}

@Override
public Optional<Boolean> directoryExists(Location location)
throws IOException
{
stats.getDirectoryExistsCalls().newCall();
Path directory = hadoopPath(location);
FileSystem fileSystem = environment.getFileSystem(context, directory);

return environment.doAs(context.getIdentity(), () -> {
if (!hierarchical(fileSystem, location)) {
return Optional.empty();
}

try (TimeStat.BlockTimer ignored = stats.getDirectoryExistsCalls().time()) {
FileStatus fileStatus = fileSystem.getFileStatus(directory);
return Optional.of(fileStatus.isDirectory());
}
catch (FileNotFoundException e) {
return Optional.of(false);
}
catch (IOException e) {
stats.getListFilesCalls().recordException(e);
throw e;
}
});
}

private boolean hierarchical(FileSystem fileSystem, Location rootLocation)
alexjo2144 marked this conversation as resolved.
Show resolved Hide resolved
{
Boolean cachedResult = hierarchicalFileSystemCache.get(fileSystem);
if (cachedResult != null) {
return cachedResult;
}

// Hierarchical file systems will fail to list directories which do not exist.
// Object store file systems like S3 will allow these kinds of operations.
// Attempt to list a path which does not exist to know which one we have.
try {
fileSystem.listStatus(hadoopPath(rootLocation.appendPath(UUID.randomUUID().toString())));
hierarchicalFileSystemCache.putIfAbsent(fileSystem, false);
return false;
}
catch (IOException e) {
// Being overly broad to avoid throwing an exception with the random UUID path in it.
// Instead, defer to later calls to fail with a more appropriate message.
hierarchicalFileSystemCache.putIfAbsent(fileSystem, true);
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public final class TrinoHdfsFileSystemStats
private final CallStats renameFileCalls = new CallStats();
private final CallStats deleteFileCalls = new CallStats();
private final CallStats deleteDirectoryCalls = new CallStats();
private final CallStats directoryExistsCalls = new CallStats();

@Managed
@Nested
Expand Down Expand Up @@ -66,4 +67,11 @@ public CallStats getDeleteDirectoryCalls()
{
return deleteDirectoryCalls;
}

@Managed
@Nested
public CallStats getDirectoryExistsCalls()
{
return directoryExistsCalls;
}
}
Loading