Skip to content

Commit

Permalink
Remove hierarchical listing from Hive DirectoryLister
Browse files Browse the repository at this point in the history
  • Loading branch information
alexjo2144 authored and electrum committed May 16, 2023
1 parent cfedd7e commit 0856bde
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class CachingDirectoryLister
{
//TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys
// to deal more efficiently with cache invalidation scenarios for partitioned tables.
private final Cache<DirectoryListingCacheKey, ValueHolder> cache;
private final Cache<String, ValueHolder> cache;
private final List<SchemaTablePrefix> tablePrefixes;

@Inject
Expand All @@ -65,7 +65,7 @@ public CachingDirectoryLister(Duration expireAfterWrite, DataSize maxSize, List<
{
this.cache = EvictableCacheBuilder.newBuilder()
.maximumWeight(maxSize.toBytes())
.weigher((Weigher<DirectoryListingCacheKey, ValueHolder>) (key, value) -> toIntExact(key.getRetainedSizeInBytes() + value.getRetainedSizeInBytes()))
.weigher((Weigher<String, ValueHolder>) (key, value) -> toIntExact(estimatedSizeOf(key) + value.getRetainedSizeInBytes()))
.expireAfterWrite(expireAfterWrite.toMillis(), TimeUnit.MILLISECONDS)
.shareNothingWhenDisabled()
.recordStats()
Expand All @@ -90,17 +90,6 @@ private static SchemaTablePrefix parseTableName(String tableName)
return new SchemaTablePrefix(schema, table);
}

@Override
public RemoteIterator<TrinoFileStatus> list(FileSystem fs, Table table, Path path)
throws IOException
{
if (!isCacheEnabledFor(table.getSchemaTableName())) {
return new TrinoFileStatusRemoteIterator(fs.listLocatedStatus(path));
}

return listInternal(fs, new DirectoryListingCacheKey(path.toString(), false));
}

@Override
public RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
throws IOException
Expand All @@ -109,35 +98,32 @@ public RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table
return new TrinoFileStatusRemoteIterator(fs.listFiles(path, true));
}

return listInternal(fs, new DirectoryListingCacheKey(path.toString(), true));
return listInternal(fs, path);
}

private RemoteIterator<TrinoFileStatus> listInternal(FileSystem fs, DirectoryListingCacheKey cacheKey)
private RemoteIterator<TrinoFileStatus> listInternal(FileSystem fs, Path path)
throws IOException
{
ValueHolder cachedValueHolder = uncheckedCacheGet(cache, cacheKey, ValueHolder::new);
ValueHolder cachedValueHolder = uncheckedCacheGet(cache, path.toString(), ValueHolder::new);
if (cachedValueHolder.getFiles().isPresent()) {
return new SimpleRemoteIterator(cachedValueHolder.getFiles().get().iterator());
}

return cachingRemoteIterator(cachedValueHolder, createListingRemoteIterator(fs, cacheKey), cacheKey);
return cachingRemoteIterator(cachedValueHolder, createListingRemoteIterator(fs, path), path);
}

private static RemoteIterator<TrinoFileStatus> createListingRemoteIterator(FileSystem fs, DirectoryListingCacheKey cacheKey)
private static RemoteIterator<TrinoFileStatus> createListingRemoteIterator(FileSystem fs, Path path)
throws IOException
{
if (cacheKey.isRecursiveFilesOnly()) {
return new TrinoFileStatusRemoteIterator(fs.listFiles(new Path(cacheKey.getPath()), true));
}
return new TrinoFileStatusRemoteIterator(fs.listLocatedStatus(new Path(cacheKey.getPath())));
return new TrinoFileStatusRemoteIterator(fs.listFiles(path, true));
}

