Skip to content

Commit

Permalink
Reduce Hive file system listing
Browse files Browse the repository at this point in the history
Reduce the number of times Hive needs to call the file
system listing API when using common file systems.

Additionally, avoid unnecessary directory-existence checks when
a file listing returns a non-empty result.
  • Loading branch information
alexjo2144 authored and losipiuk committed Jul 14, 2023
1 parent fb75256 commit 703cc2a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.filesystem.hdfs;

import com.google.common.collect.ImmutableMap;
import io.airlift.stats.TimeStat;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
Expand Down Expand Up @@ -48,6 +49,13 @@
class HdfsFileSystem
implements TrinoFileSystem
{
private static final Map<String, Boolean> KNOWN_HIERARCHICAL_FILESYSTEMS = ImmutableMap.<String, Boolean>builder()
.put("s3", false)
.put("s3a", false)
.put("s3n", false)
.put("hdfs", true)
.buildOrThrow();

private final HdfsEnvironment environment;
private final HdfsContext context;
private final TrinoHdfsFileSystemStats stats;
Expand Down Expand Up @@ -224,6 +232,11 @@ public Optional<Boolean> directoryExists(Location location)

private boolean hierarchical(FileSystem fileSystem, Location rootLocation)
{
Boolean knownResult = KNOWN_HIERARCHICAL_FILESYSTEMS.get(fileSystem.getScheme());
if (knownResult != null) {
return knownResult;
}

Boolean cachedResult = hierarchicalFileSystemCache.get(fileSystem);
if (cachedResult != null) {
return cachedResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,12 +563,12 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)

private List<TrinoFileStatus> listBucketFiles(TrinoFileSystem fs, Location location, String partitionName)
{
if (!ignoreAbsentPartitions) {
checkPartitionLocationExists(fs, location);
}

try {
return ImmutableList.copyOf(new HiveFileIterator(table, location, fs, directoryLister, hdfsNamenodeStats, FAIL));
HiveFileIterator fileIterator = new HiveFileIterator(table, location, fs, directoryLister, hdfsNamenodeStats, FAIL);
if (!fileIterator.hasNext() && !ignoreAbsentPartitions) {
checkPartitionLocationExists(fs, location);
}
return ImmutableList.copyOf(fileIterator);
}
catch (HiveFileIterator.NestedDirectoryNotAllowedException e) {
// Fail here to be on the safe side. This seems to be the same as what Hive does
Expand Down Expand Up @@ -651,9 +651,11 @@ Optional<Iterator<InternalHiveSplit>> buildManifestFileIterator(
TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session);
Location location = Location.of(parent.toString());

checkPartitionLocationExists(trinoFileSystem, location);
Map<Path, TrinoFileStatus> fileStatuses = new HashMap<>();
HiveFileIterator fileStatusIterator = new HiveFileIterator(table, location, trinoFileSystem, directoryLister, hdfsNamenodeStats, IGNORED);
if (!fileStatusIterator.hasNext()) {
checkPartitionLocationExists(trinoFileSystem, location);
}
fileStatusIterator.forEachRemaining(status -> fileStatuses.put(getPathWithoutSchemeAndAuthority(new Path(status.getPath())), status));

List<TrinoFileStatus> locatedFileStatuses = new ArrayList<>();
Expand Down Expand Up @@ -814,11 +816,10 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inpu

private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(TrinoFileSystem fileSystem, Location location, InternalHiveSplitFactory splitFactory, boolean splittable, Optional<AcidInfo> acidInfo)
{
if (!ignoreAbsentPartitions) {
Iterator<TrinoFileStatus> iterator = new HiveFileIterator(table, location, fileSystem, directoryLister, hdfsNamenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED);
if (!iterator.hasNext() && !ignoreAbsentPartitions) {
checkPartitionLocationExists(fileSystem, location);
}

Iterator<TrinoFileStatus> iterator = new HiveFileIterator(table, location, fileSystem, directoryLister, hdfsNamenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED);
return createInternalHiveSplitIterator(splitFactory, splittable, acidInfo, Streams.stream(iterator));
}

Expand Down

0 comments on commit 703cc2a

Please sign in to comment.