Migrate Hive DirectoryLister to TrinoFileSystem

alexjo2144 authored and electrum committed May 16, 2023
1 parent 0856bde commit 5b8e06a

Showing 18 changed files with 223 additions and 186 deletions.
@@ -110,6 +110,7 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_EXCEEDED_PARTITION_LIMIT;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
@@ -546,21 +547,28 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
return getTransactionalSplits(Location.of(path.toString()), splittable, bucketConversion, splitFactory);
}

TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session);
Location location = Location.of(path.toString());
// Bucketed partitions are fully loaded immediately since all files must be loaded to determine the file to bucket mapping
if (tableBucketInfo.isPresent()) {
List<TrinoFileStatus> files = listBucketFiles(path, fs, splitFactory.getPartitionName());
List<TrinoFileStatus> files = listBucketFiles(trinoFileSystem, location, splitFactory.getPartitionName());
return hiveSplitSource.addToQueue(getBucketedSplits(files, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, Optional.empty()));
}

fileIterators.addLast(createInternalHiveSplitIterator(path, fs, splitFactory, splittable, Optional.empty()));
Iterator<InternalHiveSplit> splitIterator = createInternalHiveSplitIterator(trinoFileSystem, location, splitFactory, splittable, Optional.empty());
fileIterators.addLast(splitIterator);

return COMPLETED_FUTURE;
}

private List<TrinoFileStatus> listBucketFiles(Path path, FileSystem fs, String partitionName)
private List<TrinoFileStatus> listBucketFiles(TrinoFileSystem fs, Location location, String partitionName)
{
if (!ignoreAbsentPartitions) {
checkPartitionLocationExists(fs, location);
}

try {
return ImmutableList.copyOf(new HiveFileIterator(table, path, fs, directoryLister, hdfsNamenodeStats, FAIL, ignoreAbsentPartitions));
return ImmutableList.copyOf(new HiveFileIterator(table, location, fs, directoryLister, hdfsNamenodeStats, FAIL));
}
catch (HiveFileIterator.NestedDirectoryNotAllowedException e) {
// Fail here to be on the safe side. This seems to be the same as what Hive does
@@ -640,9 +648,12 @@ Optional<Iterator<InternalHiveSplit>> buildManifestFileIterator(
throws IOException
{
FileSystem targetFilesystem = hdfsEnvironment.getFileSystem(hdfsContext, parent);
TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session);
Location location = Location.of(parent.toString());

checkPartitionLocationExists(trinoFileSystem, location);
Map<Path, TrinoFileStatus> fileStatuses = new HashMap<>();
HiveFileIterator fileStatusIterator = new HiveFileIterator(table, parent, targetFilesystem, directoryLister, hdfsNamenodeStats, IGNORED, false);
HiveFileIterator fileStatusIterator = new HiveFileIterator(table, location, trinoFileSystem, directoryLister, hdfsNamenodeStats, IGNORED);
fileStatusIterator.forEachRemaining(status -> fileStatuses.put(getPathWithoutSchemeAndAuthority(new Path(status.getPath())), status));

List<TrinoFileStatus> locatedFileStatuses = new ArrayList<>();
@@ -801,12 +812,28 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inputFormat)
.anyMatch(name -> name.equals("UseFileSplitsFromInputFormat"));
}

private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable, Optional<AcidInfo> acidInfo)
private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(TrinoFileSystem fileSystem, Location location, InternalHiveSplitFactory splitFactory, boolean splittable, Optional<AcidInfo> acidInfo)
{
Iterator<TrinoFileStatus> iterator = new HiveFileIterator(table, path, fileSystem, directoryLister, hdfsNamenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, ignoreAbsentPartitions);
if (!ignoreAbsentPartitions) {
checkPartitionLocationExists(fileSystem, location);
}

Iterator<TrinoFileStatus> iterator = new HiveFileIterator(table, location, fileSystem, directoryLister, hdfsNamenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED);
return createInternalHiveSplitIterator(splitFactory, splittable, acidInfo, Streams.stream(iterator));
}

private static void checkPartitionLocationExists(TrinoFileSystem fileSystem, Location location)
{
try {
if (!fileSystem.directoryExists(location).orElse(true)) {
throw new TrinoException(HIVE_FILE_NOT_FOUND, "Partition location does not exist: " + location);
}
}
catch (IOException e) {
throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed checking directory path: " + location, e);
}
}

private static Iterator<InternalHiveSplit> createInternalHiveSplitIterator(InternalHiveSplitFactory splitFactory, boolean splittable, Optional<AcidInfo> acidInfo, Stream<TrinoFileStatus> fileStream)
{
return fileStream
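The new checkPartitionLocationExists helper above relies on TrinoFileSystem.directoryExists, which returns Optional<Boolean>; the orElse(true) treats an empty result (a file system that cannot answer the question, such as some object stores) as passing. A minimal standalone sketch of that contract — the class name PartitionLocationCheck is hypothetical, not part of this commit:

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;

import java.io.IOException;

final class PartitionLocationCheck
{
    private PartitionLocationCheck() {}

    // Mirrors the guard used above: an empty Optional means "unknown",
    // so the listing proceeds rather than failing the query.
    static boolean partitionLocationExists(TrinoFileSystem fileSystem, Location location)
            throws IOException
    {
        return fileSystem.directoryExists(location).orElse(true);
    }
}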
@@ -20,14 +20,14 @@
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.collect.cache.EvictableCacheBuilder;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.hive.HiveConfig;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Storage;
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.weakref.jmx.Managed;

import javax.inject.Inject;
@@ -52,7 +52,7 @@ public class CachingDirectoryLister
{
//TODO use a cache key based on Location & SchemaTableName and iterate over the cache keys
// to deal more efficiently with cache invalidation scenarios for partitioned tables.
private final Cache<String, ValueHolder> cache;
private final Cache<Location, ValueHolder> cache;
private final List<SchemaTablePrefix> tablePrefixes;

@Inject
@@ -65,7 +65,7 @@ public CachingDirectoryLister(Duration expireAfterWrite, DataSize maxSize, List<
{
this.cache = EvictableCacheBuilder.newBuilder()
.maximumWeight(maxSize.toBytes())
.weigher((Weigher<String, ValueHolder>) (key, value) -> toIntExact(estimatedSizeOf(key) + value.getRetainedSizeInBytes()))
.weigher((Weigher<Location, ValueHolder>) (key, value) -> toIntExact(estimatedSizeOf(key.toString()) + value.getRetainedSizeInBytes()))
.expireAfterWrite(expireAfterWrite.toMillis(), TimeUnit.MILLISECONDS)
.shareNothingWhenDisabled()
.recordStats()
@@ -91,39 +91,39 @@ private static SchemaTablePrefix parseTableName(String tableName)
}

@Override
public RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
public RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs, Table table, Location location)
throws IOException
{
if (!isCacheEnabledFor(table.getSchemaTableName())) {
return new TrinoFileStatusRemoteIterator(fs.listFiles(path, true));
return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
}

return listInternal(fs, path);
return listInternal(fs, location);
}

private RemoteIterator<TrinoFileStatus> listInternal(FileSystem fs, Path path)
private RemoteIterator<TrinoFileStatus> listInternal(TrinoFileSystem fs, Location location)
throws IOException
{
ValueHolder cachedValueHolder = uncheckedCacheGet(cache, path.toString(), ValueHolder::new);
ValueHolder cachedValueHolder = uncheckedCacheGet(cache, location, ValueHolder::new);
if (cachedValueHolder.getFiles().isPresent()) {
return new SimpleRemoteIterator(cachedValueHolder.getFiles().get().iterator());
}

return cachingRemoteIterator(cachedValueHolder, createListingRemoteIterator(fs, path), path);
return cachingRemoteIterator(cachedValueHolder, createListingRemoteIterator(fs, location), location);
}

private static RemoteIterator<TrinoFileStatus> createListingRemoteIterator(FileSystem fs, Path path)
private static RemoteIterator<TrinoFileStatus> createListingRemoteIterator(TrinoFileSystem fs, Location location)
throws IOException
{
return new TrinoFileStatusRemoteIterator(fs.listFiles(path, true));
return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
}

@Override
public void invalidate(Table table)
{
if (isCacheEnabledFor(table.getSchemaTableName()) && isLocationPresent(table.getStorage())) {
if (table.getPartitionColumns().isEmpty()) {
cache.invalidate(table.getStorage().getLocation());
cache.invalidate(Location.of(table.getStorage().getLocation()));
}
else {
// a partitioned table can have multiple paths in cache
@@ -136,11 +136,11 @@ public void invalidate(Table table)
public void invalidate(Partition partition)
{
if (isCacheEnabledFor(partition.getSchemaTableName()) && isLocationPresent(partition.getStorage())) {
cache.invalidate(partition.getStorage().getLocation());
cache.invalidate(Location.of(partition.getStorage().getLocation()));
}
}

private RemoteIterator<TrinoFileStatus> cachingRemoteIterator(ValueHolder cachedValueHolder, RemoteIterator<TrinoFileStatus> iterator, Path path)
private RemoteIterator<TrinoFileStatus> cachingRemoteIterator(ValueHolder cachedValueHolder, RemoteIterator<TrinoFileStatus> iterator, Location location)
{
return new RemoteIterator<>()
{
@@ -154,7 +154,7 @@ public boolean hasNext()
if (!hasNext) {
// The cachedValueHolder acts as an invalidation guard. If a cache invalidation happens while this iterator goes over
// the files from the specified path, the potentially outdated file listing will not be added to the cache.
cache.asMap().replace(path.toString(), cachedValueHolder, new ValueHolder(files));
cache.asMap().replace(location, cachedValueHolder, new ValueHolder(files));
}
return hasNext;
}
@@ -207,9 +207,9 @@ public long getRequestCount()
}

@VisibleForTesting
boolean isCached(Path path)
boolean isCached(Location location)
{
ValueHolder cached = cache.getIfPresent(path.toString());
ValueHolder cached = cache.getIfPresent(location);
return cached != null && cached.getFiles().isPresent();
}

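In CachingDirectoryLister above, cache keys move from String to Location, so invalidation must rebuild an equal Location from the stored location string, as invalidate(Table) and invalidate(Partition) now do. A hypothetical, self-contained sketch of that keying behavior — it substitutes a plain Guava cache for the EvictableCacheBuilder used in the real code:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.trino.filesystem.Location;

final class LocationKeyedCacheSketch
{
    public static void main(String[] args)
    {
        Cache<Location, String> cache = CacheBuilder.newBuilder()
                .maximumSize(1_000)
                .build();

        String storageLocation = "s3://bucket/warehouse/table/part=1";
        cache.put(Location.of(storageLocation), "cached file listing");

        // Location equality is value-based, so a Location rebuilt from the
        // same string invalidates the entry put above
        cache.invalidate(Location.of(storageLocation));
    }
}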
@@ -13,16 +13,16 @@
*/
package io.trino.plugin.hive.fs;

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.hive.TableInvalidationCallback;
import io.trino.plugin.hive.metastore.Table;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public interface DirectoryLister
extends TableInvalidationCallback
{
RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs, Table table, Location location)
throws IOException;
}
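The reworked DirectoryLister interface now takes a TrinoFileSystem and a Location, and the old boolean recursive flag is gone because TrinoFileSystem.listFiles lists recursively by contract. A minimal pass-through implementation sketch — the class name is hypothetical, and it assumes the TrinoFileStatusRemoteIterator wrapper used by CachingDirectoryLister above:

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Table;

import java.io.IOException;

public class PassthroughDirectoryLister
        implements DirectoryLister
{
    @Override
    public RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs, Table table, Location location)
            throws IOException
    {
        // listFiles is recursive, so no flag is needed
        return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
    }

    @Override
    public void invalidate(Table table)
    {
        // nothing is cached, so there is nothing to invalidate
    }

    @Override
    public void invalidate(Partition partition)
    {
        // nothing is cached, so there is nothing to invalidate
    }
}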
@@ -13,7 +13,7 @@
*/
package io.trino.plugin.hive.fs;

import org.apache.hadoop.fs.Path;
import io.trino.filesystem.Location;

import javax.annotation.Nullable;

@@ -28,17 +28,19 @@
public class DirectoryListingFilter
implements RemoteIterator<TrinoFileStatus>
{
private final Path prefix;
private final Location prefix;
private final RemoteIterator<TrinoFileStatus> delegateIterator;
private final boolean failOnUnexpectedFiles;

@Nullable private TrinoFileStatus nextElement;

public DirectoryListingFilter(Path prefix, RemoteIterator<TrinoFileStatus> delegateIterator)
public DirectoryListingFilter(Location prefix, RemoteIterator<TrinoFileStatus> delegateIterator, boolean failOnUnexpectedFiles)
throws IOException
{
this.prefix = requireNonNull(prefix, "prefix is null");
this.delegateIterator = requireNonNull(delegateIterator, "delegateIterator is null");
this.failOnUnexpectedFiles = failOnUnexpectedFiles;
this.nextElement = findNextElement();
}

@Override
@@ -66,16 +68,30 @@ private TrinoFileStatus findNextElement()
{
while (delegateIterator.hasNext()) {
TrinoFileStatus candidate = delegateIterator.next();
Path candidatePath = new Path(candidate.getPath());
Path parent = candidatePath.getParent();
boolean directChild = candidatePath.isAbsolute() ?
(parent != null && parent.equals(prefix)) :
(parent == null || parent.toString().isEmpty());
Location parent = Location.of(candidate.getPath()).parentDirectory();
boolean directChild = parent.equals(prefix);

if (!directChild && failOnUnexpectedFiles && !parentIsHidden(parent, prefix)) {
throw new HiveFileIterator.NestedDirectoryNotAllowedException(candidate.getPath());
}

if (directChild) {
return candidate;
}
}
return null;
}

private static boolean parentIsHidden(Location location, Location prefix)
{
if (location.equals(prefix)) {
return false;
}

if (location.fileName().startsWith(".") || location.fileName().startsWith("_")) {
return true;
}

return parentIsHidden(location.parentDirectory(), prefix);
}
}
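The filter above emits only direct children of the listing prefix; deeper entries are silently skipped when an intermediate directory is hidden (name starting with "." or "_") and, with failOnUnexpectedFiles set, rejected otherwise. A hypothetical walkthrough of those three cases, assuming Location.parentDirectory() and value-based Location equality behave as used above:

import io.trino.filesystem.Location;

final class DirectoryListingFilterWalkthrough
{
    public static void main(String[] args)
    {
        Location prefix = Location.of("s3://bucket/table");

        // Direct child: parentDirectory() equals the prefix, so it is emitted
        Location direct = Location.of("s3://bucket/table/data.orc");
        System.out.println(direct.parentDirectory().equals(prefix)); // expected: true

        // Under a hidden directory: parentIsHidden finds ".staging", so it is skipped
        Location hidden = Location.of("s3://bucket/table/.staging/tmp.orc");
        System.out.println(hidden.parentDirectory().fileName()); // expected: .staging

        // Under a visible nested directory: with failOnUnexpectedFiles set, this
        // raises HiveFileIterator.NestedDirectoryNotAllowedException
        Location nested = Location.of("s3://bucket/table/nested/data.orc");
        System.out.println(nested.parentDirectory().fileName()); // expected: nested
    }
}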