From 0856bdee03d280ee0fc6f06105c05786fcbeb075 Mon Sep 17 00:00:00 2001 From: Alex Jo Date: Fri, 28 Apr 2023 17:51:50 -0400 Subject: [PATCH] Remove hierarchical listing from Hive DirectoryLister --- .../hive/fs/CachingDirectoryLister.java | 46 +++------ .../trino/plugin/hive/fs/DirectoryLister.java | 3 - .../hive/fs/DirectoryListingCacheKey.java | 96 ------------------- .../hive/fs/DirectoryListingFilter.java | 81 ++++++++++++++++ .../plugin/hive/fs/HiveFileIterator.java | 2 +- .../TransactionDirectoryListingCacheKey.java | 19 ++-- ...ransactionScopeCachingDirectoryLister.java | 28 ++---- .../trino/plugin/hive/AbstractTestHive.java | 8 -- .../hive/TestBackgroundHiveSplitLoader.java | 21 +--- .../hive/fs/FileSystemDirectoryLister.java | 7 -- ...hingDirectoryListerRecursiveFilesOnly.java | 2 +- ...ransactionScopeCachingDirectoryLister.java | 27 ++---- 12 files changed, 124 insertions(+), 216 deletions(-) delete mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingCacheKey.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingFilter.java diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java index 82b441b52afb..b3f2825357be 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java @@ -52,7 +52,7 @@ public class CachingDirectoryLister { //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys // to deal more efficiently with cache invalidation scenarios for partitioned tables. - private final Cache cache; + private final Cache cache; private final List tablePrefixes; @Inject @@ -65,7 +65,7 @@ public CachingDirectoryLister(Duration expireAfterWrite, DataSize maxSize, List< { this.cache = EvictableCacheBuilder.newBuilder() .maximumWeight(maxSize.toBytes()) - .weigher((Weigher) (key, value) -> toIntExact(key.getRetainedSizeInBytes() + value.getRetainedSizeInBytes())) + .weigher((Weigher) (key, value) -> toIntExact(estimatedSizeOf(key) + value.getRetainedSizeInBytes())) .expireAfterWrite(expireAfterWrite.toMillis(), TimeUnit.MILLISECONDS) .shareNothingWhenDisabled() .recordStats() @@ -90,17 +90,6 @@ private static SchemaTablePrefix parseTableName(String tableName) return new SchemaTablePrefix(schema, table); } - @Override - public RemoteIterator list(FileSystem fs, Table table, Path path) - throws IOException - { - if (!isCacheEnabledFor(table.getSchemaTableName())) { - return new TrinoFileStatusRemoteIterator(fs.listLocatedStatus(path)); - } - - return listInternal(fs, new DirectoryListingCacheKey(path.toString(), false)); - } - @Override public RemoteIterator listFilesRecursively(FileSystem fs, Table table, Path path) throws IOException @@ -109,27 +98,24 @@ public RemoteIterator listFilesRecursively(FileSystem fs, Table return new TrinoFileStatusRemoteIterator(fs.listFiles(path, true)); } - return listInternal(fs, new DirectoryListingCacheKey(path.toString(), true)); + return listInternal(fs, path); } - private RemoteIterator listInternal(FileSystem fs, DirectoryListingCacheKey cacheKey) + private RemoteIterator listInternal(FileSystem fs, Path path) throws IOException { - ValueHolder cachedValueHolder = uncheckedCacheGet(cache, cacheKey, ValueHolder::new); + ValueHolder cachedValueHolder = uncheckedCacheGet(cache, path.toString(), ValueHolder::new); if (cachedValueHolder.getFiles().isPresent()) { return new SimpleRemoteIterator(cachedValueHolder.getFiles().get().iterator()); } - return cachingRemoteIterator(cachedValueHolder, createListingRemoteIterator(fs, cacheKey), cacheKey); + return cachingRemoteIterator(cachedValueHolder, createListingRemoteIterator(fs, path), path); } - private static RemoteIterator createListingRemoteIterator(FileSystem fs, DirectoryListingCacheKey cacheKey) + private static RemoteIterator createListingRemoteIterator(FileSystem fs, Path path) throws IOException { - if (cacheKey.isRecursiveFilesOnly()) { - return new TrinoFileStatusRemoteIterator(fs.listFiles(new Path(cacheKey.getPath()), true)); - } - return new TrinoFileStatusRemoteIterator(fs.listLocatedStatus(new Path(cacheKey.getPath()))); + return new TrinoFileStatusRemoteIterator(fs.listFiles(path, true)); } @Override @@ -137,7 +123,7 @@ public void invalidate(Table table) { if (isCacheEnabledFor(table.getSchemaTableName()) && isLocationPresent(table.getStorage())) { if (table.getPartitionColumns().isEmpty()) { - cache.invalidateAll(DirectoryListingCacheKey.allKeysWithPath(new Path(table.getStorage().getLocation()))); + cache.invalidate(table.getStorage().getLocation()); } else { // a partitioned table can have multiple paths in cache @@ -150,11 +136,11 @@ public void invalidate(Table table) public void invalidate(Partition partition) { if (isCacheEnabledFor(partition.getSchemaTableName()) && isLocationPresent(partition.getStorage())) { - cache.invalidateAll(DirectoryListingCacheKey.allKeysWithPath(new Path(partition.getStorage().getLocation()))); + cache.invalidate(partition.getStorage().getLocation()); } } - private RemoteIterator cachingRemoteIterator(ValueHolder cachedValueHolder, RemoteIterator iterator, DirectoryListingCacheKey key) + private RemoteIterator cachingRemoteIterator(ValueHolder cachedValueHolder, RemoteIterator iterator, Path path) { return new RemoteIterator<>() { @@ -168,7 +154,7 @@ public boolean hasNext() if (!hasNext) { // The cachedValueHolder acts as an invalidation guard. If a cache invalidation happens while this iterator goes over // the files from the specified path, the eventually outdated file listing will not be added anymore to the cache. - cache.asMap().replace(key, cachedValueHolder, new ValueHolder(files)); + cache.asMap().replace(path.toString(), cachedValueHolder, new ValueHolder(files)); } return hasNext; } @@ -223,13 +209,7 @@ public long getRequestCount() @VisibleForTesting boolean isCached(Path path) { - return isCached(new DirectoryListingCacheKey(path.toString(), false)); - } - - @VisibleForTesting - boolean isCached(DirectoryListingCacheKey cacheKey) - { - ValueHolder cached = cache.getIfPresent(cacheKey); + ValueHolder cached = cache.getIfPresent(path.toString()); return cached != null && cached.getFiles().isPresent(); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java index 81a95971d695..4fd2ce2b5ce2 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java @@ -23,9 +23,6 @@ public interface DirectoryLister extends TableInvalidationCallback { - RemoteIterator list(FileSystem fs, Table table, Path path) - throws IOException; - RemoteIterator listFilesRecursively(FileSystem fs, Table table, Path path) throws IOException; } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingCacheKey.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingCacheKey.java deleted file mode 100644 index be6bcef7b251..000000000000 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingCacheKey.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.hive.fs; - -import com.google.common.collect.ImmutableList; -import io.trino.plugin.hive.metastore.Table; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import java.util.List; -import java.util.Objects; - -import static com.google.common.base.MoreObjects.toStringHelper; -import static io.airlift.slice.SizeOf.estimatedSizeOf; -import static io.airlift.slice.SizeOf.instanceSize; -import static java.util.Objects.requireNonNull; - -/** - * A cache key designed for use in {@link CachingDirectoryLister} and {@link TransactionScopeCachingDirectoryLister} - * that allows distinct cache entries to be created for both recursive, files-only listings and shallow listings - * (that also might contain directories) at the same {@link Path}, ie: {@link DirectoryLister#list(FileSystem, Table, Path)} - * and {@link DirectoryLister#listFilesRecursively(FileSystem, Table, Path)} results. - */ -final class DirectoryListingCacheKey -{ - private static final long INSTANCE_SIZE = instanceSize(DirectoryListingCacheKey.class); - - private final String path; - private final int hashCode; // precomputed hashCode - private final boolean recursiveFilesOnly; - - public DirectoryListingCacheKey(String path, boolean recursiveFilesOnly) - { - this.path = requireNonNull(path, "path is null"); - this.recursiveFilesOnly = recursiveFilesOnly; - this.hashCode = Objects.hash(path, recursiveFilesOnly); - } - - public String getPath() - { - return path; - } - - public boolean isRecursiveFilesOnly() - { - return recursiveFilesOnly; - } - - public long getRetainedSizeInBytes() - { - return INSTANCE_SIZE + estimatedSizeOf(path); - } - - @Override - public int hashCode() - { - return hashCode; - } - - @Override - public boolean equals(Object o) - { - if (o == null || (o.getClass() != this.getClass())) { - return false; - } - DirectoryListingCacheKey other = (DirectoryListingCacheKey) o; - return recursiveFilesOnly == other.recursiveFilesOnly - && hashCode == other.hashCode - && path.equals(other.path); - } - - @Override - public String toString() - { - return toStringHelper(this) - .add("path", path) - .add("isRecursiveFilesOnly", recursiveFilesOnly) - .toString(); - } - - public static List allKeysWithPath(Path path) - { - return ImmutableList.of(new DirectoryListingCacheKey(path.toString(), true), new DirectoryListingCacheKey(path.toString(), false)); - } -} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingFilter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingFilter.java new file mode 100644 index 000000000000..437a24b9bca5 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingFilter.java @@ -0,0 +1,81 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.fs; + +import org.apache.hadoop.fs.Path; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.NoSuchElementException; + +import static java.util.Objects.requireNonNull; + +/** + * Filters down the full listing of a path prefix to just the files directly in a given directory. + */ +public class DirectoryListingFilter + implements RemoteIterator +{ + private final Path prefix; + private final RemoteIterator delegateIterator; + + @Nullable private TrinoFileStatus nextElement; + + public DirectoryListingFilter(Path prefix, RemoteIterator delegateIterator) + throws IOException + { + this.prefix = requireNonNull(prefix, "prefix is null"); + this.delegateIterator = requireNonNull(delegateIterator, "delegateIterator is null"); + this.nextElement = findNextElement(); + } + + @Override + public boolean hasNext() + throws IOException + { + return nextElement != null; + } + + @Override + public TrinoFileStatus next() + throws IOException + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + TrinoFileStatus thisElement = nextElement; + this.nextElement = findNextElement(); + return thisElement; + } + + private TrinoFileStatus findNextElement() + throws IOException + { + while (delegateIterator.hasNext()) { + TrinoFileStatus candidate = delegateIterator.next(); + Path candidatePath = new Path(candidate.getPath()); + Path parent = candidatePath.getParent(); + boolean directChild = candidatePath.isAbsolute() ? + (parent != null && parent.equals(prefix)) : + (parent == null || parent.toString().isEmpty()); + + if (directChild) { + return candidate; + } + } + return null; + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java index c16835bda12b..985a5b903130 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java @@ -186,7 +186,7 @@ private FileStatusIterator(Table table, Path path, FileSystem fileSystem, Direct this.fileStatusIterator = directoryLister.listFilesRecursively(fileSystem, table, path); } else { - this.fileStatusIterator = directoryLister.list(fileSystem, table, path); + this.fileStatusIterator = new DirectoryListingFilter(path, directoryLister.listFilesRecursively(fileSystem, table, path)); } } catch (IOException e) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java index bd48d7a44400..bc7a0cd9b22e 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java @@ -16,6 +16,7 @@ import java.util.Objects; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static io.airlift.slice.SizeOf.instanceSize; import static java.util.Objects.requireNonNull; @@ -24,22 +25,22 @@ public class TransactionDirectoryListingCacheKey private static final long INSTANCE_SIZE = instanceSize(TransactionDirectoryListingCacheKey.class); private final long transactionId; - private final DirectoryListingCacheKey key; + private final String path; - public TransactionDirectoryListingCacheKey(long transactionId, DirectoryListingCacheKey key) + public TransactionDirectoryListingCacheKey(long transactionId, String path) { this.transactionId = transactionId; - this.key = requireNonNull(key, "key is null"); + this.path = requireNonNull(path, "path is null"); } - public DirectoryListingCacheKey getKey() + public String getPath() { - return key; + return path; } public long getRetainedSizeInBytes() { - return INSTANCE_SIZE + key.getRetainedSizeInBytes(); + return INSTANCE_SIZE + estimatedSizeOf(path); } @Override @@ -52,13 +53,13 @@ public boolean equals(Object o) return false; } TransactionDirectoryListingCacheKey that = (TransactionDirectoryListingCacheKey) o; - return transactionId == that.transactionId && key.equals(that.key); + return transactionId == that.transactionId && path.equals(that.path); } @Override public int hashCode() { - return Objects.hash(transactionId, key); + return Objects.hash(transactionId, path); } @Override @@ -66,7 +67,7 @@ public String toString() { return toStringHelper(this) .add("transactionId", transactionId) - .add("key", key) + .add("path", path) .toString(); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java index 8063fb191bb2..dd7cbb305da6 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java @@ -37,10 +37,8 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Throwables.throwIfInstanceOf; import static com.google.common.base.Throwables.throwIfUnchecked; -import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.slice.SizeOf.instanceSize; import static io.airlift.slice.SizeOf.sizeOfObjectArray; -import static io.trino.plugin.hive.fs.DirectoryListingCacheKey.allKeysWithPath; import static java.util.Collections.synchronizedList; import static java.util.Objects.requireNonNull; @@ -66,18 +64,11 @@ public TransactionScopeCachingDirectoryLister(DirectoryLister delegate, long tra this.cache = requireNonNull(cache, "cache is null"); } - @Override - public RemoteIterator list(FileSystem fs, Table table, Path path) - throws IOException - { - return listInternal(fs, table, new TransactionDirectoryListingCacheKey(transactionId, new DirectoryListingCacheKey(path.toString(), false))); - } - @Override public RemoteIterator listFilesRecursively(FileSystem fs, Table table, Path path) throws IOException { - return listInternal(fs, table, new TransactionDirectoryListingCacheKey(transactionId, new DirectoryListingCacheKey(path.toString(), true))); + return listInternal(fs, table, new TransactionDirectoryListingCacheKey(transactionId, path.toString())); } private RemoteIterator listInternal(FileSystem fs, Table table, TransactionDirectoryListingCacheKey cacheKey) @@ -91,7 +82,7 @@ private RemoteIterator listInternal(FileSystem fs, Table table, Throwable throwable = e.getCause(); throwIfInstanceOf(throwable, IOException.class); throwIfUnchecked(throwable); - throw new RuntimeException("Failed to list directory: " + cacheKey.getKey().getPath(), throwable); + throw new RuntimeException("Failed to list directory: " + cacheKey.getPath(), throwable); } if (cachedValueHolder.isFullyCached()) { @@ -104,10 +95,7 @@ private RemoteIterator listInternal(FileSystem fs, Table table, private RemoteIterator createListingRemoteIterator(FileSystem fs, Table table, TransactionDirectoryListingCacheKey cacheKey) throws IOException { - if (cacheKey.getKey().isRecursiveFilesOnly()) { - return delegate.listFilesRecursively(fs, table, new Path(cacheKey.getKey().getPath())); - } - return delegate.list(fs, table, new Path(cacheKey.getKey().getPath())); + return delegate.listFilesRecursively(fs, table, new Path(cacheKey.getPath())); } @Override @@ -115,9 +103,7 @@ public void invalidate(Table table) { if (isLocationPresent(table.getStorage())) { if (table.getPartitionColumns().isEmpty()) { - cache.invalidateAll(allKeysWithPath(new Path(table.getStorage().getLocation())).stream() - .map(key -> new TransactionDirectoryListingCacheKey(transactionId, key)) - .collect(toImmutableList())); + cache.invalidate(new TransactionDirectoryListingCacheKey(transactionId, table.getStorage().getLocation())); } else { // a partitioned table can have multiple paths in cache @@ -131,9 +117,7 @@ public void invalidate(Table table) public void invalidate(Partition partition) { if (isLocationPresent(partition.getStorage())) { - cache.invalidateAll(allKeysWithPath(new Path(partition.getStorage().getLocation())).stream() - .map(key -> new TransactionDirectoryListingCacheKey(transactionId, key)) - .collect(toImmutableList())); + cache.invalidate(new TransactionDirectoryListingCacheKey(transactionId, partition.getStorage().getLocation())); } delegate.invalidate(partition); } @@ -177,7 +161,7 @@ public TrinoFileStatus next() @VisibleForTesting boolean isCached(Path path) { - return isCached(new TransactionDirectoryListingCacheKey(transactionId, new DirectoryListingCacheKey(path.toString(), false))); + return isCached(new TransactionDirectoryListingCacheKey(transactionId, path.toString())); } @VisibleForTesting diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java index 92792d8a8f15..837ed91942f2 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java @@ -6330,14 +6330,6 @@ private static class CountingDirectoryLister { private final AtomicInteger listCount = new AtomicInteger(); - @Override - public RemoteIterator list(FileSystem fs, Table table, Path path) - throws IOException - { - listCount.incrementAndGet(); - return new TrinoFileStatusRemoteIterator(fs.listLocatedStatus(path)); - } - @Override public RemoteIterator listFilesRecursively(FileSystem fs, Table table, Path path) throws IOException diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java index 4fb53a783a39..35116dada105 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java @@ -912,11 +912,11 @@ public void testBuildManifestFileIteratorNestedDirectory() schema.setProperty(SERIALIZATION_LIB, AVRO.getSerde()); Path filePath = new Path("hdfs://VOL1:9000/db_name/table_name/file1"); - Path directoryPath = new Path("hdfs://VOL1:9000/db_name/table_name/dir"); + Path directoryPath = new Path("hdfs://VOL1:9000/db_name/table_name/dir/file2"); List paths = ImmutableList.of(filePath, directoryPath); List files = ImmutableList.of( locatedFileStatus(filePath), - locatedDirectoryStatus(directoryPath)); + locatedFileStatus(directoryPath)); BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( files, @@ -1412,23 +1412,6 @@ private static LocatedFileStatus locatedFileStatusWithNoBlocks(Path path) new BlockLocation[] {}); } - private static LocatedFileStatus locatedDirectoryStatus(Path path) - { - return new LocatedFileStatus( - 0L, - true, - 0, - 0L, - 0L, - 0L, - null, - null, - null, - null, - path, - new BlockLocation[] {}); - } - public static class TestingHdfsEnvironment extends HdfsEnvironment { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java index 75f1ce69910d..9e88af19f76f 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java @@ -23,13 +23,6 @@ public class FileSystemDirectoryLister implements DirectoryLister { - @Override - public RemoteIterator list(FileSystem fs, Table table, Path path) - throws IOException - { - return new TrinoFileStatusRemoteIterator(fs.listLocatedStatus(path)); - } - @Override public RemoteIterator listFilesRecursively(FileSystem fs, Table table, Path path) throws IOException diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryListerRecursiveFilesOnly.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryListerRecursiveFilesOnly.java index ea9a47f360ae..46fc92a42ad1 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryListerRecursiveFilesOnly.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryListerRecursiveFilesOnly.java @@ -56,7 +56,7 @@ protected QueryRunner createQueryRunner() @Override protected boolean isCached(CachingDirectoryLister directoryLister, Path path) { - return directoryLister.isCached(new DirectoryListingCacheKey(path.toString(), true)); + return directoryLister.isCached(path); } @Test diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java index 2830a5aaee55..eeddb644b9a2 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java @@ -101,17 +101,17 @@ public void testConcurrentDirectoryListing() // due to Token being a key in segmented cache. TransactionScopeCachingDirectoryLister cachingLister = (TransactionScopeCachingDirectoryLister) new TransactionScopeCachingDirectoryListerFactory(DataSize.ofBytes(500), Optional.of(1)).get(countingLister); - assertFiles(cachingLister.list(null, TABLE, path2), ImmutableList.of(thirdFile)); + assertFiles(new DirectoryListingFilter(path2, (cachingLister.listFilesRecursively(null, TABLE, path2))), ImmutableList.of(thirdFile)); assertThat(countingLister.getListCount()).isEqualTo(1); // listing path2 again shouldn't increase listing count assertThat(cachingLister.isCached(path2)).isTrue(); - assertFiles(cachingLister.list(null, TABLE, path2), ImmutableList.of(thirdFile)); + assertFiles(new DirectoryListingFilter(path2, cachingLister.listFilesRecursively(null, TABLE, path2)), ImmutableList.of(thirdFile)); assertThat(countingLister.getListCount()).isEqualTo(1); // start listing path1 concurrently - RemoteIterator path1FilesA = cachingLister.list(null, TABLE, path1); - RemoteIterator path1FilesB = cachingLister.list(null, TABLE, path1); + RemoteIterator path1FilesA = new DirectoryListingFilter(path1, cachingLister.listFilesRecursively(null, TABLE, path1)); + RemoteIterator path1FilesB = new DirectoryListingFilter(path1, cachingLister.listFilesRecursively(null, TABLE, path1)); assertThat(countingLister.getListCount()).isEqualTo(2); // list path1 files using both iterators concurrently @@ -125,7 +125,7 @@ public void testConcurrentDirectoryListing() // listing path2 again should increase listing count because 2 files were cached for path1 assertThat(cachingLister.isCached(path2)).isFalse(); - assertFiles(cachingLister.list(null, TABLE, path2), ImmutableList.of(thirdFile)); + assertFiles(new DirectoryListingFilter(path2, cachingLister.listFilesRecursively(null, TABLE, path2)), ImmutableList.of(thirdFile)); assertThat(countingLister.getListCount()).isEqualTo(3); } @@ -141,8 +141,8 @@ public void testConcurrentDirectoryListingException() // start listing path concurrently countingLister.setThrowException(true); - RemoteIterator filesA = cachingLister.list(null, TABLE, path); - RemoteIterator filesB = cachingLister.list(null, TABLE, path); + RemoteIterator filesA = cachingLister.listFilesRecursively(null, TABLE, path); + RemoteIterator filesB = cachingLister.listFilesRecursively(null, TABLE, path); assertThat(countingLister.getListCount()).isEqualTo(1); // listing should throw an exception @@ -150,7 +150,7 @@ public void testConcurrentDirectoryListingException() // listing again should succeed countingLister.setThrowException(false); - assertFiles(cachingLister.list(null, TABLE, path), ImmutableList.of(file)); + assertFiles(new DirectoryListingFilter(path, cachingLister.listFilesRecursively(null, TABLE, path)), ImmutableList.of(file)); assertThat(countingLister.getListCount()).isEqualTo(2); // listing using second concurrently initialized DirectoryLister should fail @@ -179,20 +179,13 @@ public CountingDirectoryLister(Map> fileStatuses) this.fileStatuses = requireNonNull(fileStatuses, "fileStatuses is null"); } - @Override - public RemoteIterator list(FileSystem fs, Table table, Path path) - throws IOException - { - listCount++; - return throwingRemoteIterator(requireNonNull(fileStatuses.get(path)), throwException); - } - @Override public RemoteIterator listFilesRecursively(FileSystem fs, Table table, Path path) throws IOException { // No specific recursive files-only listing implementation - return list(fs, table, path); + listCount++; + return throwingRemoteIterator(requireNonNull(fileStatuses.get(path)), throwException); } public void setThrowException(boolean throwException)