From 5b8e06a9ecfaf5b2e7255419b787c00f9c0c345d Mon Sep 17 00:00:00 2001 From: Alex Jo Date: Mon, 1 May 2023 12:15:39 -0400 Subject: [PATCH] Migrate Hive DirectoryLister to TrinoFileSystem --- .../hive/BackgroundHiveSplitLoader.java | 41 ++++++++-- .../hive/fs/CachingDirectoryLister.java | 36 ++++----- .../trino/plugin/hive/fs/DirectoryLister.java | 6 +- .../hive/fs/DirectoryListingFilter.java | 32 ++++++-- .../plugin/hive/fs/HiveFileIterator.java | 79 ++++++++----------- .../TransactionDirectoryListingCacheKey.java | 10 ++- ...ransactionScopeCachingDirectoryLister.java | 22 +++--- .../trino/plugin/hive/fs/TrinoFileStatus.java | 5 -- .../fs/TrinoFileStatusRemoteIterator.java | 6 +- .../trino/plugin/hive/AbstractTestHive.java | 5 +- .../hive/AbstractTestHiveFileSystem.java | 18 +++-- .../hive/TestBackgroundHiveSplitLoader.java | 17 +++- .../fs/BaseCachingDirectoryListerTest.java | 63 ++++++++------- .../hive/fs/FileSystemDirectoryLister.java | 8 +- .../hive/fs/TestCachingDirectoryLister.java | 6 +- ...hingDirectoryListerRecursiveFilesOnly.java | 8 +- ...ransactionScopeCachingDirectoryLister.java | 43 +++++----- .../hive/TestAvroSymlinkInputFormat.java | 4 +- 18 files changed, 223 insertions(+), 186 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java index 64186c5880f1..8ed4e5b19c09 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java @@ -110,6 +110,7 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_EXCEEDED_PARTITION_LIMIT; import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE; @@ -546,21 +547,28 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition) return getTransactionalSplits(Location.of(path.toString()), splittable, bucketConversion, splitFactory); } + TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session); + Location location = Location.of(path.toString()); // Bucketed partitions are fully loaded immediately since all files must be loaded to determine the file to bucket mapping if (tableBucketInfo.isPresent()) { - List files = listBucketFiles(path, fs, splitFactory.getPartitionName()); + List files = listBucketFiles(trinoFileSystem, location, splitFactory.getPartitionName()); return hiveSplitSource.addToQueue(getBucketedSplits(files, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, Optional.empty())); } - fileIterators.addLast(createInternalHiveSplitIterator(path, fs, splitFactory, splittable, Optional.empty())); + Iterator splitIterator = createInternalHiveSplitIterator(trinoFileSystem, location, splitFactory, splittable, Optional.empty()); + fileIterators.addLast(splitIterator); return COMPLETED_FUTURE; } - private List listBucketFiles(Path path, FileSystem fs, String partitionName) + private List listBucketFiles(TrinoFileSystem fs, Location location, String partitionName) { + if (!ignoreAbsentPartitions) { + checkPartitionLocationExists(fs, location); + } + try { - return 
ImmutableList.copyOf(new HiveFileIterator(table, path, fs, directoryLister, hdfsNamenodeStats, FAIL, ignoreAbsentPartitions));
+            return ImmutableList.copyOf(new HiveFileIterator(table, location, fs, directoryLister, hdfsNamenodeStats, FAIL));
         }
         catch (HiveFileIterator.NestedDirectoryNotAllowedException e) {
             // Fail here to be on the safe side. This seems to be the same as what Hive does
@@ -640,9 +648,12 @@ Optional<Iterator<InternalHiveSplit>> buildManifestFileIterator(
             throws IOException
     {
         FileSystem targetFilesystem = hdfsEnvironment.getFileSystem(hdfsContext, parent);
+        TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session);
+        Location location = Location.of(parent.toString());
+        checkPartitionLocationExists(trinoFileSystem, location);
 
         Map<Path, TrinoFileStatus> fileStatuses = new HashMap<>();
-        HiveFileIterator fileStatusIterator = new HiveFileIterator(table, parent, targetFilesystem, directoryLister, hdfsNamenodeStats, IGNORED, false);
+        HiveFileIterator fileStatusIterator = new HiveFileIterator(table, location, trinoFileSystem, directoryLister, hdfsNamenodeStats, IGNORED);
         fileStatusIterator.forEachRemaining(status -> fileStatuses.put(getPathWithoutSchemeAndAuthority(new Path(status.getPath())), status));
 
         List<TrinoFileStatus> locatedFileStatuses = new ArrayList<>();
@@ -801,12 +812,28 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inpu
                 .anyMatch(name -> name.equals("UseFileSplitsFromInputFormat"));
     }
 
-    private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable, Optional<AcidInfo> acidInfo)
+    private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(TrinoFileSystem fileSystem, Location location, InternalHiveSplitFactory splitFactory, boolean splittable, Optional<AcidInfo> acidInfo)
     {
-        Iterator<TrinoFileStatus> iterator = new HiveFileIterator(table, path, fileSystem, directoryLister, hdfsNamenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, ignoreAbsentPartitions);
+        if (!ignoreAbsentPartitions) {
+            checkPartitionLocationExists(fileSystem, location);
+        }
+
+        Iterator<TrinoFileStatus> iterator = new HiveFileIterator(table, location, fileSystem, directoryLister, hdfsNamenodeStats, recursiveDirWalkerEnabled ?
RECURSE : IGNORED);
         return createInternalHiveSplitIterator(splitFactory, splittable, acidInfo, Streams.stream(iterator));
     }
 
+    private static void checkPartitionLocationExists(TrinoFileSystem fileSystem, Location location)
+    {
+        try {
+            if (!fileSystem.directoryExists(location).orElse(true)) {
+                throw new TrinoException(HIVE_FILE_NOT_FOUND, "Partition location does not exist: " + location);
+            }
+        }
+        catch (IOException e) {
+            throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed checking directory path: " + location, e);
+        }
+    }
+
     private static Iterator<InternalHiveSplit> createInternalHiveSplitIterator(InternalHiveSplitFactory splitFactory, boolean splittable, Optional<AcidInfo> acidInfo, Stream<TrinoFileStatus> fileStream)
     {
         return fileStream
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java
index b3f2825357be..bbdfe317595f 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java
@@ -20,14 +20,14 @@
 import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
 import io.trino.collect.cache.EvictableCacheBuilder;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
 import io.trino.plugin.hive.HiveConfig;
 import io.trino.plugin.hive.metastore.Partition;
 import io.trino.plugin.hive.metastore.Storage;
 import io.trino.plugin.hive.metastore.Table;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.SchemaTablePrefix;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.weakref.jmx.Managed;
 
 import javax.inject.Inject;
@@ -52,7 +52,7 @@ public class CachingDirectoryLister
 {
     //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys
     // to deal more efficiently with cache invalidation scenarios for partitioned tables.
-    private final Cache<String, ValueHolder> cache;
+    private final Cache<Location, ValueHolder> cache;
     private final List<SchemaTablePrefix> tablePrefixes;
 
     @Inject
@@ -65,7 +65,7 @@ public CachingDirectoryLister(Duration expireAfterWrite, DataSize maxSize, List<
     {
         this.cache = EvictableCacheBuilder.newBuilder()
                 .maximumWeight(maxSize.toBytes())
-                .weigher((Weigher<String, ValueHolder>) (key, value) -> toIntExact(estimatedSizeOf(key) + value.getRetainedSizeInBytes()))
+                .weigher((Weigher<Location, ValueHolder>) (key, value) -> toIntExact(estimatedSizeOf(key.toString()) + value.getRetainedSizeInBytes()))
                 .expireAfterWrite(expireAfterWrite.toMillis(), TimeUnit.MILLISECONDS)
                 .shareNothingWhenDisabled()
                 .recordStats()
@@ -91,31 +91,31 @@ private static SchemaTablePrefix parseTableName(String tableName)
     }
 
     @Override
-    public RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
+    public RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs, Table table, Location location)
             throws IOException
     {
         if (!isCacheEnabledFor(table.getSchemaTableName())) {
-            return new TrinoFileStatusRemoteIterator(fs.listFiles(path, true));
+            return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
         }
 
-        return listInternal(fs, path);
+        return listInternal(fs, location);
     }
 
-    private RemoteIterator<TrinoFileStatus> listInternal(FileSystem fs, Path path)
+    private RemoteIterator<TrinoFileStatus> listInternal(TrinoFileSystem fs, Location location)
             throws IOException
     {
-        ValueHolder cachedValueHolder = uncheckedCacheGet(cache, path.toString(), ValueHolder::new);
+        ValueHolder cachedValueHolder = uncheckedCacheGet(cache, location, ValueHolder::new);
         if (cachedValueHolder.getFiles().isPresent()) {
             return new SimpleRemoteIterator(cachedValueHolder.getFiles().get().iterator());
         }
 
-        return cachingRemoteIterator(cachedValueHolder, createListingRemoteIterator(fs, path), path);
+        return cachingRemoteIterator(cachedValueHolder, createListingRemoteIterator(fs, location), location);
     }
 
-    private static RemoteIterator<TrinoFileStatus> createListingRemoteIterator(FileSystem fs, Path path)
+    private static RemoteIterator<TrinoFileStatus> createListingRemoteIterator(TrinoFileSystem fs, Location location)
             throws IOException
     {
-        return new TrinoFileStatusRemoteIterator(fs.listFiles(path, true));
+        return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
     }
 
     @Override
@@ -123,7 +123,7 @@ public void invalidate(Table table)
     {
         if (isCacheEnabledFor(table.getSchemaTableName()) && isLocationPresent(table.getStorage())) {
             if (table.getPartitionColumns().isEmpty()) {
-                cache.invalidate(table.getStorage().getLocation());
+                cache.invalidate(Location.of(table.getStorage().getLocation()));
             }
             else {
                 // a partitioned table can have multiple paths in cache
@@ -136,11 +136,11 @@ public void invalidate(Partition partition)
     {
         if (isCacheEnabledFor(partition.getSchemaTableName()) && isLocationPresent(partition.getStorage())) {
-            cache.invalidate(partition.getStorage().getLocation());
+            cache.invalidate(Location.of(partition.getStorage().getLocation()));
         }
     }
 
-    private RemoteIterator<TrinoFileStatus> cachingRemoteIterator(ValueHolder cachedValueHolder, RemoteIterator<TrinoFileStatus> iterator, Path path)
+    private RemoteIterator<TrinoFileStatus> cachingRemoteIterator(ValueHolder cachedValueHolder, RemoteIterator<TrinoFileStatus> iterator, Location location)
     {
         return new RemoteIterator<>()
         {
@@ -154,7 +154,7 @@ public boolean hasNext()
                 if (!hasNext) {
                     // The cachedValueHolder acts as an invalidation guard. If a cache invalidation happens while this iterator goes over
                     // the files from the specified path, the eventually outdated file listing will not be added anymore to the cache.
-                    cache.asMap().replace(path.toString(), cachedValueHolder, new ValueHolder(files));
+                    cache.asMap().replace(location, cachedValueHolder, new ValueHolder(files));
                 }
                 return hasNext;
             }
@@ -207,9 +207,9 @@ public long getRequestCount()
     }
 
     @VisibleForTesting
-    boolean isCached(Path path)
+    boolean isCached(Location location)
     {
-        ValueHolder cached = cache.getIfPresent(path.toString());
+        ValueHolder cached = cache.getIfPresent(location);
         return cached != null && cached.getFiles().isPresent();
     }
 
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java
index 4fd2ce2b5ce2..def1b624a39b 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java
@@ -13,16 +13,16 @@
  */
 package io.trino.plugin.hive.fs;
 
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
 import io.trino.plugin.hive.TableInvalidationCallback;
 import io.trino.plugin.hive.metastore.Table;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 
 public interface DirectoryLister
         extends TableInvalidationCallback
 {
-    RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
+    RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs, Table table, Location location)
             throws IOException;
 }
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingFilter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingFilter.java
index 437a24b9bca5..07efcea578f5 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingFilter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingFilter.java
@@ -13,7 +13,7 @@
  */
 package io.trino.plugin.hive.fs;
 
-import org.apache.hadoop.fs.Path;
+import io.trino.filesystem.Location;
 
 import javax.annotation.Nullable;
 
@@ -28,17 +28,19 @@
 public class DirectoryListingFilter
         implements RemoteIterator<TrinoFileStatus>
 {
-    private final Path prefix;
+    private final Location prefix;
     private final RemoteIterator<TrinoFileStatus> delegateIterator;
+    private final boolean failOnUnexpectedFiles;
 
     @Nullable private TrinoFileStatus nextElement;
 
-    public DirectoryListingFilter(Path prefix, RemoteIterator<TrinoFileStatus> delegateIterator)
+    public DirectoryListingFilter(Location prefix, RemoteIterator<TrinoFileStatus> delegateIterator, boolean failOnUnexpectedFiles)
             throws IOException
     {
         this.prefix = requireNonNull(prefix, "prefix is null");
         this.delegateIterator = requireNonNull(delegateIterator, "delegateIterator is null");
+        this.failOnUnexpectedFiles = failOnUnexpectedFiles;
         this.nextElement = findNextElement();
     }
 
     @Override
@@ -66,11 +68,12 @@ private TrinoFileStatus findNextElement()
     {
         while (delegateIterator.hasNext()) {
             TrinoFileStatus candidate = delegateIterator.next();
-            Path candidatePath = new Path(candidate.getPath());
-            Path parent = candidatePath.getParent();
-            boolean directChild = candidatePath.isAbsolute() ?
-                    (parent != null && parent.equals(prefix)) :
-                    (parent == null || parent.toString().isEmpty());
+            Location parent = Location.of(candidate.getPath()).parentDirectory();
+            boolean directChild = parent.equals(prefix);
+
+            if (!directChild && failOnUnexpectedFiles && !parentIsHidden(parent, prefix)) {
+                throw new HiveFileIterator.NestedDirectoryNotAllowedException(candidate.getPath());
+            }
 
             if (directChild) {
                 return candidate;
@@ -78,4 +81,17 @@ private TrinoFileStatus findNextElement()
         }
         return null;
     }
+
+    private static boolean parentIsHidden(Location location, Location prefix)
+    {
+        if (location.equals(prefix)) {
+            return false;
+        }
+
+        if (location.fileName().startsWith(".") || location.fileName().startsWith("_")) {
+            return true;
+        }
+
+        return parentIsHidden(location.parentDirectory(), prefix);
+    }
 }
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java
index 985a5b903130..e96d3eadbb87 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java
@@ -16,10 +16,11 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.AbstractIterator;
 import io.airlift.stats.TimeStat;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
 import io.trino.hdfs.HdfsNamenodeStats;
 import io.trino.plugin.hive.metastore.Table;
 import io.trino.spi.TrinoException;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import java.io.FileNotFoundException;
@@ -29,8 +30,8 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND;
+import static io.trino.plugin.hive.fs.HiveFileIterator.NestedDirectoryPolicy.FAIL;
 import static io.trino.plugin.hive.fs.HiveFileIterator.NestedDirectoryPolicy.RECURSE;
-import static java.util.Collections.emptyIterator;
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.Path.SEPARATOR_CHAR;
 
@@ -46,30 +47,27 @@ public enum NestedDirectoryPolicy
 
     private final String pathPrefix;
     private final Table table;
-    private final FileSystem fileSystem;
+    private final TrinoFileSystem fileSystem;
     private final DirectoryLister directoryLister;
     private final HdfsNamenodeStats namenodeStats;
     private final NestedDirectoryPolicy nestedDirectoryPolicy;
-    private final boolean ignoreAbsentPartitions;
     private final Iterator<TrinoFileStatus> remoteIterator;
 
     public HiveFileIterator(
             Table table,
-            Path path,
-            FileSystem fileSystem,
+            Location location,
+            TrinoFileSystem fileSystem,
             DirectoryLister directoryLister,
             HdfsNamenodeStats namenodeStats,
-            NestedDirectoryPolicy nestedDirectoryPolicy,
-            boolean ignoreAbsentPartitions)
+            NestedDirectoryPolicy nestedDirectoryPolicy)
     {
-        this.pathPrefix = path.toUri().getPath();
+        this.pathPrefix = location.toString();
         this.table = requireNonNull(table, "table is null");
         this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
         this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
        this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null");
         this.nestedDirectoryPolicy = requireNonNull(nestedDirectoryPolicy, "nestedDirectoryPolicy is null");
-        this.ignoreAbsentPartitions = ignoreAbsentPartitions;
-        this.remoteIterator = getLocatedFileStatusRemoteIterator(path);
+        this.remoteIterator = getLocatedFileStatusRemoteIterator(location);
    }
 
     @Override
@@ -89,42 +87,19 @@ else if (isHiddenFileOrDirectory(new Path(status.getPath()))) {
                 continue;
             }
 
-            if (status.isDirectory()) {
-                switch (nestedDirectoryPolicy) {
-                    case IGNORED:
-                        continue;
-                    case RECURSE:
-                        // Recursive listings call listFiles which should not return directories, this is a contract violation
-                        // and can be handled the same way as the FAIL case
-                    case FAIL:
-                        throw new NestedDirectoryNotAllowedException(status.getPath());
-                }
-            }
             return status;
         }
         return endOfData();
     }
 
-    private Iterator<TrinoFileStatus> getLocatedFileStatusRemoteIterator(Path path)
+    private Iterator<TrinoFileStatus> getLocatedFileStatusRemoteIterator(Location location)
     {
         try (TimeStat.BlockTimer ignored = namenodeStats.getListLocatedStatus().time()) {
-            return new FileStatusIterator(table, path, fileSystem, directoryLister, namenodeStats, nestedDirectoryPolicy == RECURSE);
+            return new FileStatusIterator(table, location, fileSystem, directoryLister, namenodeStats, nestedDirectoryPolicy);
         }
-        catch (TrinoException e) {
-            if (ignoreAbsentPartitions) {
-                try {
-                    if (!fileSystem.exists(path)) {
-                        return emptyIterator();
-                    }
-                }
-                catch (Exception ee) {
-                    TrinoException trinoException = new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed to check if path exists: " + path, ee);
-                    trinoException.addSuppressed(e);
-                    throw trinoException;
-                }
-            }
-            throw e;
+        catch (IOException e) {
+            throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed to list files for location: " + location, e);
         }
     }
 
@@ -147,7 +122,7 @@ static boolean isHiddenFileOrDirectory(Path path)
     @VisibleForTesting
     static boolean isHiddenOrWithinHiddenParentDirectory(Path path, String prefix)
     {
-        String pathString = path.toUri().getPath();
+        String pathString = path.toUri().toString();
         checkArgument(pathString.startsWith(prefix), "path %s does not start with prefix %s", pathString, prefix);
         return containsHiddenPathPartAfterIndex(pathString, prefix.endsWith("/") ?
prefix.length() : prefix.length() + 1); } @@ -173,20 +148,30 @@ static boolean containsHiddenPathPartAfterIndex(String pathString, int startFrom private static class FileStatusIterator implements Iterator { - private final Path path; + private final Location location; private final HdfsNamenodeStats namenodeStats; private final RemoteIterator fileStatusIterator; - private FileStatusIterator(Table table, Path path, FileSystem fileSystem, DirectoryLister directoryLister, HdfsNamenodeStats namenodeStats, boolean recursive) + private FileStatusIterator( + Table table, + Location location, + TrinoFileSystem fileSystem, + DirectoryLister directoryLister, + HdfsNamenodeStats namenodeStats, + NestedDirectoryPolicy nestedDirectoryPolicy) + throws IOException { - this.path = path; + this.location = location; this.namenodeStats = namenodeStats; try { - if (recursive) { - this.fileStatusIterator = directoryLister.listFilesRecursively(fileSystem, table, path); + if (nestedDirectoryPolicy == RECURSE) { + this.fileStatusIterator = directoryLister.listFilesRecursively(fileSystem, table, location); } else { - this.fileStatusIterator = new DirectoryListingFilter(path, directoryLister.listFilesRecursively(fileSystem, table, path)); + this.fileStatusIterator = new DirectoryListingFilter( + location, + directoryLister.listFilesRecursively(fileSystem, table, location), + nestedDirectoryPolicy == FAIL); } } catch (IOException e) { @@ -220,9 +205,9 @@ private TrinoException processException(IOException exception) { namenodeStats.getRemoteIteratorNext().recordException(exception); if (exception instanceof FileNotFoundException) { - return new TrinoException(HIVE_FILE_NOT_FOUND, "Partition location does not exist: " + path); + return new TrinoException(HIVE_FILE_NOT_FOUND, "Partition location does not exist: " + location); } - return new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed to list directory: " + path, exception); + return new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed to list directory: " + location, exception); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java index bc7a0cd9b22e..d18c6fcf50f0 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java @@ -13,6 +13,8 @@ */ package io.trino.plugin.hive.fs; +import io.trino.filesystem.Location; + import java.util.Objects; import static com.google.common.base.MoreObjects.toStringHelper; @@ -25,22 +27,22 @@ public class TransactionDirectoryListingCacheKey private static final long INSTANCE_SIZE = instanceSize(TransactionDirectoryListingCacheKey.class); private final long transactionId; - private final String path; + private final Location path; - public TransactionDirectoryListingCacheKey(long transactionId, String path) + public TransactionDirectoryListingCacheKey(long transactionId, Location path) { this.transactionId = transactionId; this.path = requireNonNull(path, "path is null"); } - public String getPath() + public Location getPath() { return path; } public long getRetainedSizeInBytes() { - return INSTANCE_SIZE + estimatedSizeOf(path); + return INSTANCE_SIZE + estimatedSizeOf(path.toString()); } @Override diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java 
b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java index dd7cbb305da6..3dd7ba700aef 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java @@ -16,11 +16,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.util.concurrent.UncheckedExecutionException; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hive.metastore.Storage; import io.trino.plugin.hive.metastore.Table; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -65,13 +65,13 @@ public TransactionScopeCachingDirectoryLister(DirectoryLister delegate, long tra } @Override - public RemoteIterator listFilesRecursively(FileSystem fs, Table table, Path path) + public RemoteIterator listFilesRecursively(TrinoFileSystem fs, Table table, Location location) throws IOException { - return listInternal(fs, table, new TransactionDirectoryListingCacheKey(transactionId, path.toString())); + return listInternal(fs, table, new TransactionDirectoryListingCacheKey(transactionId, location)); } - private RemoteIterator listInternal(FileSystem fs, Table table, TransactionDirectoryListingCacheKey cacheKey) + private RemoteIterator listInternal(TrinoFileSystem fs, Table table, TransactionDirectoryListingCacheKey cacheKey) throws IOException { FetchingValueHolder cachedValueHolder; @@ -92,10 +92,10 @@ private RemoteIterator listInternal(FileSystem fs, Table table, return cachingRemoteIterator(cachedValueHolder, cacheKey); } - private RemoteIterator createListingRemoteIterator(FileSystem fs, Table table, TransactionDirectoryListingCacheKey cacheKey) + private RemoteIterator createListingRemoteIterator(TrinoFileSystem fs, Table table, TransactionDirectoryListingCacheKey cacheKey) throws IOException { - return delegate.listFilesRecursively(fs, table, new Path(cacheKey.getPath())); + return delegate.listFilesRecursively(fs, table, cacheKey.getPath()); } @Override @@ -103,7 +103,7 @@ public void invalidate(Table table) { if (isLocationPresent(table.getStorage())) { if (table.getPartitionColumns().isEmpty()) { - cache.invalidate(new TransactionDirectoryListingCacheKey(transactionId, table.getStorage().getLocation())); + cache.invalidate(new TransactionDirectoryListingCacheKey(transactionId, Location.of(table.getStorage().getLocation()))); } else { // a partitioned table can have multiple paths in cache @@ -117,7 +117,7 @@ public void invalidate(Table table) public void invalidate(Partition partition) { if (isLocationPresent(partition.getStorage())) { - cache.invalidate(new TransactionDirectoryListingCacheKey(transactionId, partition.getStorage().getLocation())); + cache.invalidate(new TransactionDirectoryListingCacheKey(transactionId, Location.of(partition.getStorage().getLocation()))); } delegate.invalidate(partition); } @@ -159,9 +159,9 @@ public TrinoFileStatus next() } @VisibleForTesting - boolean isCached(Path path) + boolean isCached(Location location) { - return isCached(new TransactionDirectoryListingCacheKey(transactionId, path.toString())); + return isCached(new TransactionDirectoryListingCacheKey(transactionId, location)); } @VisibleForTesting diff --git 
a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatus.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatus.java index 1bebf9a66f95..51f2a22fb901 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatus.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatus.java @@ -79,11 +79,6 @@ public String getPath() return path; } - public boolean isDirectory() - { - return isDirectory; - } - public long getLength() { return length; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatusRemoteIterator.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatusRemoteIterator.java index 56f479703c8c..730895748d7d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatusRemoteIterator.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatusRemoteIterator.java @@ -13,7 +13,7 @@ */ package io.trino.plugin.hive.fs; -import org.apache.hadoop.fs.LocatedFileStatus; +import io.trino.filesystem.FileIterator; import java.io.IOException; @@ -22,9 +22,9 @@ public class TrinoFileStatusRemoteIterator implements RemoteIterator { - private final org.apache.hadoop.fs.RemoteIterator iterator; + private final FileIterator iterator; - public TrinoFileStatusRemoteIterator(org.apache.hadoop.fs.RemoteIterator iterator) + public TrinoFileStatusRemoteIterator(FileIterator iterator) { this.iterator = requireNonNull(iterator, "iterator is null"); } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java index 837ed91942f2..9a7166bec7e2 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java @@ -26,6 +26,7 @@ import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.hdfs.HdfsConfig; import io.trino.hdfs.HdfsContext; @@ -6331,11 +6332,11 @@ private static class CountingDirectoryLister private final AtomicInteger listCount = new AtomicInteger(); @Override - public RemoteIterator listFilesRecursively(FileSystem fs, Table table, Path path) + public RemoteIterator listFilesRecursively(TrinoFileSystem fs, Table table, Location location) throws IOException { listCount.incrementAndGet(); - return new TrinoFileStatusRemoteIterator(fs.listFiles(path, true)); + return new TrinoFileStatusRemoteIterator(fs.listFiles(location)); } public int getListCount() diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java index 4a310d31be61..2d6011c376d4 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java @@ -23,12 +23,14 @@ import io.airlift.slice.Slice; import io.airlift.stats.CounterStat; import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.hdfs.HdfsConfig; import io.trino.hdfs.HdfsConfiguration; import io.trino.hdfs.HdfsContext; import io.trino.hdfs.HdfsEnvironment; import io.trino.hdfs.HdfsNamenodeStats; +import 
io.trino.hdfs.TrinoHdfsFileSystemStats; import io.trino.hdfs.authentication.NoHdfsAuthentication; import io.trino.operator.GroupByHashPageIndexerFactory; import io.trino.plugin.base.CatalogName; @@ -106,6 +108,7 @@ import static io.trino.plugin.hive.AbstractTestHive.getSplits; import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS; import static io.trino.plugin.hive.HiveTestUtils.PAGE_SORTER; +import static io.trino.plugin.hive.HiveTestUtils.SESSION; import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveFileWriterFactories; import static io.trino.plugin.hive.HiveTestUtils.getDefaultHivePageSourceFactories; import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveRecordCursorProviders; @@ -449,6 +452,7 @@ public void testFileIteratorListing() // base-path-file.txt Path basePath = new Path(getBasePath(), "test-file-iterator-listing"); FileSystem fs = hdfsEnvironment.getFileSystem(TESTING_CONTEXT, basePath); + TrinoFileSystem trinoFileSystem = new HdfsFileSystemFactory(hdfsEnvironment, new TrinoHdfsFileSystemStats()).create(SESSION); fs.mkdirs(basePath); // create file in hidden folder @@ -473,12 +477,11 @@ public void testFileIteratorListing() // List recursively through hive file iterator HiveFileIterator recursiveIterator = new HiveFileIterator( fakeTable, - basePath, - fs, + Location.of(basePath.toString()), + trinoFileSystem, new FileSystemDirectoryLister(), new HdfsNamenodeStats(), - HiveFileIterator.NestedDirectoryPolicy.RECURSE, - false); // ignoreAbsentPartitions + HiveFileIterator.NestedDirectoryPolicy.RECURSE); List recursiveListing = Streams.stream(recursiveIterator) .map(TrinoFileStatus::getPath) @@ -489,12 +492,11 @@ public void testFileIteratorListing() HiveFileIterator shallowIterator = new HiveFileIterator( fakeTable, - basePath, - fs, + Location.of(basePath.toString()), + trinoFileSystem, new FileSystemDirectoryLister(), new HdfsNamenodeStats(), - HiveFileIterator.NestedDirectoryPolicy.IGNORED, - false); // ignoreAbsentPartitions + HiveFileIterator.NestedDirectoryPolicy.IGNORED); List shallowListing = Streams.stream(shallowIterator) .map(TrinoFileStatus::getPath) .map(Path::new) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java index 35116dada105..20b428b88e14 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java @@ -1466,7 +1466,18 @@ public void setWorkingDirectory(Path dir) @Override public FileStatus[] listStatus(Path f) { - throw new UnsupportedOperationException(); + FileStatus[] fileStatuses = new FileStatus[files.size()]; + for (int i = 0; i < files.size(); i++) { + LocatedFileStatus locatedFileStatus = files.get(i); + fileStatuses[i] = new FileStatus( + locatedFileStatus.getLen(), + locatedFileStatus.isDirectory(), + locatedFileStatus.getReplication(), + locatedFileStatus.getBlockSize(), + locatedFileStatus.getModificationTime(), + locatedFileStatus.getPath()); + } + return fileStatuses; } @Override @@ -1530,13 +1541,13 @@ public FileStatus getFileStatus(Path f) @Override public Path getWorkingDirectory() { - throw new UnsupportedOperationException(); + return new Path(getUri()); } @Override public URI getUri() { - throw new UnsupportedOperationException(); + return URI.create("hdfs://VOL1:9000/"); } } } diff --git 
a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/BaseCachingDirectoryListerTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/BaseCachingDirectoryListerTest.java index 4965a601fdff..ee26f071ecc7 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/BaseCachingDirectoryListerTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/BaseCachingDirectoryListerTest.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.trino.filesystem.Location; import io.trino.plugin.hive.HiveQueryRunner; import io.trino.plugin.hive.metastore.PrincipalPrivileges; import io.trino.plugin.hive.metastore.Table; @@ -66,7 +67,7 @@ protected QueryRunner createQueryRunner(Map properties) protected abstract C createDirectoryLister(); - protected abstract boolean isCached(C directoryLister, org.apache.hadoop.fs.Path path); + protected abstract boolean isCached(C directoryLister, Location location); @Test public void testCacheInvalidationIsAppliedSpecificallyOnTheNonPartitionedTableBeingChanged() @@ -75,14 +76,14 @@ public void testCacheInvalidationIsAppliedSpecificallyOnTheNonPartitionedTableBe assertUpdate("INSERT INTO partial_cache_invalidation_table1 VALUES (1), (2), (3)", 3); // The listing for the invalidate_non_partitioned_table1 should be in the directory cache after this call assertQuery("SELECT sum(col1) FROM partial_cache_invalidation_table1", "VALUES (6)"); - org.apache.hadoop.fs.Path cachedTable1Location = getTableLocation(TPCH_SCHEMA, "partial_cache_invalidation_table1"); + String cachedTable1Location = getTableLocation(TPCH_SCHEMA, "partial_cache_invalidation_table1"); assertThat(isCached(cachedTable1Location)).isTrue(); assertUpdate("CREATE TABLE partial_cache_invalidation_table2 (col1 int) WITH (format = 'ORC')"); assertUpdate("INSERT INTO partial_cache_invalidation_table2 VALUES (11), (12)", 2); // The listing for the invalidate_non_partitioned_table2 should be in the directory cache after this call assertQuery("SELECT sum(col1) FROM partial_cache_invalidation_table2", "VALUES (23)"); - org.apache.hadoop.fs.Path cachedTable2Location = getTableLocation(TPCH_SCHEMA, "partial_cache_invalidation_table2"); + String cachedTable2Location = getTableLocation(TPCH_SCHEMA, "partial_cache_invalidation_table2"); assertThat(isCached(cachedTable2Location)).isTrue(); assertUpdate("INSERT INTO partial_cache_invalidation_table1 VALUES (4), (5)", 2); @@ -104,14 +105,14 @@ public void testCacheInvalidationIsAppliedOnTheEntireCacheOnPartitionedTableDrop assertUpdate("INSERT INTO full_cache_invalidation_non_partitioned_table VALUES (1), (2), (3)", 3); // The listing for the invalidate_non_partitioned_table1 should be in the directory cache after this call assertQuery("SELECT sum(col1) FROM full_cache_invalidation_non_partitioned_table", "VALUES (6)"); - org.apache.hadoop.fs.Path nonPartitionedTableLocation = getTableLocation(TPCH_SCHEMA, "full_cache_invalidation_non_partitioned_table"); + String nonPartitionedTableLocation = getTableLocation(TPCH_SCHEMA, "full_cache_invalidation_non_partitioned_table"); assertThat(isCached(nonPartitionedTableLocation)).isTrue(); assertUpdate("CREATE TABLE full_cache_invalidation_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); assertUpdate("INSERT INTO full_cache_invalidation_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); assertQuery("SELECT col2, sum(col1) FROM 
full_cache_invalidation_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); - org.apache.hadoop.fs.Path partitionedTableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "full_cache_invalidation_partitioned_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path partitionedTableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "full_cache_invalidation_partitioned_table", ImmutableList.of("group2")); + String partitionedTableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "full_cache_invalidation_partitioned_table", ImmutableList.of("group1")); + String partitionedTableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "full_cache_invalidation_partitioned_table", ImmutableList.of("group2")); assertThat(isCached(partitionedTableGroup1PartitionLocation)).isTrue(); assertThat(isCached(partitionedTableGroup2PartitionLocation)).isTrue(); @@ -139,14 +140,14 @@ public void testCacheInvalidationIsAppliedSpecificallyOnPartitionDropped() assertUpdate("INSERT INTO partition_path_cache_invalidation_non_partitioned_table VALUES (1), (2), (3)", 3); // The listing for the invalidate_non_partitioned_table1 should be in the directory cache after this call assertQuery("SELECT sum(col1) FROM partition_path_cache_invalidation_non_partitioned_table", "VALUES (6)"); - org.apache.hadoop.fs.Path nonPartitionedTableLocation = getTableLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_non_partitioned_table"); + String nonPartitionedTableLocation = getTableLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_non_partitioned_table"); assertThat(isCached(nonPartitionedTableLocation)).isTrue(); assertUpdate("CREATE TABLE partition_path_cache_invalidation_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); assertUpdate("INSERT INTO partition_path_cache_invalidation_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); assertQuery("SELECT col2, sum(col1) FROM partition_path_cache_invalidation_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); - org.apache.hadoop.fs.Path partitionedTableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_partitioned_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path partitionedTableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_partitioned_table", ImmutableList.of("group2")); + String partitionedTableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_partitioned_table", ImmutableList.of("group1")); + String partitionedTableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_partitioned_table", ImmutableList.of("group2")); assertThat(isCached(partitionedTableGroup1PartitionLocation)).isTrue(); assertThat(isCached(partitionedTableGroup2PartitionLocation)).isTrue(); @@ -186,8 +187,8 @@ public void testInsertIntoPartitionedTable() assertUpdate("INSERT INTO insert_into_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); // The listing for the table partitions should be in the directory cache after this call assertQuery("SELECT col2, sum(col1) FROM insert_into_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); - org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "insert_into_partitioned_table", ImmutableList.of("group1")); 
- org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "insert_into_partitioned_table", ImmutableList.of("group2")); + String tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "insert_into_partitioned_table", ImmutableList.of("group1")); + String tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "insert_into_partitioned_table", ImmutableList.of("group2")); assertThat(isCached(tableGroup1PartitionLocation)).isTrue(); assertThat(isCached(tableGroup2PartitionLocation)).isTrue(); @@ -207,9 +208,9 @@ public void testDropPartition() assertUpdate("INSERT INTO delete_from_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2'), (5, 'group3')", 5); // The listing for the table partitions should be in the directory cache after this call assertQuery("SELECT col2, sum(col1) FROM delete_from_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7), ('group3', 5)"); - org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group2")); - org.apache.hadoop.fs.Path tableGroup3PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group3")); + String tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group1")); + String tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group2")); + String tableGroup3PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group3")); assertThat(isCached(tableGroup1PartitionLocation)).isTrue(); assertThat(isCached(tableGroup2PartitionLocation)).isTrue(); assertUpdate("DELETE FROM delete_from_partitioned_table WHERE col2 = 'group1' OR col2 = 'group2'"); @@ -229,10 +230,10 @@ public void testDropMultiLevelPartition() assertUpdate("INSERT INTO delete_from_partitioned_table VALUES (1000, DATE '2022-02-01', 'US'), (2000, DATE '2022-02-01', 'US'), (4000, DATE '2022-02-02', 'US'), (1500, DATE '2022-02-01', 'AT'), (2500, DATE '2022-02-02', 'AT')", 5); // The listing for the table partitions should be in the directory cache after this call assertQuery("SELECT day, country, sum(clicks) FROM delete_from_partitioned_table GROUP BY day, country", "VALUES (DATE '2022-02-01', 'US', 3000), (DATE '2022-02-02', 'US', 4000), (DATE '2022-02-01', 'AT', 1500), (DATE '2022-02-02', 'AT', 2500)"); - org.apache.hadoop.fs.Path table20220201UsPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-01", "US")); - org.apache.hadoop.fs.Path table20220202UsPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-02", "US")); - org.apache.hadoop.fs.Path table20220201AtPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-01", "AT")); - org.apache.hadoop.fs.Path table20220202AtPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-02", "AT")); + String table20220201UsPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-01", "US")); + String 
table20220202UsPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-02", "US")); + String table20220201AtPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-01", "AT")); + String table20220202AtPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-02", "AT")); assertThat(isCached(table20220201UsPartitionLocation)).isTrue(); assertThat(isCached(table20220202UsPartitionLocation)).isTrue(); assertThat(isCached(table20220201AtPartitionLocation)).isTrue(); @@ -258,13 +259,13 @@ public void testUnregisterRegisterPartition() assertUpdate("INSERT INTO register_unregister_partition_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); // The listing for the table partitions should be in the directory cache after this call assertQuery("SELECT col2, sum(col1) FROM register_unregister_partition_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); - org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "register_unregister_partition_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "register_unregister_partition_table", ImmutableList.of("group2")); + String tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "register_unregister_partition_table", ImmutableList.of("group1")); + String tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "register_unregister_partition_table", ImmutableList.of("group2")); assertThat(isCached(tableGroup1PartitionLocation)).isTrue(); assertThat(isCached(tableGroup2PartitionLocation)).isTrue(); List paths = getQueryRunner().execute(getSession(), "SELECT \"$path\" FROM register_unregister_partition_table WHERE col2 = 'group1' LIMIT 1").toTestTypes().getMaterializedRows(); - String group1PartitionPath = new org.apache.hadoop.fs.Path((String) paths.get(0).getField(0)).getParent().toString(); + String group1PartitionPath = Location.of((String) paths.get(0).getField(0)).parentDirectory().toString(); assertUpdate(format("CALL system.unregister_partition('%s', '%s', ARRAY['col2'], ARRAY['group1'])", TPCH_SCHEMA, "register_unregister_partition_table")); // Unregistering the partition in the table should invalidate the cached listing of all the partitions belonging to the table. @@ -290,7 +291,7 @@ public void testRenameTable() assertUpdate("INSERT INTO table_to_be_renamed VALUES (1), (2), (3)", 3); // The listing for the table should be in the directory cache after this call assertQuery("SELECT sum(col1) FROM table_to_be_renamed", "VALUES (6)"); - org.apache.hadoop.fs.Path tableLocation = getTableLocation(TPCH_SCHEMA, "table_to_be_renamed"); + String tableLocation = getTableLocation(TPCH_SCHEMA, "table_to_be_renamed"); assertThat(isCached(tableLocation)).isTrue(); assertUpdate("ALTER TABLE table_to_be_renamed RENAME TO table_renamed"); // Altering the table should invalidate the cached listing of the files belonging to the table. 
@@ -306,7 +307,7 @@ public void testDropTable() assertUpdate("INSERT INTO table_to_be_dropped VALUES (1), (2), (3)", 3); // The listing for the table should be in the directory cache after this call assertQuery("SELECT sum(col1) FROM table_to_be_dropped", "VALUES (6)"); - org.apache.hadoop.fs.Path tableLocation = getTableLocation(TPCH_SCHEMA, "table_to_be_dropped"); + String tableLocation = getTableLocation(TPCH_SCHEMA, "table_to_be_dropped"); assertThat(isCached(tableLocation)).isTrue(); assertUpdate("DROP TABLE table_to_be_dropped"); // Dropping the table should invalidate the cached listing of the files belonging to the table. @@ -320,9 +321,9 @@ public void testDropPartitionedTable() assertUpdate("INSERT INTO drop_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2'), (5, 'group3')", 5); // The listing for the table partitions should be in the directory cache after this call assertQuery("SELECT col2, sum(col1) FROM drop_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7), ('group3', 5)"); - org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group2")); - org.apache.hadoop.fs.Path tableGroup3PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group3")); + String tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group1")); + String tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group2")); + String tableGroup3PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group3")); assertThat(isCached(tableGroup1PartitionLocation)).isTrue(); assertThat(isCached(tableGroup2PartitionLocation)).isTrue(); assertThat(isCached(tableGroup3PartitionLocation)).isTrue(); @@ -347,27 +348,25 @@ protected void dropTable(String schemaName, String tableName, boolean deleteData fileHiveMetastore.dropTable(schemaName, tableName, deleteData); } - protected org.apache.hadoop.fs.Path getTableLocation(String schemaName, String tableName) + protected String getTableLocation(String schemaName, String tableName) { return getTable(schemaName, tableName) .map(table -> table.getStorage().getLocation()) - .map(tableLocation -> new org.apache.hadoop.fs.Path(tableLocation)) .orElseThrow(() -> new NoSuchElementException(format("The table %s.%s could not be found", schemaName, tableName))); } - protected org.apache.hadoop.fs.Path getPartitionLocation(String schemaName, String tableName, List partitionValues) + protected String getPartitionLocation(String schemaName, String tableName, List partitionValues) { Table table = getTable(schemaName, tableName) .orElseThrow(() -> new NoSuchElementException(format("The table %s.%s could not be found", schemaName, tableName))); return fileHiveMetastore.getPartition(table, partitionValues) .map(partition -> partition.getStorage().getLocation()) - .map(partitionLocation -> new org.apache.hadoop.fs.Path(partitionLocation)) .orElseThrow(() -> new NoSuchElementException(format("The partition %s from the table %s.%s could not be found", partitionValues, schemaName, tableName))); } - protected boolean isCached(org.apache.hadoop.fs.Path path) + protected boolean isCached(String path) { - return 
isCached(directoryLister, path); + return isCached(directoryLister, Location.of(path)); } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java index 9e88af19f76f..b89bf75fe7ab 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java @@ -13,10 +13,10 @@ */ package io.trino.plugin.hive.fs; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hive.metastore.Table; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import java.io.IOException; @@ -24,10 +24,10 @@ public class FileSystemDirectoryLister implements DirectoryLister { @Override - public RemoteIterator listFilesRecursively(FileSystem fs, Table table, Path path) + public RemoteIterator listFilesRecursively(TrinoFileSystem fs, Table table, Location location) throws IOException { - return new TrinoFileStatusRemoteIterator(fs.listFiles(path, true)); + return new TrinoFileStatusRemoteIterator(fs.listFiles(location)); } @Override diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryLister.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryLister.java index 4cc21ce7b920..3d37a7e05fe5 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryLister.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryLister.java @@ -15,7 +15,7 @@ import io.airlift.units.DataSize; import io.airlift.units.Duration; -import org.apache.hadoop.fs.Path; +import io.trino.filesystem.Location; import org.testng.annotations.Test; import java.util.List; @@ -34,9 +34,9 @@ protected CachingDirectoryLister createDirectoryLister() } @Override - protected boolean isCached(CachingDirectoryLister directoryLister, Path path) + protected boolean isCached(CachingDirectoryLister directoryLister, Location location) { - return directoryLister.isCached(path); + return directoryLister.isCached(location); } @Test diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryListerRecursiveFilesOnly.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryListerRecursiveFilesOnly.java index 46fc92a42ad1..010b8330145c 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryListerRecursiveFilesOnly.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryListerRecursiveFilesOnly.java @@ -17,10 +17,10 @@ import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; import io.airlift.units.Duration; +import io.trino.filesystem.Location; import io.trino.plugin.hive.metastore.MetastoreUtil; import io.trino.plugin.hive.metastore.Table; import io.trino.testing.QueryRunner; -import org.apache.hadoop.fs.Path; import org.testng.annotations.Test; import java.util.List; @@ -54,9 +54,9 @@ protected QueryRunner createQueryRunner() } @Override - protected boolean isCached(CachingDirectoryLister directoryLister, Path path) + protected boolean isCached(CachingDirectoryLister directoryLister, Location location) { - return directoryLister.isCached(path); + return directoryLister.isCached(location); } @Test @@ -80,7 +80,7 @@ public void 
testRecursiveDirectories() // Execute a query on the new table to pull the listing into the cache assertQuery("SELECT sum(clicks) FROM recursive_directories", "VALUES (11000)"); - Path tableLocation = getTableLocation(TPCH_SCHEMA, "recursive_directories"); + String tableLocation = getTableLocation(TPCH_SCHEMA, "recursive_directories"); assertTrue(isCached(tableLocation)); // Insert should invalidate cache, even at the root directory path diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java index eeddb644b9a2..a67bb89bf514 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java @@ -16,6 +16,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; import io.trino.plugin.hive.HiveBucketProperty; import io.trino.plugin.hive.HiveType; import io.trino.plugin.hive.metastore.Column; @@ -24,8 +26,6 @@ import io.trino.plugin.hive.metastore.Storage; import io.trino.plugin.hive.metastore.StorageFormat; import io.trino.plugin.hive.metastore.Table; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.testng.annotations.Test; import java.io.IOException; @@ -76,21 +76,21 @@ protected TransactionScopeCachingDirectoryLister createDirectoryLister() } @Override - protected boolean isCached(TransactionScopeCachingDirectoryLister directoryLister, Path path) + protected boolean isCached(TransactionScopeCachingDirectoryLister directoryLister, Location location) { - return directoryLister.isCached(path); + return directoryLister.isCached(location); } @Test public void testConcurrentDirectoryListing() throws IOException { - TrinoFileStatus firstFile = new TrinoFileStatus(ImmutableList.of(), "x", false, 1, 1); - TrinoFileStatus secondFile = new TrinoFileStatus(ImmutableList.of(), "y", false, 1, 1); - TrinoFileStatus thirdFile = new TrinoFileStatus(ImmutableList.of(), "z", false, 1, 1); + TrinoFileStatus firstFile = new TrinoFileStatus(ImmutableList.of(), "file:/x/x", false, 1, 1); + TrinoFileStatus secondFile = new TrinoFileStatus(ImmutableList.of(), "file:/x/y", false, 1, 1); + TrinoFileStatus thirdFile = new TrinoFileStatus(ImmutableList.of(), "file:/y/z", false, 1, 1); - Path path1 = new Path("x"); - Path path2 = new Path("y"); + Location path1 = Location.of("file:/x"); + Location path2 = Location.of("file:/y"); CountingDirectoryLister countingLister = new CountingDirectoryLister( ImmutableMap.of( @@ -101,17 +101,17 @@ public void testConcurrentDirectoryListing() // due to Token being a key in segmented cache. 
TransactionScopeCachingDirectoryLister cachingLister = (TransactionScopeCachingDirectoryLister) new TransactionScopeCachingDirectoryListerFactory(DataSize.ofBytes(500), Optional.of(1)).get(countingLister);
 
-        assertFiles(new DirectoryListingFilter(path2, (cachingLister.listFilesRecursively(null, TABLE, path2))), ImmutableList.of(thirdFile));
+        assertFiles(new DirectoryListingFilter(path2, (cachingLister.listFilesRecursively(null, TABLE, path2)), true), ImmutableList.of(thirdFile));
         assertThat(countingLister.getListCount()).isEqualTo(1);
 
         // listing path2 again shouldn't increase listing count
         assertThat(cachingLister.isCached(path2)).isTrue();
-        assertFiles(new DirectoryListingFilter(path2, cachingLister.listFilesRecursively(null, TABLE, path2)), ImmutableList.of(thirdFile));
+        assertFiles(new DirectoryListingFilter(path2, cachingLister.listFilesRecursively(null, TABLE, path2), true), ImmutableList.of(thirdFile));
         assertThat(countingLister.getListCount()).isEqualTo(1);
 
         // start listing path1 concurrently
-        RemoteIterator<TrinoFileStatus> path1FilesA = new DirectoryListingFilter(path1, cachingLister.listFilesRecursively(null, TABLE, path1));
-        RemoteIterator<TrinoFileStatus> path1FilesB = new DirectoryListingFilter(path1, cachingLister.listFilesRecursively(null, TABLE, path1));
+        RemoteIterator<TrinoFileStatus> path1FilesA = new DirectoryListingFilter(path1, cachingLister.listFilesRecursively(null, TABLE, path1), true);
+        RemoteIterator<TrinoFileStatus> path1FilesB = new DirectoryListingFilter(path1, cachingLister.listFilesRecursively(null, TABLE, path1), true);
         assertThat(countingLister.getListCount()).isEqualTo(2);
 
         // list path1 files using both iterators concurrently
@@ -125,7 +125,7 @@
 
         // listing path2 again should increase listing count because 2 files were cached for path1
         assertThat(cachingLister.isCached(path2)).isFalse();
-        assertFiles(new DirectoryListingFilter(path2, cachingLister.listFilesRecursively(null, TABLE, path2)), ImmutableList.of(thirdFile));
+        assertFiles(new DirectoryListingFilter(path2, cachingLister.listFilesRecursively(null, TABLE, path2), true), ImmutableList.of(thirdFile));
         assertThat(countingLister.getListCount()).isEqualTo(3);
     }
 
@@ -133,8 +133,8 @@ public void testConcurrentDirectoryListingException()
             throws IOException
     {
-        TrinoFileStatus file = new TrinoFileStatus(ImmutableList.of(), "x", false, 1, 1);
-        Path path = new Path("x");
+        TrinoFileStatus file = new TrinoFileStatus(ImmutableList.of(), "file:/x/x", false, 1, 1);
+        Location path = Location.of("file:/x");
 
         CountingDirectoryLister countingLister = new CountingDirectoryLister(ImmutableMap.of(path, ImmutableList.of(file)));
         DirectoryLister cachingLister = new TransactionScopeCachingDirectoryListerFactory(DataSize.ofBytes(600), Optional.empty()).get(countingLister);
@@ -150,7 +150,7 @@
 
         // listing again should succeed
         countingLister.setThrowException(false);
-        assertFiles(new DirectoryListingFilter(path, cachingLister.listFilesRecursively(null, TABLE, path)), ImmutableList.of(file));
+        assertFiles(new DirectoryListingFilter(path, cachingLister.listFilesRecursively(null, TABLE, path), true), ImmutableList.of(file));
         assertThat(countingLister.getListCount()).isEqualTo(2);
 
         // listing using second concurrently initialized DirectoryLister should fail
@@ -170,22 +170,21 @@ private void assertFiles(RemoteIterator<TrinoFileStatus> iterator, List<TrinoFileStatus> expectedFiles)
 
-        private final Map<Path, List<TrinoFileStatus>> fileStatuses;
+        private final Map<Location, List<TrinoFileStatus>> fileStatuses;
 
         private int listCount;
         private boolean throwException;
 
-        public
CountingDirectoryLister(Map<Path, List<TrinoFileStatus>> fileStatuses)
+        public CountingDirectoryLister(Map<Location, List<TrinoFileStatus>> fileStatuses)
         {
             this.fileStatuses = requireNonNull(fileStatuses, "fileStatuses is null");
         }
 
         @Override
-        public RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
-                throws IOException
+        public RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs, Table table, Location location)
         {
             // No specific recursive files-only listing implementation
             listCount++;
-            return throwingRemoteIterator(requireNonNull(fileStatuses.get(path)), throwException);
+            return throwingRemoteIterator(requireNonNull(fileStatuses.get(location)), throwException);
         }
 
         public void setThrowException(boolean throwException)
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestAvroSymlinkInputFormat.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestAvroSymlinkInputFormat.java
index d1c277b2201d..bd227e8ceea5 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestAvroSymlinkInputFormat.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestAvroSymlinkInputFormat.java
@@ -137,9 +137,9 @@ public void testSymlinkTableWithNestedDirectory()
                 "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
 
         String tableRoot = warehouseDirectory + '/' + table;
-        String dataDir = warehouseDirectory + "/data_test_avro_symlink_with_nested_directory";
+        String dataDir = warehouseDirectory + "/data_test_avro_symlink_with_nested_directory/nested_directory";
         saveResourceOnHdfs("avro/original_data.avro", dataDir + "/original_data.avro");
-        hdfsClient.saveFile(tableRoot + "/symlink.txt", format("hdfs://%s/", dataDir));
+        hdfsClient.saveFile(tableRoot + "/symlink.txt", format("hdfs://%s/original_data.avro", dataDir));
 
         assertThat(onTrino().executeQuery("SELECT * FROM " + table)).containsExactlyInOrder(row("someValue", 1));
         onHive().executeQuery("DROP TABLE " + table);
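
Note (illustrative, not part of the patch): the sketch below shows how the pieces introduced above fit together after the migration. A DirectoryLister now lists through TrinoFileSystem and Location instead of Hadoop's FileSystem and Path, and TrinoFileStatusRemoteIterator adapts the FileIterator returned by TrinoFileSystem.listFiles. The ExampleDirectoryLister class and the countFiles helper are hypothetical names invented for this example; the interfaces and calls they use are the ones defined in the diff.

// Hypothetical, cache-free lister mirroring the test-only FileSystemDirectoryLister
// from the diff; countFiles drains a listing the way BackgroundHiveSplitLoader does.
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.fs.RemoteIterator;
import io.trino.plugin.hive.fs.TrinoFileStatus;
import io.trino.plugin.hive.fs.TrinoFileStatusRemoteIterator;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Table;

import java.io.IOException;

public class ExampleDirectoryLister
        implements DirectoryLister
{
    @Override
    public RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs, Table table, Location location)
            throws IOException
    {
        // TrinoFileSystem.listFiles lists all files under the location recursively,
        // so the old Hadoop-style "recursive" flag is no longer needed
        return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
    }

    @Override
    public void invalidate(Table table)
    {
        // nothing is cached here, so there is nothing to invalidate
    }

    @Override
    public void invalidate(Partition partition)
    {
        // nothing is cached here, so there is nothing to invalidate
    }

    // Drain a listing and count the files; RemoteIterator surfaces IOException
    // from hasNext()/next(), so callers propagate it rather than catching Hadoop types.
    public static long countFiles(DirectoryLister lister, TrinoFileSystem fs, Table table, Location location)
            throws IOException
    {
        long count = 0;
        RemoteIterator<TrinoFileStatus> files = lister.listFilesRecursively(fs, table, location);
        while (files.hasNext()) {
            files.next();
            count++;
        }
        return count;
    }
}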