@Override
public void invalidate(Table table)
{
if (isCacheEnabledFor(table.getSchemaTableName()) && isLocationPresent(table.getStorage())) {
if (table.getPartitionColumns().isEmpty()) {
cache.invalidateAll(DirectoryListingCacheKey.allKeysWithPath(new Path(table.getStorage().getLocation())));
cache.invalidate(table.getStorage().getLocation());
}
else {
// a partitioned table can have multiple paths in cache
Expand All @@ -150,11 +136,11 @@ public void invalidate(Table table)
public void invalidate(Partition partition)
{
if (isCacheEnabledFor(partition.getSchemaTableName()) && isLocationPresent(partition.getStorage())) {
cache.invalidateAll(DirectoryListingCacheKey.allKeysWithPath(new Path(partition.getStorage().getLocation())));
cache.invalidate(partition.getStorage().getLocation());
}
}

private RemoteIterator<TrinoFileStatus> cachingRemoteIterator(ValueHolder cachedValueHolder, RemoteIterator<TrinoFileStatus> iterator, DirectoryListingCacheKey key)
private RemoteIterator<TrinoFileStatus> cachingRemoteIterator(ValueHolder cachedValueHolder, RemoteIterator<TrinoFileStatus> iterator, Path path)
{
return new RemoteIterator<>()
{
Expand All @@ -168,7 +154,7 @@ public boolean hasNext()
if (!hasNext) {
// The cachedValueHolder acts as an invalidation guard. If the cache entry is invalidated while this iterator is still going over
// the files from the specified path, the possibly outdated file listing will no longer be added to the cache.
cache.asMap().replace(key, cachedValueHolder, new ValueHolder(files));
cache.asMap().replace(path.toString(), cachedValueHolder, new ValueHolder(files));
}
return hasNext;
}
Expand Down Expand Up @@ -223,13 +209,7 @@ public long getRequestCount()
@VisibleForTesting
boolean isCached(Path path)
{
return isCached(new DirectoryListingCacheKey(path.toString(), false));
}

@VisibleForTesting
boolean isCached(DirectoryListingCacheKey cacheKey)
{
ValueHolder cached = cache.getIfPresent(cacheKey);
ValueHolder cached = cache.getIfPresent(path.toString());
return cached != null && cached.getFiles().isPresent();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
public interface DirectoryLister
extends TableInvalidationCallback
{
RemoteIterator<TrinoFileStatus> list(FileSystem fs, Table table, Path path)
throws IOException;

RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
throws IOException;
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.fs;

import org.apache.hadoop.fs.Path;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.NoSuchElementException;

import static java.util.Objects.requireNonNull;

/**
 * A {@link RemoteIterator} that wraps another listing and yields only the entries located
 * directly inside a given directory, skipping entries that live in nested sub-directories.
 * <p>
 * The iterator reads one element ahead: the next matching entry is looked up eagerly, first in
 * the constructor and then on every call to {@link #next()}.
 */
public class DirectoryListingFilter
        implements RemoteIterator<TrinoFileStatus>
{
    private final Path prefix;
    private final RemoteIterator<TrinoFileStatus> delegateIterator;

    // The entry that the following next() call will hand out; null once the delegate has no more matches.
    @Nullable private TrinoFileStatus nextElement;

    /**
     * @param prefix the directory whose direct children should be kept
     * @param delegateIterator the underlying listing to filter
     * @throws IOException if the delegate fails while the first matching entry is being looked up
     */
    public DirectoryListingFilter(Path prefix, RemoteIterator<TrinoFileStatus> delegateIterator)
            throws IOException
    {
        this.prefix = requireNonNull(prefix, "prefix is null");
        this.delegateIterator = requireNonNull(delegateIterator, "delegateIterator is null");
        this.nextElement = findNextElement();
    }

    /**
     * @return {@code true} while a pre-fetched matching entry is available
     */
    @Override
    public boolean hasNext()
            throws IOException
    {
        return nextElement != null;
    }

    /**
     * Returns the pre-fetched entry and advances to the following match.
     *
     * @throws NoSuchElementException if no matching entry is left
     * @throws IOException if the delegate fails while looking up the following match
     */
    @Override
    public TrinoFileStatus next()
            throws IOException
    {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }

        TrinoFileStatus current = nextElement;
        nextElement = findNextElement();
        return current;
    }

    /**
     * Pulls entries from the delegate until one is a direct child of the prefix.
     *
     * @return the next direct child, or {@code null} when the delegate is exhausted
     */
    private TrinoFileStatus findNextElement()
            throws IOException
    {
        while (delegateIterator.hasNext()) {
            TrinoFileStatus status = delegateIterator.next();
            if (isDirectChild(new Path(status.getPath()))) {
                return status;
            }
        }
        return null;
    }

    /**
     * An absolute path is a direct child when its parent equals the prefix. A relative path is
     * a direct child when it has no parent, or when the parent's string form is empty.
     */
    private boolean isDirectChild(Path entryPath)
    {
        Path parentPath = entryPath.getParent();
        if (entryPath.isAbsolute()) {
            return parentPath != null && parentPath.equals(prefix);
        }
        return parentPath == null || parentPath.toString().isEmpty();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private FileStatusIterator(Table table, Path path, FileSystem fileSystem, Direct
this.fileStatusIterator = directoryLister.listFilesRecursively(fileSystem, table, path);
}
else {
this.fileStatusIterator = directoryLister.list(fileSystem, table, path);
this.fileStatusIterator = new DirectoryListingFilter(path, directoryLister.listFilesRecursively(fileSystem, table, path));
}
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Objects;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static io.airlift.slice.SizeOf.instanceSize;
import static java.util.Objects.requireNonNull;

Expand All @@ -24,22 +25,22 @@ public class TransactionDirectoryListingCacheKey
private static final long INSTANCE_SIZE = instanceSize(TransactionDirectoryListingCacheKey.class);

private final long transactionId;
private final DirectoryListingCacheKey key;
private final String path;

public TransactionDirectoryListingCacheKey(long transactionId, DirectoryListingCacheKey key)
public TransactionDirectoryListingCacheKey(long transactionId, String path)
{
this.transactionId = transactionId;
this.key = requireNonNull(key, "key is null");
this.path = requireNonNull(path, "path is null");
}

public DirectoryListingCacheKey getKey()
public String getPath()
{
return key;
return path;
}

public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE + key.getRetainedSizeInBytes();
return INSTANCE_SIZE + estimatedSizeOf(path);
}

@Override
Expand All @@ -52,21 +53,21 @@ public boolean equals(Object o)
return false;
}
TransactionDirectoryListingCacheKey that = (TransactionDirectoryListingCacheKey) o;
return transactionId == that.transactionId && key.equals(that.key);
return transactionId == that.transactionId && path.equals(that.path);
}

@Override
public int hashCode()
{
return Objects.hash(transactionId, key);
return Objects.hash(transactionId, path);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("transactionId", transactionId)
.add("key", key)
.add("path", path)
.toString();
}
}
Loading

0 comments on commit 0856bde

Please sign in to comment.