diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/Location.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/Location.java
index a09862b4221a..241203353ebe 100644
--- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/Location.java
+++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/Location.java
@@ -44,7 +44,7 @@
  */
 public final class Location
 {
-    private static final Splitter SCHEME_SPLITTER = Splitter.on("://").limit(2);
+    private static final Splitter SCHEME_SPLITTER = Splitter.on(":").limit(2);
     private static final Splitter USER_INFO_SPLITTER = Splitter.on('@').limit(2);
     private static final Splitter AUTHORITY_SPLITTER = Splitter.on('/').limit(2);
     private static final Splitter HOST_AND_PORT_SPLITTER = Splitter.on(':').limit(2);
@@ -70,38 +70,39 @@ public static Location of(String location)
             return new Location(location, Optional.empty(), Optional.empty(), Optional.empty(), OptionalInt.empty(), location.substring(1));
         }
 
-        // local file system location
-        if (location.startsWith("file:/") && ((location.length() == 6) || (location.charAt(6) != '/'))) {
-            return new Location(location, Optional.of("file"), Optional.empty(), Optional.empty(), OptionalInt.empty(), location.substring(6));
-        }
-
         List<String> schemeSplit = SCHEME_SPLITTER.splitToList(location);
         checkArgument(schemeSplit.size() == 2, "No scheme for file system location: %s", location);
         String scheme = schemeSplit.get(0);
         String afterScheme = schemeSplit.get(1);
+        if (afterScheme.startsWith("//")) {
+            // Locations with an authority must begin with a double slash
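+            // e.g. "scheme://userInfo@host:1234/some/path", as opposed to "scheme:/some/path"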
+            afterScheme = afterScheme.substring(2);
+            List<String> userInfoSplit = USER_INFO_SPLITTER.splitToList(afterScheme);
+            Optional<String> userInfo = userInfoSplit.size() == 2 ? Optional.of(userInfoSplit.get(0)) : Optional.empty();
+
+            List<String> authoritySplit = AUTHORITY_SPLITTER.splitToList(Iterables.getLast(userInfoSplit));
+            List<String> hostAndPortSplit = HOST_AND_PORT_SPLITTER.splitToList(authoritySplit.get(0));
+
+            Optional<String> host = Optional.of(hostAndPortSplit.get(0)).filter(not(String::isEmpty));
+
+            OptionalInt port = OptionalInt.empty();
+            if (hostAndPortSplit.size() == 2) {
+                try {
+                    port = OptionalInt.of(parseInt(hostAndPortSplit.get(1)));
+                }
+                catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid port in file system location: " + location, e);
+                }
+            }
-        List<String> userInfoSplit = USER_INFO_SPLITTER.splitToList(afterScheme);
-        Optional<String> userInfo = userInfoSplit.size() == 2 ? Optional.of(userInfoSplit.get(0)) : Optional.empty();
-
-        List<String> authoritySplit = AUTHORITY_SPLITTER.splitToList(Iterables.getLast(userInfoSplit));
-        List<String> hostAndPortSplit = HOST_AND_PORT_SPLITTER.splitToList(authoritySplit.get(0));
-
-        Optional<String> host = Optional.of(hostAndPortSplit.get(0)).filter(not(String::isEmpty));
+            String path = (authoritySplit.size() == 2) ? authoritySplit.get(1) : "";
 
-        OptionalInt port = OptionalInt.empty();
-        if (hostAndPortSplit.size() == 2) {
-            try {
-                port = OptionalInt.of(parseInt(hostAndPortSplit.get(1)));
-            }
-            catch (NumberFormatException e) {
-                throw new IllegalArgumentException("Invalid port in file system location: " + location, e);
-            }
+            return new Location(location, Optional.of(scheme), userInfo, host, port, path);
         }
-        String path = (authoritySplit.size() == 2) ? authoritySplit.get(1) : "";
-
-        return new Location(location, Optional.of(scheme), userInfo, host, port, path);
+        checkArgument(afterScheme.startsWith("/"), "Path must begin with a '/' when no authority is present");
+        return new Location(location, Optional.of(scheme), Optional.empty(), Optional.empty(), OptionalInt.empty(), afterScheme.substring(1));
     }
 
     private Location(String location, Optional<String> scheme, Optional<String> userInfo, Optional<String> host, OptionalInt port, String path)
diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java
index 7dcb61215039..780139527b01 100644
--- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java
+++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java
@@ -15,6 +15,7 @@
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Optional;
 
 /**
  * TrinoFileSystem is the main abstraction for Trino to interact with data in cloud-like storage
@@ -150,4 +151,14 @@ void renameFile(Location source, Location target)
      */
     FileIterator listFiles(Location location)
             throws IOException;
+
+    /**
+     * Checks if a directory exists at the specified location. For non-hierarchical file systems
+     * an empty Optional is returned.
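+     *
+     * <p>A minimal usage sketch (illustrative only):
+     * <pre>{@code
+     * Optional<Boolean> exists = fileSystem.directoryExists(location);
+     * if (!exists.orElse(true)) {
+     *     // only a definite false means the directory is known to be absent
+     * }
+     * }</pre>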
+     *
+     * @param location the location to check for a directory
+     * @throws IOException if the location is not valid for this file system
+     */
+    Optional<Boolean> directoryExists(Location location)
+            throws IOException;
 }
diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java
index 791cfe75e89d..42cd024e67ee 100644
--- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java
+++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/local/LocalFileSystem.java
@@ -151,6 +151,13 @@ public FileIterator listFiles(Location location)
         return new LocalFileIterator(location, rootPath, toDirectoryPath(location));
     }
 
+    @Override
+    public Optional<Boolean> directoryExists(Location location)
+            throws IOException
+    {
+        return Optional.of(Files.isDirectory(toFilePath(location)));
+    }
+
     private Path toFilePath(Location location)
     {
         validateLocalLocation(location);
diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java
index 4612be0b790f..86c7527f6f96 100644
--- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java
+++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystem.java
@@ -154,6 +154,12 @@ public FileEntry next()
         };
     }
 
+    @Override
+    public Optional<Boolean> directoryExists(Location location)
+    {
+        return Optional.empty();
+    }
+
     private static String toBlobKey(Location location)
     {
         validateMemoryLocation(location);
diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java
index bd392ed9250d..96c91dd3cef4 100644
--- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java
+++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java
@@ -107,4 +107,14 @@ public FileIterator listFiles(Location location)
                 .startSpan();
         return withTracing(span, () -> delegate.listFiles(location));
     }
+
+    @Override
+    public Optional<Boolean> directoryExists(Location location)
+            throws IOException
+    {
+        Span span = tracer.spanBuilder("FileSystem.directoryExists")
+                .setAttribute(FileSystemAttributes.FILE_LOCATION, location.toString())
+                .startSpan();
+        return withTracing(span, () -> delegate.directoryExists(location));
+    }
 }
diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java
index ca688dc63a09..7dcba6d64130 100644
--- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java
+++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java
@@ -719,6 +719,29 @@ void testListFiles()
         }
     }
 
+    @Test
+    public void testDirectoryExists()
+            throws IOException
+    {
+        try (Closer closer = Closer.create()) {
+            String directoryName = "testDirectoryExistsDir";
+            String fileName = "file.csv";
+            createBlob(closer, createLocation(directoryName).appendPath(fileName).path());
+            TrinoFileSystem fileSystem = getFileSystem();
+
+            if (isHierarchical()) {
+                assertThat(fileSystem.directoryExists(createLocation(directoryName))).contains(true);
+                assertThat(fileSystem.directoryExists(createLocation(UUID.randomUUID().toString()))).contains(false);
+                assertThat(fileSystem.directoryExists(createLocation(directoryName).appendPath(fileName))).contains(false);
+            }
+            else {
+                assertThat(fileSystem.directoryExists(createLocation(directoryName))).isEmpty();
+                assertThat(fileSystem.directoryExists(createLocation(UUID.randomUUID().toString()))).isEmpty();
+                assertThat(fileSystem.directoryExists(createLocation(directoryName).appendPath(fileName))).isEmpty();
+            }
+        }
+    }
+
     private List<Location> listPath(String path)
             throws IOException
     {
diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/TestLocation.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/TestLocation.java
index b5b13c47d5e8..391fa870481e 100644
--- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/TestLocation.java
+++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/TestLocation.java
@@ -74,11 +74,12 @@ void testParse()
         assertLocation("/abc/xyz", "abc/xyz");
         assertLocation("/foo://host:port/path", "foo://host:port/path");
 
-        // special handling for file URIs without hostnames
+        // special handling for Locations without hostnames
         assertLocation("file:/", "file", "");
         assertLocation("file:/hello.txt", "file", "hello.txt");
         assertLocation("file:/some/path", "file", "some/path");
         assertLocation("file:/some@what/path", "file", "some@what/path");
+        assertLocation("hdfs:/a/hadoop/path.csv", "hdfs", "a/hadoop/path.csv");
 
         // invalid locations
         assertThatThrownBy(() -> Location.of(null))
@@ -96,9 +97,6 @@ void testParse()
         assertThatThrownBy(() -> Location.of("scheme://host:invalid/path"))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessageContaining("port");
-        assertThatThrownBy(() -> Location.of("scheme:/path"))
-                .isInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining("scheme");
 
         // fragment is not allowed
         assertThatThrownBy(() -> Location.of("scheme://userInfo@host/some/path#fragement"))
diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/TrackingFileSystemFactory.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/TrackingFileSystemFactory.java
index f9d82143a5d5..ebe7af333ce5 100644
--- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/TrackingFileSystemFactory.java
+++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/TrackingFileSystemFactory.java
@@ -25,6 +25,7 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
@@ -162,6 +163,13 @@ public FileIterator listFiles(Location location)
         {
             return delegate.listFiles(location);
         }
+
+        @Override
+        public Optional<Boolean> directoryExists(Location location)
+                throws IOException
+        {
+            return delegate.directoryExists(location);
+        }
     }
 
     private static class TrackingInputFile
diff --git a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java
index 47dcd592bd65..163ab8979634 100644
--- a/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java
+++ b/lib/trino-hdfs/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java
@@ -23,15 +23,19 @@
 import io.trino.hdfs.HdfsContext;
 import io.trino.hdfs.HdfsEnvironment;
 import io.trino.hdfs.TrinoHdfsFileSystemStats;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.UUID;
 
 import static io.trino.filesystem.hdfs.HadoopPaths.hadoopPath;
 import static io.trino.hdfs.FileSystemUtils.getRawFileSystem;
@@ -48,6 +52,8 @@ class HdfsFileSystem
     private final HdfsContext context;
     private final TrinoHdfsFileSystemStats stats;
 
+    private final Map<FileSystem, Boolean> hierarchicalFileSystemCache = new IdentityHashMap<>();
+
     public HdfsFileSystem(HdfsEnvironment environment, HdfsContext context, TrinoHdfsFileSystemStats stats)
     {
         this.environment = requireNonNull(environment, "environment is null");
@@ -188,4 +194,54 @@ public FileIterator listFiles(Location location)
             }
         });
     }
+
+    @Override
+    public Optional<Boolean> directoryExists(Location location)
+            throws IOException
+    {
+        stats.getDirectoryExistsCalls().newCall();
+        Path directory = hadoopPath(location);
+        FileSystem fileSystem = environment.getFileSystem(context, directory);
+
+        return environment.doAs(context.getIdentity(), () -> {
+            if (!hierarchical(fileSystem, location)) {
+                return Optional.empty();
+            }
+
+            try (TimeStat.BlockTimer ignored = stats.getDirectoryExistsCalls().time()) {
+                FileStatus fileStatus = fileSystem.getFileStatus(directory);
+                return Optional.of(fileStatus.isDirectory());
+            }
+            catch (FileNotFoundException e) {
+                return Optional.of(false);
+            }
+            catch (IOException e) {
+                stats.getDirectoryExistsCalls().recordException(e);
+                throw e;
+            }
+        });
+    }
+
+    private boolean hierarchical(FileSystem fileSystem, Location rootLocation)
+    {
+        Boolean cachedResult = hierarchicalFileSystemCache.get(fileSystem);
+        if (cachedResult != null) {
+            return cachedResult;
+        }
+
+        // Hierarchical file systems will fail to list directories which do not exist.
+        // Object store file systems like S3 will allow these kinds of operations.
+        // Attempt to list a path which does not exist to know which one we have.
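+        // For example, HDFS raises FileNotFoundException when listing the missing probe path,
+        // while blob-store implementations typically just return an empty listing.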
+        try {
+            fileSystem.listStatus(hadoopPath(rootLocation.appendPath(UUID.randomUUID().toString())));
+            hierarchicalFileSystemCache.putIfAbsent(fileSystem, false);
+            return false;
+        }
+        catch (IOException e) {
+            // Being overly broad to avoid throwing an exception with the random UUID path in it.
+            // Instead, defer to later calls to fail with a more appropriate message.
+            hierarchicalFileSystemCache.putIfAbsent(fileSystem, true);
+            return true;
+        }
+    }
 }
diff --git a/lib/trino-hdfs/src/main/java/io/trino/hdfs/TrinoHdfsFileSystemStats.java b/lib/trino-hdfs/src/main/java/io/trino/hdfs/TrinoHdfsFileSystemStats.java
index 454b8e9b6927..ce4dba1a9f70 100644
--- a/lib/trino-hdfs/src/main/java/io/trino/hdfs/TrinoHdfsFileSystemStats.java
+++ b/lib/trino-hdfs/src/main/java/io/trino/hdfs/TrinoHdfsFileSystemStats.java
@@ -24,6 +24,7 @@ public final class TrinoHdfsFileSystemStats
     private final CallStats renameFileCalls = new CallStats();
     private final CallStats deleteFileCalls = new CallStats();
     private final CallStats deleteDirectoryCalls = new CallStats();
+    private final CallStats directoryExistsCalls = new CallStats();
 
     @Managed
     @Nested
@@ -66,4 +67,11 @@ public CallStats getDeleteDirectoryCalls()
     {
         return deleteDirectoryCalls;
     }
+
+    @Managed
+    @Nested
+    public CallStats getDirectoryExistsCalls()
+    {
+        return directoryExistsCalls;
+    }
 }
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java
index 64186c5880f1..8ed4e5b19c09 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java
@@ -110,6 +110,7 @@
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_EXCEEDED_PARTITION_LIMIT;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
+import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_BUCKET_FILES;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
@@ -546,21 +547,28 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
             return getTransactionalSplits(Location.of(path.toString()), splittable, bucketConversion, splitFactory);
         }
 
+        TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session);
+        Location location = Location.of(path.toString());
+
         // Bucketed partitions are fully loaded immediately since all files must be loaded to determine the file to bucket mapping
         if (tableBucketInfo.isPresent()) {
-            List<TrinoFileStatus> files = listBucketFiles(path, fs, splitFactory.getPartitionName());
+            List<TrinoFileStatus> files = listBucketFiles(trinoFileSystem, location, splitFactory.getPartitionName());
             return hiveSplitSource.addToQueue(getBucketedSplits(files, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, Optional.empty()));
         }
 
-        fileIterators.addLast(createInternalHiveSplitIterator(path, fs, splitFactory, splittable, Optional.empty()));
+        Iterator<InternalHiveSplit> splitIterator = createInternalHiveSplitIterator(trinoFileSystem, location, splitFactory, splittable, Optional.empty());
+        fileIterators.addLast(splitIterator);
 
         return COMPLETED_FUTURE;
     }
 
-    private List<TrinoFileStatus> listBucketFiles(Path path, FileSystem fs, String partitionName)
+    private List<TrinoFileStatus> listBucketFiles(TrinoFileSystem fs, Location location, String partitionName)
     {
+        if (!ignoreAbsentPartitions) {
+            checkPartitionLocationExists(fs, location);
+        }
+
         try {
-            return ImmutableList.copyOf(new HiveFileIterator(table, path, fs, directoryLister, hdfsNamenodeStats, FAIL, ignoreAbsentPartitions));
+            return ImmutableList.copyOf(new HiveFileIterator(table, location, fs, directoryLister, hdfsNamenodeStats, FAIL));
         }
         catch (HiveFileIterator.NestedDirectoryNotAllowedException e) {
             // Fail here to be on the safe side. This seems to be the same as what Hive does
@@ -640,9 +648,12 @@ Optional<Iterator<InternalHiveSplit>> buildManifestFileIterator(
             throws IOException
     {
         FileSystem targetFilesystem = hdfsEnvironment.getFileSystem(hdfsContext, parent);
+        TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session);
+        Location location = Location.of(parent.toString());
+        checkPartitionLocationExists(trinoFileSystem, location);
 
         Map<Path, TrinoFileStatus> fileStatuses = new HashMap<>();
-        HiveFileIterator fileStatusIterator = new HiveFileIterator(table, parent, targetFilesystem, directoryLister, hdfsNamenodeStats, IGNORED, false);
+        HiveFileIterator fileStatusIterator = new HiveFileIterator(table, location, trinoFileSystem, directoryLister, hdfsNamenodeStats, IGNORED);
         fileStatusIterator.forEachRemaining(status -> fileStatuses.put(getPathWithoutSchemeAndAuthority(new Path(status.getPath())), status));
 
         List<TrinoFileStatus> locatedFileStatuses = new ArrayList<>();
@@ -801,12 +812,28 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inputFormat)
                 .anyMatch(name -> name.equals("UseFileSplitsFromInputFormat"));
     }
 
-    private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable, Optional<AcidInfo> acidInfo)
+    private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(TrinoFileSystem fileSystem, Location location, InternalHiveSplitFactory splitFactory, boolean splittable, Optional<AcidInfo> acidInfo)
     {
-        Iterator<TrinoFileStatus> iterator = new HiveFileIterator(table, path, fileSystem, directoryLister, hdfsNamenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED, ignoreAbsentPartitions);
+        if (!ignoreAbsentPartitions) {
+            checkPartitionLocationExists(fileSystem, location);
+        }
+
+        Iterator<TrinoFileStatus> iterator = new HiveFileIterator(table, location, fileSystem, directoryLister, hdfsNamenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED);
         return createInternalHiveSplitIterator(splitFactory, splittable, acidInfo, Streams.stream(iterator));
     }
 
+    private static void checkPartitionLocationExists(TrinoFileSystem fileSystem, Location location)
+    {
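+        // An empty Optional from directoryExists means the file system cannot answer the question
+        // (e.g. a non-hierarchical object store); orElse(true) treats that as "present" so the
+        // subsequent listing decides whether anything is actually there.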
+        try {
+            if (!fileSystem.directoryExists(location).orElse(true)) {
+                throw new TrinoException(HIVE_FILE_NOT_FOUND, "Partition location does not exist: " + location);
+            }
+        }
+        catch (IOException e) {
+            throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed checking directory path: " + location, e);
+        }
+    }
+
     private static Iterator<InternalHiveSplit> createInternalHiveSplitIterator(InternalHiveSplitFactory splitFactory, boolean splittable, Optional<AcidInfo> acidInfo, Stream<TrinoFileStatus> fileStream)
     {
         return fileStream
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java
index b2928cffaa73..bbdfe317595f 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/CachingDirectoryLister.java
@@ -20,15 +20,14 @@
 import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
 import io.trino.collect.cache.EvictableCacheBuilder;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
 import io.trino.plugin.hive.HiveConfig;
 import io.trino.plugin.hive.metastore.Partition;
 import io.trino.plugin.hive.metastore.Storage;
 import io.trino.plugin.hive.metastore.Table;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.SchemaTablePrefix;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.weakref.jmx.Managed;
 
 import javax.inject.Inject;
@@ -53,7 +52,7 @@ public class CachingDirectoryLister
 {
     //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys
     // to deal more efficiently with cache invalidation scenarios for partitioned tables.
-    private final Cache<DirectoryListingCacheKey, ValueHolder> cache;
+    private final Cache<Location, ValueHolder> cache;
     private final List<SchemaTablePrefix> tablePrefixes;
 
     @Inject
@@ -66,7 +65,7 @@ public CachingDirectoryLister(Duration expireAfterWrite, DataSize maxSize, List<String> tables)
     {
         this.cache = EvictableCacheBuilder.newBuilder()
                 .maximumWeight(maxSize.toBytes())
-                .weigher((Weigher<DirectoryListingCacheKey, ValueHolder>) (key, value) -> toIntExact(key.getRetainedSizeInBytes() + value.getRetainedSizeInBytes()))
+                .weigher((Weigher<Location, ValueHolder>) (key, value) -> toIntExact(estimatedSizeOf(key.toString()) + value.getRetainedSizeInBytes()))
                 .expireAfterWrite(expireAfterWrite.toMillis(), TimeUnit.MILLISECONDS)
                 .shareNothingWhenDisabled()
                 .recordStats()
@@ -92,45 +91,31 @@ private static SchemaTablePrefix parseTableName(String tableName)
     }
 
     @Override
-    public RemoteIterator<TrinoFileStatus> list(FileSystem fs, Table table, Path path)
+    public RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs, Table table, Location location)
             throws IOException
     {
         if (!isCacheEnabledFor(table.getSchemaTableName())) {
-            return new TrinoFileStatusRemoteIterator(fs.listLocatedStatus(path));
+            return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
         }
 
-        return listInternal(fs, new DirectoryListingCacheKey(path.toString(), false));
+        return listInternal(fs, location);
     }
 
-    @Override
-    public RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
+    private RemoteIterator<TrinoFileStatus> listInternal(TrinoFileSystem fs, Location location)
             throws IOException
     {
-        if (!isCacheEnabledFor(table.getSchemaTableName())) {
-            return new TrinoFileStatusRemoteIterator(fs.listFiles(path, true));
-        }
-
-        return listInternal(fs, new DirectoryListingCacheKey(path.toString(), true));
-    }
-
-    private RemoteIterator<TrinoFileStatus> listInternal(FileSystem fs, DirectoryListingCacheKey cacheKey)
-            throws IOException
-    {
-        ValueHolder cachedValueHolder = uncheckedCacheGet(cache, cacheKey, ValueHolder::new);
+        ValueHolder cachedValueHolder = uncheckedCacheGet(cache, location, ValueHolder::new);
         if (cachedValueHolder.getFiles().isPresent()) {
             return new SimpleRemoteIterator(cachedValueHolder.getFiles().get().iterator());
         }
 
-        return cachingRemoteIterator(cachedValueHolder, createListingRemoteIterator(fs, cacheKey), cacheKey);
+        return cachingRemoteIterator(cachedValueHolder, createListingRemoteIterator(fs, location), location);
     }
 
-    private static RemoteIterator<TrinoFileStatus> createListingRemoteIterator(FileSystem fs, DirectoryListingCacheKey cacheKey)
+    private static RemoteIterator<TrinoFileStatus> createListingRemoteIterator(TrinoFileSystem fs, Location location)
            throws IOException
     {
-        if (cacheKey.isRecursiveFilesOnly()) {
-            return new TrinoFileStatusRemoteIterator(fs.listFiles(new Path(cacheKey.getPath()), true));
-        }
-        return new TrinoFileStatusRemoteIterator(fs.listLocatedStatus(new Path(cacheKey.getPath())));
+        return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
     }
 
     @Override
@@ -138,7 +123,7 @@ public void invalidate(Table table)
     {
         if (isCacheEnabledFor(table.getSchemaTableName()) && isLocationPresent(table.getStorage())) {
             if (table.getPartitionColumns().isEmpty()) {
-                cache.invalidateAll(DirectoryListingCacheKey.allKeysWithPath(new Path(table.getStorage().getLocation())));
+                cache.invalidate(Location.of(table.getStorage().getLocation()));
             }
             else {
                 // a partitioned table can have multiple paths in cache
@@ -151,11 +136,11 @@ public void invalidate(Partition partition)
     {
         if (isCacheEnabledFor(partition.getSchemaTableName()) && isLocationPresent(partition.getStorage())) {
-            cache.invalidateAll(DirectoryListingCacheKey.allKeysWithPath(new Path(partition.getStorage().getLocation())));
+            cache.invalidate(Location.of(partition.getStorage().getLocation()));
         }
     }
 
-    private RemoteIterator<TrinoFileStatus> cachingRemoteIterator(ValueHolder cachedValueHolder, RemoteIterator<TrinoFileStatus> iterator, DirectoryListingCacheKey key)
+    private RemoteIterator<TrinoFileStatus> cachingRemoteIterator(ValueHolder cachedValueHolder, RemoteIterator<TrinoFileStatus> iterator, Location location)
     {
         return new RemoteIterator<>()
         {
@@ -169,7 +154,7 @@ public boolean hasNext()
                 if (!hasNext) {
                     // The cachedValueHolder acts as an invalidation guard. If a cache invalidation happens while this iterator goes over
                     // the files from the specified path, the eventually outdated file listing will not be added anymore to the cache.
-                    cache.asMap().replace(key, cachedValueHolder, new ValueHolder(files));
+                    cache.asMap().replace(location, cachedValueHolder, new ValueHolder(files));
                 }
                 return hasNext;
             }
@@ -222,15 +207,9 @@ public long getRequestCount()
     }
 
     @VisibleForTesting
-    boolean isCached(Path path)
-    {
-        return isCached(new DirectoryListingCacheKey(path.toString(), false));
-    }
-
-    @VisibleForTesting
-    boolean isCached(DirectoryListingCacheKey cacheKey)
+    boolean isCached(Location location)
     {
-        ValueHolder cached = cache.getIfPresent(cacheKey);
+        ValueHolder cached = cache.getIfPresent(location);
         return cached != null && cached.getFiles().isPresent();
     }
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java
index 7983cdf1580a..def1b624a39b 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java
@@ -13,20 +13,16 @@
  */
 package io.trino.plugin.hive.fs;
 
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
 import io.trino.plugin.hive.TableInvalidationCallback;
 import io.trino.plugin.hive.metastore.Table;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 
 import java.io.IOException;
 
 public interface DirectoryLister
         extends TableInvalidationCallback
 {
-    RemoteIterator<TrinoFileStatus> list(FileSystem fs, Table table, Path path)
-            throws IOException;
-
-    RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
+    RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs, Table table, Location location)
             throws IOException;
 }
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingCacheKey.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingCacheKey.java
deleted file mode 100644
index be6bcef7b251..000000000000
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingCacheKey.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.hive.fs;
-
-import com.google.common.collect.ImmutableList;
-import io.trino.plugin.hive.metastore.Table;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.util.List;
-import java.util.Objects;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static io.airlift.slice.SizeOf.estimatedSizeOf;
-import static io.airlift.slice.SizeOf.instanceSize;
-import static java.util.Objects.requireNonNull;
-
-/**
- * A cache key designed for use in {@link CachingDirectoryLister} and {@link TransactionScopeCachingDirectoryLister}
- * that allows distinct cache entries to be created for both recursive, files-only listings and shallow listings
- * (that also might contain directories) at the same {@link Path}, ie: {@link DirectoryLister#list(FileSystem, Table, Path)}
- * and {@link DirectoryLister#listFilesRecursively(FileSystem, Table, Path)} results.
- */
-final class DirectoryListingCacheKey
-{
-    private static final long INSTANCE_SIZE = instanceSize(DirectoryListingCacheKey.class);
-
-    private final String path;
-    private final int hashCode; // precomputed hashCode
-    private final boolean recursiveFilesOnly;
-
-    public DirectoryListingCacheKey(String path, boolean recursiveFilesOnly)
-    {
-        this.path = requireNonNull(path, "path is null");
-        this.recursiveFilesOnly = recursiveFilesOnly;
-        this.hashCode = Objects.hash(path, recursiveFilesOnly);
-    }
-
-    public String getPath()
-    {
-        return path;
-    }
-
-    public boolean isRecursiveFilesOnly()
-    {
-        return recursiveFilesOnly;
-    }
-
-    public long getRetainedSizeInBytes()
-    {
-        return INSTANCE_SIZE + estimatedSizeOf(path);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return hashCode;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (o == null || (o.getClass() != this.getClass())) {
-            return false;
-        }
-        DirectoryListingCacheKey other = (DirectoryListingCacheKey) o;
-        return recursiveFilesOnly == other.recursiveFilesOnly
-                && hashCode == other.hashCode
-                && path.equals(other.path);
-    }
-
-    @Override
-    public String toString()
-    {
-        return toStringHelper(this)
-                .add("path", path)
-                .add("isRecursiveFilesOnly", recursiveFilesOnly)
-                .toString();
-    }
-
-    public static List<DirectoryListingCacheKey> allKeysWithPath(Path path)
-    {
-        return ImmutableList.of(new DirectoryListingCacheKey(path.toString(), true), new DirectoryListingCacheKey(path.toString(), false));
-    }
-}
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingFilter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingFilter.java
new file mode 100644
index 000000000000..07efcea578f5
--- /dev/null
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryListingFilter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.fs;
+
+import io.trino.filesystem.Location;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Filters down the full listing of a path prefix to just the files directly in a given directory.
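+ * For example, a recursive listing under the prefix {@code db/table} may yield
+ * {@code db/table/part-0} and {@code db/table/nested/part-1}; only {@code part-0} is a direct
+ * child, so the nested entry is skipped or, when {@code failOnUnexpectedFiles} is set, rejected.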
+ */
+public class DirectoryListingFilter
+        implements RemoteIterator<TrinoFileStatus>
+{
+    private final Location prefix;
+    private final RemoteIterator<TrinoFileStatus> delegateIterator;
+    private final boolean failOnUnexpectedFiles;
+
+    @Nullable private TrinoFileStatus nextElement;
+
+    public DirectoryListingFilter(Location prefix, RemoteIterator<TrinoFileStatus> delegateIterator, boolean failOnUnexpectedFiles)
+            throws IOException
+    {
+        this.prefix = requireNonNull(prefix, "prefix is null");
+        this.delegateIterator = requireNonNull(delegateIterator, "delegateIterator is null");
+        // Assign failOnUnexpectedFiles before priming the iterator, since findNextElement() reads it
+        this.failOnUnexpectedFiles = failOnUnexpectedFiles;
+        this.nextElement = findNextElement();
+    }
+
+    @Override
+    public boolean hasNext()
+            throws IOException
+    {
+        return nextElement != null;
+    }
+
+    @Override
+    public TrinoFileStatus next()
+            throws IOException
+    {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        TrinoFileStatus thisElement = nextElement;
+        this.nextElement = findNextElement();
+        return thisElement;
+    }
+
+    private TrinoFileStatus findNextElement()
+            throws IOException
+    {
+        while (delegateIterator.hasNext()) {
+            TrinoFileStatus candidate = delegateIterator.next();
+            Location parent = Location.of(candidate.getPath()).parentDirectory();
+            boolean directChild = parent.equals(prefix);
+
+            if (!directChild && failOnUnexpectedFiles && !parentIsHidden(parent, prefix)) {
+                throw new HiveFileIterator.NestedDirectoryNotAllowedException(candidate.getPath());
+            }
+
+            if (directChild) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
+    private static boolean parentIsHidden(Location location, Location prefix)
+    {
+        if (location.equals(prefix)) {
+            return false;
+        }
+
+        if (location.fileName().startsWith(".") || location.fileName().startsWith("_")) {
+            return true;
+        }
+
+        return parentIsHidden(location.parentDirectory(), prefix);
+    }
+}
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java
index fdce442f1567..e96d3eadbb87 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java
@@ -16,12 +16,12 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.AbstractIterator;
 import io.airlift.stats.TimeStat;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
 import io.trino.hdfs.HdfsNamenodeStats;
 import io.trino.plugin.hive.metastore.Table;
 import io.trino.spi.TrinoException;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -30,8 +30,8 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND;
+import static io.trino.plugin.hive.fs.HiveFileIterator.NestedDirectoryPolicy.FAIL;
 import static io.trino.plugin.hive.fs.HiveFileIterator.NestedDirectoryPolicy.RECURSE;
-import static java.util.Collections.emptyIterator;
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.Path.SEPARATOR_CHAR;
@@ -47,30 +47,27 @@ public enum NestedDirectoryPolicy
 
     private final String pathPrefix;
     private final Table table;
-    private final FileSystem fileSystem;
+    private final TrinoFileSystem fileSystem;
     private final DirectoryLister directoryLister;
     private final HdfsNamenodeStats namenodeStats;
     private final NestedDirectoryPolicy nestedDirectoryPolicy;
-    private final boolean ignoreAbsentPartitions;
 
     private final Iterator<TrinoFileStatus> remoteIterator;
 
     public HiveFileIterator(
             Table table,
-            Path path,
-            FileSystem fileSystem,
+            Location location,
+            TrinoFileSystem fileSystem,
             DirectoryLister directoryLister,
             HdfsNamenodeStats namenodeStats,
-            NestedDirectoryPolicy nestedDirectoryPolicy,
-            boolean ignoreAbsentPartitions)
+            NestedDirectoryPolicy nestedDirectoryPolicy)
     {
-        this.pathPrefix = path.toUri().getPath();
+        this.pathPrefix = location.toString();
         this.table = requireNonNull(table, "table is null");
         this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
         this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
         this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null");
         this.nestedDirectoryPolicy = requireNonNull(nestedDirectoryPolicy, "nestedDirectoryPolicy is null");
-        this.ignoreAbsentPartitions = ignoreAbsentPartitions;
-        this.remoteIterator = getLocatedFileStatusRemoteIterator(path);
+        this.remoteIterator = getLocatedFileStatusRemoteIterator(location);
     }
 
     @Override
@@ -90,42 +87,19 @@ else if (isHiddenFileOrDirectory(new Path(status.getPath()))) {
                 continue;
             }
 
-            if (status.isDirectory()) {
-                switch (nestedDirectoryPolicy) {
-                    case IGNORED:
-                        continue;
-                    case RECURSE:
-                        // Recursive listings call listFiles which should not return directories, this is a contract violation
-                        // and can be handled the same way as the FAIL case
-                    case FAIL:
-                        throw new NestedDirectoryNotAllowedException(status.getPath());
-                }
-            }
-
             return status;
         }
         return endOfData();
     }
 
-    private Iterator<TrinoFileStatus> getLocatedFileStatusRemoteIterator(Path path)
+    private Iterator<TrinoFileStatus> getLocatedFileStatusRemoteIterator(Location location)
     {
         try (TimeStat.BlockTimer ignored = namenodeStats.getListLocatedStatus().time()) {
-            return new FileStatusIterator(table, path, fileSystem, directoryLister, namenodeStats, nestedDirectoryPolicy == RECURSE);
+            return new FileStatusIterator(table, location, fileSystem, directoryLister, namenodeStats, nestedDirectoryPolicy);
        }
-        catch (TrinoException e) {
-            if (ignoreAbsentPartitions) {
-                try {
-                    if (!fileSystem.exists(path)) {
-                        return emptyIterator();
-                    }
-                }
-                catch (Exception ee) {
-                    TrinoException trinoException = new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed to check if path exists: " + path, ee);
-                    trinoException.addSuppressed(e);
-                    throw trinoException;
-                }
-            }
-            throw e;
+        catch (IOException e) {
+            throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed to list files for location: " + location, e);
         }
     }
 
@@ -148,7 +122,7 @@ static boolean isHiddenFileOrDirectory(Path path)
     @VisibleForTesting
     static boolean isHiddenOrWithinHiddenParentDirectory(Path path, String prefix)
     {
-        String pathString = path.toUri().getPath();
+        String pathString = path.toUri().toString();
         checkArgument(pathString.startsWith(prefix), "path %s does not start with prefix %s", pathString, prefix);
         return containsHiddenPathPartAfterIndex(pathString, prefix.endsWith("/") ? prefix.length() : prefix.length() + 1);
     }
 
@@ -174,20 +148,30 @@ static boolean containsHiddenPathPartAfterIndex(String pathString, int startFrom
     private static class FileStatusIterator
             implements Iterator<TrinoFileStatus>
     {
-        private final Path path;
+        private final Location location;
         private final HdfsNamenodeStats namenodeStats;
         private final RemoteIterator<TrinoFileStatus> fileStatusIterator;
 
-        private FileStatusIterator(Table table, Path path, FileSystem fileSystem, DirectoryLister directoryLister, HdfsNamenodeStats namenodeStats, boolean recursive)
+        private FileStatusIterator(
+                Table table,
+                Location location,
+                TrinoFileSystem fileSystem,
+                DirectoryLister directoryLister,
+                HdfsNamenodeStats namenodeStats,
+                NestedDirectoryPolicy nestedDirectoryPolicy)
+                throws IOException
        {
-            this.path = path;
+            this.location = location;
             this.namenodeStats = namenodeStats;
             try {
-                if (recursive) {
-                    this.fileStatusIterator = directoryLister.listFilesRecursively(fileSystem, table, path);
+                if (nestedDirectoryPolicy == RECURSE) {
+                    this.fileStatusIterator = directoryLister.listFilesRecursively(fileSystem, table, location);
                 }
                 else {
-                    this.fileStatusIterator = directoryLister.list(fileSystem, table, path);
+                    this.fileStatusIterator = new DirectoryListingFilter(
+                            location,
+                            directoryLister.listFilesRecursively(fileSystem, table, location),
+                            nestedDirectoryPolicy == FAIL);
                 }
             }
             catch (IOException e) {
@@ -221,9 +205,9 @@ private TrinoException processException(IOException exception)
         {
             namenodeStats.getRemoteIteratorNext().recordException(exception);
             if (exception instanceof FileNotFoundException) {
-                return new TrinoException(HIVE_FILE_NOT_FOUND, "Partition location does not exist: " + path);
+                return new TrinoException(HIVE_FILE_NOT_FOUND, "Partition location does not exist: " + location);
             }
-            return new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed to list directory: " + path, exception);
+            return new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed to list directory: " + location, exception);
         }
     }
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/RemoteIterator.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/RemoteIterator.java
new file mode 100644
index 000000000000..dcbc296f6046
--- /dev/null
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/RemoteIterator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.fs;
+
+import java.io.IOException;
+
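+/**
+ * An iterator whose {@code hasNext} and {@code next} may perform I/O and therefore declare
+ * {@link IOException}; a connector-local replacement for the Hadoop {@code RemoteIterator}.
+ */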
+public interface RemoteIterator<T>
+{
+    boolean hasNext()
+            throws IOException;
+
+    T next()
+            throws IOException;
+}
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java
index 6277d5b98ad0..6c181f38432e 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java
@@ -13,8 +13,6 @@
  */
 package io.trino.plugin.hive.fs;
 
-import org.apache.hadoop.fs.RemoteIterator;
-
 import java.io.IOException;
 import java.util.Iterator;
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java
index bd48d7a44400..d18c6fcf50f0 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java
@@ -13,9 +13,12 @@
  */
 package io.trino.plugin.hive.fs;
 
+import io.trino.filesystem.Location;
+
 import java.util.Objects;
 
 import static com.google.common.base.MoreObjects.toStringHelper;
+import static io.airlift.slice.SizeOf.estimatedSizeOf;
 import static io.airlift.slice.SizeOf.instanceSize;
 import static java.util.Objects.requireNonNull;
 
@@ -24,22 +27,22 @@ public class TransactionDirectoryListingCacheKey
     private static final long INSTANCE_SIZE = instanceSize(TransactionDirectoryListingCacheKey.class);
 
     private final long transactionId;
-    private final DirectoryListingCacheKey key;
+    private final Location path;
 
-    public TransactionDirectoryListingCacheKey(long transactionId, DirectoryListingCacheKey key)
+    public TransactionDirectoryListingCacheKey(long transactionId, Location path)
     {
         this.transactionId = transactionId;
-        this.key = requireNonNull(key, "key is null");
+        this.path = requireNonNull(path, "path is null");
     }
 
-    public DirectoryListingCacheKey getKey()
+    public Location getPath()
     {
-        return key;
+        return path;
     }
 
     public long getRetainedSizeInBytes()
     {
-        return INSTANCE_SIZE + key.getRetainedSizeInBytes();
+        return INSTANCE_SIZE + estimatedSizeOf(path.toString());
     }
 
     @Override
@@ -52,13 +55,13 @@ public boolean equals(Object o)
             return false;
         }
         TransactionDirectoryListingCacheKey that = (TransactionDirectoryListingCacheKey) o;
-        return transactionId == that.transactionId && key.equals(that.key);
+        return transactionId == that.transactionId && path.equals(that.path);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(transactionId, key);
+        return Objects.hash(transactionId, path);
     }
 
     @Override
@@ -66,7 +69,7 @@ public String toString()
     {
         return toStringHelper(this)
                 .add("transactionId", transactionId)
-                .add("key", key)
+                .add("path", path)
                 .toString();
     }
 }
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java
index 848ae2cf67ad..3dd7ba700aef 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java
@@ -16,12 +16,11 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.util.concurrent.UncheckedExecutionException;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
 import io.trino.plugin.hive.metastore.Partition;
 import io.trino.plugin.hive.metastore.Storage;
 import io.trino.plugin.hive.metastore.Table;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
@@ -38,10 +37,8 @@
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Throwables.throwIfInstanceOf;
 import static com.google.common.base.Throwables.throwIfUnchecked;
-import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.airlift.slice.SizeOf.instanceSize;
 import static io.airlift.slice.SizeOf.sizeOfObjectArray;
-import static io.trino.plugin.hive.fs.DirectoryListingCacheKey.allKeysWithPath;
 import static java.util.Collections.synchronizedList;
 import static java.util.Objects.requireNonNull;
 
@@ -68,20 +65,13 @@ public TransactionScopeCachingDirectoryLister(DirectoryLister delegate, long transactionId
     }
 
     @Override
-    public RemoteIterator<TrinoFileStatus> list(FileSystem fs, Table table, Path path)
+    public RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs, Table table, Location location)
             throws IOException
     {
-        return listInternal(fs, table, new TransactionDirectoryListingCacheKey(transactionId, new DirectoryListingCacheKey(path.toString(), false)));
+        return listInternal(fs, table, new TransactionDirectoryListingCacheKey(transactionId, location));
    }
 
-    @Override
-    public RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
-            throws IOException
-    {
-        return listInternal(fs, table, new TransactionDirectoryListingCacheKey(transactionId, new DirectoryListingCacheKey(path.toString(), true)));
-    }
-
-    private RemoteIterator<TrinoFileStatus> listInternal(FileSystem fs, Table table, TransactionDirectoryListingCacheKey cacheKey)
+    private RemoteIterator<TrinoFileStatus> listInternal(TrinoFileSystem fs, Table table, TransactionDirectoryListingCacheKey cacheKey)
             throws IOException
     {
         FetchingValueHolder cachedValueHolder;
@@ -92,7 +82,7 @@ private RemoteIterator<TrinoFileStatus> listInternal(FileSystem fs, Table table,
             Throwable throwable = e.getCause();
             throwIfInstanceOf(throwable, IOException.class);
             throwIfUnchecked(throwable);
-            throw new RuntimeException("Failed to list directory: " + cacheKey.getKey().getPath(), throwable);
+            throw new RuntimeException("Failed to list directory: " + cacheKey.getPath(), throwable);
         }
 
         if (cachedValueHolder.isFullyCached()) {
@@ -102,13 +92,10 @@ private RemoteIterator<TrinoFileStatus> listInternal(FileSystem fs, Table table,
         return cachingRemoteIterator(cachedValueHolder, cacheKey);
     }
 
-    private RemoteIterator<TrinoFileStatus> createListingRemoteIterator(FileSystem fs, Table table, TransactionDirectoryListingCacheKey cacheKey)
+    private RemoteIterator<TrinoFileStatus> createListingRemoteIterator(TrinoFileSystem fs, Table table, TransactionDirectoryListingCacheKey cacheKey)
             throws IOException
     {
-        if (cacheKey.getKey().isRecursiveFilesOnly()) {
-            return delegate.listFilesRecursively(fs, table, new Path(cacheKey.getKey().getPath()));
-        }
-        return delegate.list(fs, table, new Path(cacheKey.getKey().getPath()));
+        return delegate.listFilesRecursively(fs, table, cacheKey.getPath());
     }
 
     @Override
@@ -116,9 +103,7 @@ public void invalidate(Table table)
     {
         if (isLocationPresent(table.getStorage())) {
             if (table.getPartitionColumns().isEmpty()) {
-                cache.invalidateAll(allKeysWithPath(new Path(table.getStorage().getLocation())).stream()
-                        .map(key -> new TransactionDirectoryListingCacheKey(transactionId, key))
-                        .collect(toImmutableList()));
+                cache.invalidate(new TransactionDirectoryListingCacheKey(transactionId, Location.of(table.getStorage().getLocation())));
             }
             else {
                 // a partitioned table can have multiple paths in cache
@@ -132,9 +117,7 @@ public void invalidate(Table table)
     public void invalidate(Partition partition)
     {
         if (isLocationPresent(partition.getStorage())) {
-            cache.invalidateAll(allKeysWithPath(new Path(partition.getStorage().getLocation())).stream()
-                    .map(key -> new TransactionDirectoryListingCacheKey(transactionId, key))
-                    .collect(toImmutableList()));
+            cache.invalidate(new TransactionDirectoryListingCacheKey(transactionId, Location.of(partition.getStorage().getLocation())));
         }
         delegate.invalidate(partition);
     }
@@ -176,9 +159,9 @@ public TrinoFileStatus next()
     }
 
     @VisibleForTesting
-    boolean isCached(Path path)
+    boolean isCached(Location location)
     {
-        return isCached(new TransactionDirectoryListingCacheKey(transactionId, new DirectoryListingCacheKey(path.toString(), false)));
+        return isCached(new TransactionDirectoryListingCacheKey(transactionId, location));
     }
 
     @VisibleForTesting
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatus.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatus.java
index 1bebf9a66f95..51f2a22fb901 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatus.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatus.java
@@ -79,11 +79,6 @@ public String getPath()
         return path;
     }
 
-    public boolean isDirectory()
-    {
-        return isDirectory;
-    }
-
     public long getLength()
     {
         return length;
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatusRemoteIterator.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatusRemoteIterator.java
index e6793698740a..730895748d7d 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatusRemoteIterator.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatusRemoteIterator.java
@@ -13,8 +13,7 @@
  */
 package io.trino.plugin.hive.fs;
 
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.RemoteIterator;
+import io.trino.filesystem.FileIterator;
 
 import java.io.IOException;
 
@@ -23,9 +22,9 @@ public class TrinoFileStatusRemoteIterator
         implements RemoteIterator<TrinoFileStatus>
 {
-    private final RemoteIterator<LocatedFileStatus> iterator;
+    private final FileIterator iterator;
 
-    public TrinoFileStatusRemoteIterator(RemoteIterator<LocatedFileStatus> iterator)
+    public TrinoFileStatusRemoteIterator(FileIterator iterator)
     {
         this.iterator = requireNonNull(iterator, "iterator is null");
     }
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java
index d432c042a63d..9a7166bec7e2 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java
@@ -26,6 +26,7 @@
 import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
 import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
 import io.trino.hdfs.HdfsConfig;
 import io.trino.hdfs.HdfsContext;
@@ -38,6 +39,7 @@ import io.trino.plugin.hive.LocationService.WriteInfo;
 import io.trino.plugin.hive.aws.athena.PartitionProjectionService;
 import io.trino.plugin.hive.fs.DirectoryLister;
+import io.trino.plugin.hive.fs.RemoteIterator;
 import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryListerFactory;
 import io.trino.plugin.hive.fs.TrinoFileStatus;
 import io.trino.plugin.hive.fs.TrinoFileStatusRemoteIterator;
@@ -142,7 +144,6 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.joda.time.DateTime;
 import org.testng.annotations.AfterClass;
@@ -6331,19 +6332,11 @@ private static class CountingDirectoryLister
         private final AtomicInteger listCount = new AtomicInteger();
 
         @Override
-        public RemoteIterator<TrinoFileStatus> list(FileSystem fs, Table table, Path path)
+        public RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs, Table table, Location location)
                 throws IOException
         {
             listCount.incrementAndGet();
-            return new TrinoFileStatusRemoteIterator(fs.listLocatedStatus(path));
-        }
-
-        @Override
-        public RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
-                throws IOException
-        {
-            listCount.incrementAndGet();
-            return new TrinoFileStatusRemoteIterator(fs.listFiles(path, true));
+            return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
         }
 
         public int getListCount()
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
index 4a310d31be61..2d6011c376d4 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java
@@ -23,12 +23,14 @@
 import io.airlift.slice.Slice;
 import io.airlift.stats.CounterStat;
 import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
 import io.trino.hdfs.HdfsConfig;
 import io.trino.hdfs.HdfsConfiguration;
 import io.trino.hdfs.HdfsContext;
 import io.trino.hdfs.HdfsEnvironment;
 import io.trino.hdfs.HdfsNamenodeStats;
+import io.trino.hdfs.TrinoHdfsFileSystemStats;
 import io.trino.hdfs.authentication.NoHdfsAuthentication;
 import io.trino.operator.GroupByHashPageIndexerFactory;
 import io.trino.plugin.base.CatalogName;
@@ -106,6 +108,7 @@
 import static io.trino.plugin.hive.AbstractTestHive.getSplits;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
 import static io.trino.plugin.hive.HiveTestUtils.PAGE_SORTER;
+import static io.trino.plugin.hive.HiveTestUtils.SESSION;
 import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveFileWriterFactories;
 import static io.trino.plugin.hive.HiveTestUtils.getDefaultHivePageSourceFactories;
 import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveRecordCursorProviders;
@@ -449,6 +452,7 @@ public void testFileIteratorListing()
         //     base-path-file.txt
         Path basePath = new Path(getBasePath(), "test-file-iterator-listing");
         FileSystem fs = hdfsEnvironment.getFileSystem(TESTING_CONTEXT, basePath);
+        TrinoFileSystem trinoFileSystem = new HdfsFileSystemFactory(hdfsEnvironment, new TrinoHdfsFileSystemStats()).create(SESSION);
         fs.mkdirs(basePath);
 
         // create file in hidden folder
@@ -473,12 +477,11 @@ public void testFileIteratorListing()
         // List recursively through hive file iterator
         HiveFileIterator recursiveIterator = new HiveFileIterator(
                 fakeTable,
-                basePath,
-                fs,
+                Location.of(basePath.toString()),
+                trinoFileSystem,
                 new FileSystemDirectoryLister(),
                 new HdfsNamenodeStats(),
-                HiveFileIterator.NestedDirectoryPolicy.RECURSE,
-                false); // ignoreAbsentPartitions
+                HiveFileIterator.NestedDirectoryPolicy.RECURSE);
 
         List<Path> recursiveListing = Streams.stream(recursiveIterator)
                 .map(TrinoFileStatus::getPath)
@@ -489,12 +492,11 @@
 
         HiveFileIterator shallowIterator = new HiveFileIterator(
                 fakeTable,
-                basePath,
-                fs,
+                Location.of(basePath.toString()),
+                trinoFileSystem,
                 new FileSystemDirectoryLister(),
                 new HdfsNamenodeStats(),
-                HiveFileIterator.NestedDirectoryPolicy.IGNORED,
-                false); // ignoreAbsentPartitions
+                HiveFileIterator.NestedDirectoryPolicy.IGNORED);
         List<Path> shallowListing = Streams.stream(shallowIterator)
                 .map(TrinoFileStatus::getPath)
                 .map(Path::new)
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java
index 4fb53a783a39..20b428b88e14 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java
@@ -912,11 +912,11 @@ public void testBuildManifestFileIteratorNestedDirectory()
         schema.setProperty(SERIALIZATION_LIB, AVRO.getSerde());
 
         Path filePath = new Path("hdfs://VOL1:9000/db_name/table_name/file1");
-        Path directoryPath = new Path("hdfs://VOL1:9000/db_name/table_name/dir");
+        Path directoryPath = new Path("hdfs://VOL1:9000/db_name/table_name/dir/file2");
         List<Path> paths = ImmutableList.of(filePath, directoryPath);
         List<LocatedFileStatus> files = ImmutableList.of(
                 locatedFileStatus(filePath),
-                locatedDirectoryStatus(directoryPath));
+                locatedFileStatus(directoryPath));
 
         BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
                 files,
@@ -1412,23 +1412,6 @@ private static LocatedFileStatus locatedFileStatusWithNoBlocks(Path path)
                 new BlockLocation[] {});
     }
 
-    private static LocatedFileStatus locatedDirectoryStatus(Path path)
-    {
-        return new LocatedFileStatus(
-                0L,
-                true,
-                0,
-                0L,
-                0L,
-                0L,
-                null,
-                null,
-                null,
-                null,
-                path,
-                new BlockLocation[] {});
-    }
-
     public static class TestingHdfsEnvironment
             extends HdfsEnvironment
     {
@@ -1483,7 +1466,18 @@ public void setWorkingDirectory(Path dir)
         @Override
         public FileStatus[] listStatus(Path f)
         {
-            throw new UnsupportedOperationException();
+            FileStatus[] fileStatuses = new FileStatus[files.size()];
+            for (int i = 0; i < files.size(); i++) {
+                LocatedFileStatus locatedFileStatus = files.get(i);
+                fileStatuses[i] = new FileStatus(
+                        locatedFileStatus.getLen(),
+                        locatedFileStatus.isDirectory(),
+                        locatedFileStatus.getReplication(),
+                        locatedFileStatus.getBlockSize(),
+                        locatedFileStatus.getModificationTime(),
+                        locatedFileStatus.getPath());
+            }
+            return fileStatuses;
         }
 
         @Override
@@ -1547,13 +1541,13 @@ public FileStatus getFileStatus(Path f)
         @Override
         public Path getWorkingDirectory()
         {
-            throw new UnsupportedOperationException();
+            return new Path(getUri());
         }
 
         @Override
         public URI getUri()
         {
-            throw new UnsupportedOperationException();
+            return URI.create("hdfs://VOL1:9000/");
         }
     }
 }
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/BaseCachingDirectoryListerTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/BaseCachingDirectoryListerTest.java
index 4965a601fdff..ee26f071ecc7 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/BaseCachingDirectoryListerTest.java
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java
index 4fb53a783a39..20b428b88e14 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java
@@ -912,11 +912,11 @@ public void testBuildManifestFileIteratorNestedDirectory()
         schema.setProperty(SERIALIZATION_LIB, AVRO.getSerde());

         Path filePath = new Path("hdfs://VOL1:9000/db_name/table_name/file1");
-        Path directoryPath = new Path("hdfs://VOL1:9000/db_name/table_name/dir");
+        Path directoryPath = new Path("hdfs://VOL1:9000/db_name/table_name/dir/file2");
         List<Path> paths = ImmutableList.of(filePath, directoryPath);
         List<LocatedFileStatus> files = ImmutableList.of(
                 locatedFileStatus(filePath),
-                locatedDirectoryStatus(directoryPath));
+                locatedFileStatus(directoryPath));

         BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
                 files,
@@ -1412,23 +1412,6 @@ private static LocatedFileStatus locatedFileStatusWithNoBlocks(Path path)
                 new BlockLocation[] {});
     }

-    private static LocatedFileStatus locatedDirectoryStatus(Path path)
-    {
-        return new LocatedFileStatus(
-                0L,
-                true,
-                0,
-                0L,
-                0L,
-                0L,
-                null,
-                null,
-                null,
-                null,
-                path,
-                new BlockLocation[] {});
-    }
-
     public static class TestingHdfsEnvironment
             extends HdfsEnvironment
     {
@@ -1483,7 +1466,18 @@ public void setWorkingDirectory(Path dir)
         @Override
        public FileStatus[] listStatus(Path f)
        {
-            throw new UnsupportedOperationException();
+            FileStatus[] fileStatuses = new FileStatus[files.size()];
+            for (int i = 0; i < files.size(); i++) {
+                LocatedFileStatus locatedFileStatus = files.get(i);
+                fileStatuses[i] = new FileStatus(
+                        locatedFileStatus.getLen(),
+                        locatedFileStatus.isDirectory(),
+                        locatedFileStatus.getReplication(),
+                        locatedFileStatus.getBlockSize(),
+                        locatedFileStatus.getModificationTime(),
+                        locatedFileStatus.getPath());
+            }
+            return fileStatuses;
        }

        @Override
@@ -1547,13 +1541,13 @@ public FileStatus getFileStatus(Path f)
        @Override
        public Path getWorkingDirectory()
        {
-            throw new UnsupportedOperationException();
+            return new Path(getUri());
        }

        @Override
        public URI getUri()
        {
-            throw new UnsupportedOperationException();
+            return URI.create("hdfs://VOL1:9000/");
        }
    }
}
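
Two things change in this fixture. First, a recursive files-only listing never surfaces a bare directory entry, so the nested directory is now modeled by a file inside it (dir/file2) and the locatedDirectoryStatus() helper becomes dead code. Second, the TrinoFileSystem adapter over HDFS appears to exercise listStatus(), getWorkingDirectory(), and getUri() during listing, so the stub file system must answer them instead of throwing; getUri() returns the same synthetic hdfs://VOL1:9000/ authority the seeded paths use, keeping location parsing consistent. A sketch of how the stub is now seeded (values taken from the test above):

    // Both entries are plain files; the stub answers listStatus() by
    // down-converting these seeded LocatedFileStatus values.
    List<LocatedFileStatus> files = ImmutableList.of(
            locatedFileStatus(new Path("hdfs://VOL1:9000/db_name/table_name/file1")),
            locatedFileStatus(new Path("hdfs://VOL1:9000/db_name/table_name/dir/file2")));
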
"full_cache_invalidation_partitioned_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path partitionedTableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "full_cache_invalidation_partitioned_table", ImmutableList.of("group2")); + String partitionedTableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "full_cache_invalidation_partitioned_table", ImmutableList.of("group1")); + String partitionedTableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "full_cache_invalidation_partitioned_table", ImmutableList.of("group2")); assertThat(isCached(partitionedTableGroup1PartitionLocation)).isTrue(); assertThat(isCached(partitionedTableGroup2PartitionLocation)).isTrue(); @@ -139,14 +140,14 @@ public void testCacheInvalidationIsAppliedSpecificallyOnPartitionDropped() assertUpdate("INSERT INTO partition_path_cache_invalidation_non_partitioned_table VALUES (1), (2), (3)", 3); // The listing for the invalidate_non_partitioned_table1 should be in the directory cache after this call assertQuery("SELECT sum(col1) FROM partition_path_cache_invalidation_non_partitioned_table", "VALUES (6)"); - org.apache.hadoop.fs.Path nonPartitionedTableLocation = getTableLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_non_partitioned_table"); + String nonPartitionedTableLocation = getTableLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_non_partitioned_table"); assertThat(isCached(nonPartitionedTableLocation)).isTrue(); assertUpdate("CREATE TABLE partition_path_cache_invalidation_partitioned_table (col1 int, col2 varchar) WITH (format = 'ORC', partitioned_by = ARRAY['col2'])"); assertUpdate("INSERT INTO partition_path_cache_invalidation_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); assertQuery("SELECT col2, sum(col1) FROM partition_path_cache_invalidation_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); - org.apache.hadoop.fs.Path partitionedTableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_partitioned_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path partitionedTableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_partitioned_table", ImmutableList.of("group2")); + String partitionedTableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_partitioned_table", ImmutableList.of("group1")); + String partitionedTableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "partition_path_cache_invalidation_partitioned_table", ImmutableList.of("group2")); assertThat(isCached(partitionedTableGroup1PartitionLocation)).isTrue(); assertThat(isCached(partitionedTableGroup2PartitionLocation)).isTrue(); @@ -186,8 +187,8 @@ public void testInsertIntoPartitionedTable() assertUpdate("INSERT INTO insert_into_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); // The listing for the table partitions should be in the directory cache after this call assertQuery("SELECT col2, sum(col1) FROM insert_into_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); - org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "insert_into_partitioned_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "insert_into_partitioned_table", ImmutableList.of("group2")); + String tableGroup1PartitionLocation = 
getPartitionLocation(TPCH_SCHEMA, "insert_into_partitioned_table", ImmutableList.of("group1")); + String tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "insert_into_partitioned_table", ImmutableList.of("group2")); assertThat(isCached(tableGroup1PartitionLocation)).isTrue(); assertThat(isCached(tableGroup2PartitionLocation)).isTrue(); @@ -207,9 +208,9 @@ public void testDropPartition() assertUpdate("INSERT INTO delete_from_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2'), (5, 'group3')", 5); // The listing for the table partitions should be in the directory cache after this call assertQuery("SELECT col2, sum(col1) FROM delete_from_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7), ('group3', 5)"); - org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group2")); - org.apache.hadoop.fs.Path tableGroup3PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group3")); + String tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group1")); + String tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group2")); + String tableGroup3PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("group3")); assertThat(isCached(tableGroup1PartitionLocation)).isTrue(); assertThat(isCached(tableGroup2PartitionLocation)).isTrue(); assertUpdate("DELETE FROM delete_from_partitioned_table WHERE col2 = 'group1' OR col2 = 'group2'"); @@ -229,10 +230,10 @@ public void testDropMultiLevelPartition() assertUpdate("INSERT INTO delete_from_partitioned_table VALUES (1000, DATE '2022-02-01', 'US'), (2000, DATE '2022-02-01', 'US'), (4000, DATE '2022-02-02', 'US'), (1500, DATE '2022-02-01', 'AT'), (2500, DATE '2022-02-02', 'AT')", 5); // The listing for the table partitions should be in the directory cache after this call assertQuery("SELECT day, country, sum(clicks) FROM delete_from_partitioned_table GROUP BY day, country", "VALUES (DATE '2022-02-01', 'US', 3000), (DATE '2022-02-02', 'US', 4000), (DATE '2022-02-01', 'AT', 1500), (DATE '2022-02-02', 'AT', 2500)"); - org.apache.hadoop.fs.Path table20220201UsPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-01", "US")); - org.apache.hadoop.fs.Path table20220202UsPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-02", "US")); - org.apache.hadoop.fs.Path table20220201AtPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-01", "AT")); - org.apache.hadoop.fs.Path table20220202AtPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-02", "AT")); + String table20220201UsPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-01", "US")); + String table20220202UsPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-02", "US")); + String table20220201AtPartitionLocation = 
getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-01", "AT")); + String table20220202AtPartitionLocation = getPartitionLocation(TPCH_SCHEMA, "delete_from_partitioned_table", ImmutableList.of("2022-02-02", "AT")); assertThat(isCached(table20220201UsPartitionLocation)).isTrue(); assertThat(isCached(table20220202UsPartitionLocation)).isTrue(); assertThat(isCached(table20220201AtPartitionLocation)).isTrue(); @@ -258,13 +259,13 @@ public void testUnregisterRegisterPartition() assertUpdate("INSERT INTO register_unregister_partition_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2')", 4); // The listing for the table partitions should be in the directory cache after this call assertQuery("SELECT col2, sum(col1) FROM register_unregister_partition_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7)"); - org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "register_unregister_partition_table", ImmutableList.of("group1")); - org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "register_unregister_partition_table", ImmutableList.of("group2")); + String tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "register_unregister_partition_table", ImmutableList.of("group1")); + String tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "register_unregister_partition_table", ImmutableList.of("group2")); assertThat(isCached(tableGroup1PartitionLocation)).isTrue(); assertThat(isCached(tableGroup2PartitionLocation)).isTrue(); List paths = getQueryRunner().execute(getSession(), "SELECT \"$path\" FROM register_unregister_partition_table WHERE col2 = 'group1' LIMIT 1").toTestTypes().getMaterializedRows(); - String group1PartitionPath = new org.apache.hadoop.fs.Path((String) paths.get(0).getField(0)).getParent().toString(); + String group1PartitionPath = Location.of((String) paths.get(0).getField(0)).parentDirectory().toString(); assertUpdate(format("CALL system.unregister_partition('%s', '%s', ARRAY['col2'], ARRAY['group1'])", TPCH_SCHEMA, "register_unregister_partition_table")); // Unregistering the partition in the table should invalidate the cached listing of all the partitions belonging to the table. @@ -290,7 +291,7 @@ public void testRenameTable() assertUpdate("INSERT INTO table_to_be_renamed VALUES (1), (2), (3)", 3); // The listing for the table should be in the directory cache after this call assertQuery("SELECT sum(col1) FROM table_to_be_renamed", "VALUES (6)"); - org.apache.hadoop.fs.Path tableLocation = getTableLocation(TPCH_SCHEMA, "table_to_be_renamed"); + String tableLocation = getTableLocation(TPCH_SCHEMA, "table_to_be_renamed"); assertThat(isCached(tableLocation)).isTrue(); assertUpdate("ALTER TABLE table_to_be_renamed RENAME TO table_renamed"); // Altering the table should invalidate the cached listing of the files belonging to the table. 
@@ -306,7 +307,7 @@ public void testDropTable()
         assertUpdate("INSERT INTO table_to_be_dropped VALUES (1), (2), (3)", 3);
         // The listing for the table should be in the directory cache after this call
         assertQuery("SELECT sum(col1) FROM table_to_be_dropped", "VALUES (6)");
-        org.apache.hadoop.fs.Path tableLocation = getTableLocation(TPCH_SCHEMA, "table_to_be_dropped");
+        String tableLocation = getTableLocation(TPCH_SCHEMA, "table_to_be_dropped");
         assertThat(isCached(tableLocation)).isTrue();
         assertUpdate("DROP TABLE table_to_be_dropped");
         // Dropping the table should invalidate the cached listing of the files belonging to the table.
@@ -320,9 +321,9 @@
         assertUpdate("INSERT INTO drop_partitioned_table VALUES (1, 'group1'), (2, 'group1'), (3, 'group2'), (4, 'group2'), (5, 'group3')", 5);
         // The listing for the table partitions should be in the directory cache after this call
         assertQuery("SELECT col2, sum(col1) FROM drop_partitioned_table GROUP BY col2", "VALUES ('group1', 3), ('group2', 7), ('group3', 5)");
-        org.apache.hadoop.fs.Path tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group1"));
-        org.apache.hadoop.fs.Path tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group2"));
-        org.apache.hadoop.fs.Path tableGroup3PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group3"));
+        String tableGroup1PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group1"));
+        String tableGroup2PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group2"));
+        String tableGroup3PartitionLocation = getPartitionLocation(TPCH_SCHEMA, "drop_partitioned_table", ImmutableList.of("group3"));
         assertThat(isCached(tableGroup1PartitionLocation)).isTrue();
         assertThat(isCached(tableGroup2PartitionLocation)).isTrue();
         assertThat(isCached(tableGroup3PartitionLocation)).isTrue();
@@ -347,27 +348,25 @@ protected void dropTable(String schemaName, String tableName, boolean deleteData)
         fileHiveMetastore.dropTable(schemaName, tableName, deleteData);
     }

-    protected org.apache.hadoop.fs.Path getTableLocation(String schemaName, String tableName)
+    protected String getTableLocation(String schemaName, String tableName)
     {
         return getTable(schemaName, tableName)
                 .map(table -> table.getStorage().getLocation())
-                .map(tableLocation -> new org.apache.hadoop.fs.Path(tableLocation))
                 .orElseThrow(() -> new NoSuchElementException(format("The table %s.%s could not be found", schemaName, tableName)));
     }

-    protected org.apache.hadoop.fs.Path getPartitionLocation(String schemaName, String tableName, List<String> partitionValues)
+    protected String getPartitionLocation(String schemaName, String tableName, List<String> partitionValues)
     {
         Table table = getTable(schemaName, tableName)
                 .orElseThrow(() -> new NoSuchElementException(format("The table %s.%s could not be found", schemaName, tableName)));

         return fileHiveMetastore.getPartition(table, partitionValues)
                 .map(partition -> partition.getStorage().getLocation())
-                .map(partitionLocation -> new org.apache.hadoop.fs.Path(partitionLocation))
                 .orElseThrow(() -> new NoSuchElementException(format("The partition %s from the table %s.%s could not be found", partitionValues, schemaName, tableName)));
     }

-    protected boolean isCached(org.apache.hadoop.fs.Path path)
+    protected boolean isCached(String path)
     {
-        return isCached(directoryLister, path);
+        return isCached(directoryLister, Location.of(path));
     }
 }
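
Table and partition locations are handed around as plain location strings now, converted to a Location only at the cache-probing edge, and the partition directory of a "$path" value is derived with Location.parentDirectory() instead of Hadoop's Path.getParent(). A small sketch of the expected behavior, with an assumed example value:

    // Assumed example: trimming the file name from a "$path" result.
    Location dataFile = Location.of("hdfs://namenode:9000/warehouse/tbl/col2=group1/000000_0");
    String partitionPath = dataFile.parentDirectory().toString();
    // expected: hdfs://namenode:9000/warehouse/tbl/col2=group1
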
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java
index 1eeca57becb5..b89bf75fe7ab 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/FileSystemDirectoryLister.java
@@ -13,11 +13,10 @@
  */
 package io.trino.plugin.hive.fs;

+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
 import io.trino.plugin.hive.metastore.Partition;
 import io.trino.plugin.hive.metastore.Table;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;

 import java.io.IOException;

@@ -25,17 +24,10 @@ public class FileSystemDirectoryLister
         implements DirectoryLister
 {
     @Override
-    public RemoteIterator<TrinoFileStatus> list(FileSystem fs, Table table, Path path)
+    public RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs, Table table, Location location)
             throws IOException
     {
-        return new TrinoFileStatusRemoteIterator(fs.listLocatedStatus(path));
-    }
-
-    @Override
-    public RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
-            throws IOException
-    {
-        return new TrinoFileStatusRemoteIterator(fs.listFiles(path, true));
+        return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
     }

     @Override
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryLister.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryLister.java
index 4cc21ce7b920..3d37a7e05fe5 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryLister.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryLister.java
@@ -15,7 +15,7 @@
 import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
-import org.apache.hadoop.fs.Path;
+import io.trino.filesystem.Location;
 import org.testng.annotations.Test;

 import java.util.List;
@@ -34,9 +34,9 @@ protected CachingDirectoryLister createDirectoryLister()
     }

     @Override
-    protected boolean isCached(CachingDirectoryLister directoryLister, Path path)
+    protected boolean isCached(CachingDirectoryLister directoryLister, Location location)
     {
-        return directoryLister.isCached(path);
+        return directoryLister.isCached(location);
     }

     @Test
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryListerRecursiveFilesOnly.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryListerRecursiveFilesOnly.java
index ea9a47f360ae..010b8330145c 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryListerRecursiveFilesOnly.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestCachingDirectoryListerRecursiveFilesOnly.java
@@ -17,10 +17,10 @@
 import com.google.common.collect.ImmutableMap;
 import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
+import io.trino.filesystem.Location;
 import io.trino.plugin.hive.metastore.MetastoreUtil;
 import io.trino.plugin.hive.metastore.Table;
 import io.trino.testing.QueryRunner;
-import org.apache.hadoop.fs.Path;
 import org.testng.annotations.Test;

 import java.util.List;
@@ -54,9 +54,9 @@ protected QueryRunner createQueryRunner()
     }

     @Override
-    protected boolean isCached(CachingDirectoryLister directoryLister, Path path)
+    protected boolean isCached(CachingDirectoryLister directoryLister, Location location)
     {
-        return directoryLister.isCached(new DirectoryListingCacheKey(path.toString(), true));
+        return directoryLister.isCached(location);
     }

     @Test
@@ -80,7 +80,7 @@ public void testRecursiveDirectories()

         // Execute a query on the new table to pull the listing into the cache
         assertQuery("SELECT sum(clicks) FROM recursive_directories", "VALUES (11000)");
-        Path tableLocation = getTableLocation(TPCH_SCHEMA, "recursive_directories");
+        String tableLocation = getTableLocation(TPCH_SCHEMA, "recursive_directories");
         assertTrue(isCached(tableLocation));

         // Insert should invalidate cache, even at the root directory path
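
With only one listing flavor left, the cache key no longer needs the recursive flag that DirectoryListingCacheKey carried; a Location identifies a cached listing on its own, which is why both test classes above now probe the cache the same way. A hedged sketch of the probe, with illustrative setup values:

    // One cache entry per location; no (path, recursive) pair anymore.
    CachingDirectoryLister lister = createDirectoryLister();
    Location tableLocation = Location.of("hdfs://namenode:9000/warehouse/recursive_directories"); // illustrative
    boolean cached = lister.isCached(tableLocation);
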
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java
index ac196e577bf0..a67bb89bf514 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java
@@ -16,6 +16,8 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import io.airlift.units.DataSize;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
 import io.trino.plugin.hive.HiveBucketProperty;
 import io.trino.plugin.hive.HiveType;
 import io.trino.plugin.hive.metastore.Column;
@@ -24,9 +26,6 @@
 import io.trino.plugin.hive.metastore.Storage;
 import io.trino.plugin.hive.metastore.StorageFormat;
 import io.trino.plugin.hive.metastore.Table;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.testng.annotations.Test;

 import java.io.IOException;
@@ -77,21 +76,21 @@ protected TransactionScopeCachingDirectoryLister createDirectoryLister()
     }

     @Override
-    protected boolean isCached(TransactionScopeCachingDirectoryLister directoryLister, Path path)
+    protected boolean isCached(TransactionScopeCachingDirectoryLister directoryLister, Location location)
     {
-        return directoryLister.isCached(path);
+        return directoryLister.isCached(location);
     }

     @Test
     public void testConcurrentDirectoryListing()
             throws IOException
     {
-        TrinoFileStatus firstFile = new TrinoFileStatus(ImmutableList.of(), "x", false, 1, 1);
-        TrinoFileStatus secondFile = new TrinoFileStatus(ImmutableList.of(), "y", false, 1, 1);
-        TrinoFileStatus thirdFile = new TrinoFileStatus(ImmutableList.of(), "z", false, 1, 1);
+        TrinoFileStatus firstFile = new TrinoFileStatus(ImmutableList.of(), "file:/x/x", false, 1, 1);
+        TrinoFileStatus secondFile = new TrinoFileStatus(ImmutableList.of(), "file:/x/y", false, 1, 1);
+        TrinoFileStatus thirdFile = new TrinoFileStatus(ImmutableList.of(), "file:/y/z", false, 1, 1);

-        Path path1 = new Path("x");
-        Path path2 = new Path("y");
+        Location path1 = Location.of("file:/x");
+        Location path2 = Location.of("file:/y");

         CountingDirectoryLister countingLister = new CountingDirectoryLister(
                 ImmutableMap.of(
@@ -102,17 +101,17 @@ public void testConcurrentDirectoryListing()
         // due to Token being a key in segmented cache.
         TransactionScopeCachingDirectoryLister cachingLister = (TransactionScopeCachingDirectoryLister) new TransactionScopeCachingDirectoryListerFactory(DataSize.ofBytes(500), Optional.of(1)).get(countingLister);

-        assertFiles(cachingLister.list(null, TABLE, path2), ImmutableList.of(thirdFile));
+        assertFiles(new DirectoryListingFilter(path2, (cachingLister.listFilesRecursively(null, TABLE, path2)), true), ImmutableList.of(thirdFile));
         assertThat(countingLister.getListCount()).isEqualTo(1);

         // listing path2 again shouldn't increase listing count
         assertThat(cachingLister.isCached(path2)).isTrue();
-        assertFiles(cachingLister.list(null, TABLE, path2), ImmutableList.of(thirdFile));
+        assertFiles(new DirectoryListingFilter(path2, cachingLister.listFilesRecursively(null, TABLE, path2), true), ImmutableList.of(thirdFile));
         assertThat(countingLister.getListCount()).isEqualTo(1);

         // start listing path1 concurrently
-        RemoteIterator<TrinoFileStatus> path1FilesA = cachingLister.list(null, TABLE, path1);
-        RemoteIterator<TrinoFileStatus> path1FilesB = cachingLister.list(null, TABLE, path1);
+        RemoteIterator<TrinoFileStatus> path1FilesA = new DirectoryListingFilter(path1, cachingLister.listFilesRecursively(null, TABLE, path1), true);
+        RemoteIterator<TrinoFileStatus> path1FilesB = new DirectoryListingFilter(path1, cachingLister.listFilesRecursively(null, TABLE, path1), true);
         assertThat(countingLister.getListCount()).isEqualTo(2);

         // list path1 files using both iterators concurrently
@@ -126,7 +125,7 @@ public void testConcurrentDirectoryListing()

         // listing path2 again should increase listing count because 2 files were cached for path1
         assertThat(cachingLister.isCached(path2)).isFalse();
-        assertFiles(cachingLister.list(null, TABLE, path2), ImmutableList.of(thirdFile));
+        assertFiles(new DirectoryListingFilter(path2, cachingLister.listFilesRecursively(null, TABLE, path2), true), ImmutableList.of(thirdFile));
         assertThat(countingLister.getListCount()).isEqualTo(3);
     }
@@ -134,16 +133,16 @@
     public void testConcurrentDirectoryListingException()
             throws IOException
     {
-        TrinoFileStatus file = new TrinoFileStatus(ImmutableList.of(), "x", false, 1, 1);
-        Path path = new Path("x");
+        TrinoFileStatus file = new TrinoFileStatus(ImmutableList.of(), "file:/x/x", false, 1, 1);
+        Location path = Location.of("file:/x");

         CountingDirectoryLister countingLister = new CountingDirectoryLister(ImmutableMap.of(path, ImmutableList.of(file)));
         DirectoryLister cachingLister = new TransactionScopeCachingDirectoryListerFactory(DataSize.ofBytes(600), Optional.empty()).get(countingLister);

         // start listing path concurrently
         countingLister.setThrowException(true);
-        RemoteIterator<TrinoFileStatus> filesA = cachingLister.list(null, TABLE, path);
-        RemoteIterator<TrinoFileStatus> filesB = cachingLister.list(null, TABLE, path);
+        RemoteIterator<TrinoFileStatus> filesA = cachingLister.listFilesRecursively(null, TABLE, path);
+        RemoteIterator<TrinoFileStatus> filesB = cachingLister.listFilesRecursively(null, TABLE, path);
         assertThat(countingLister.getListCount()).isEqualTo(1);

         // listing should throw an exception
@@ -151,7 +150,7 @@ public void testConcurrentDirectoryListingException()

         // listing again should succeed
         countingLister.setThrowException(false);
-        assertFiles(cachingLister.list(null, TABLE, path), ImmutableList.of(file));
+        assertFiles(new DirectoryListingFilter(path, cachingLister.listFilesRecursively(null, TABLE, path), true), ImmutableList.of(file));
         assertThat(countingLister.getListCount()).isEqualTo(2);

         // listing using second concurrently initialized DirectoryLister should fail
@@ -171,29 +170,21 @@ private void assertFiles(RemoteIterator<TrinoFileStatus> iterator, List<TrinoFileStatus> expectedFiles)
     private static class CountingDirectoryLister
             implements DirectoryLister
     {
-        private final Map<Path, List<TrinoFileStatus>> fileStatuses;
+        private final Map<Location, List<TrinoFileStatus>> fileStatuses;
         private int listCount;
         private boolean throwException;

-        public CountingDirectoryLister(Map<Path, List<TrinoFileStatus>> fileStatuses)
+        public CountingDirectoryLister(Map<Location, List<TrinoFileStatus>> fileStatuses)
         {
             this.fileStatuses = requireNonNull(fileStatuses, "fileStatuses is null");
         }

         @Override
-        public RemoteIterator<TrinoFileStatus> list(FileSystem fs, Table table, Path path)
-                throws IOException
-        {
-            listCount++;
-            return throwingRemoteIterator(requireNonNull(fileStatuses.get(path)), throwException);
-        }
-
-        @Override
-        public RemoteIterator<TrinoFileStatus> listFilesRecursively(FileSystem fs, Table table, Path path)
-                throws IOException
+        public RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs, Table table, Location location)
         {
             // No specific recursive files-only listing implementation
-            return list(fs, table, path);
+            listCount++;
+            return throwingRemoteIterator(requireNonNull(fileStatuses.get(location)), throwException);
         }

         public void setThrowException(boolean throwException)
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestAvroSymlinkInputFormat.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestAvroSymlinkInputFormat.java
index d1c277b2201d..bd227e8ceea5 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestAvroSymlinkInputFormat.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestAvroSymlinkInputFormat.java
@@ -137,9 +137,9 @@ public void testSymlinkTableWithNestedDirectory()
                 "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");

         String tableRoot = warehouseDirectory + '/' + table;
-        String dataDir = warehouseDirectory + "/data_test_avro_symlink_with_nested_directory";
+        String dataDir = warehouseDirectory + "/data_test_avro_symlink_with_nested_directory/nested_directory";
         saveResourceOnHdfs("avro/original_data.avro", dataDir + "/original_data.avro");
-        hdfsClient.saveFile(tableRoot + "/symlink.txt", format("hdfs://%s/", dataDir));
+        hdfsClient.saveFile(tableRoot + "/symlink.txt", format("hdfs://%s/original_data.avro", dataDir));

         assertThat(onTrino().executeQuery("SELECT * FROM " + table)).containsExactlyInOrder(row("someValue", 1));
         onHive().executeQuery("DROP TABLE " + table);
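
The product-test fix at the end appears to follow from the same listing model: each symlink.txt manifest entry now names a concrete data file rather than a directory that the reader would expand during listing. Roughly, with the paths used in the test:

    // Each manifest line points at a data file; the nested directory is part
    // of dataDir itself rather than something the reader has to descend into.
    hdfsClient.saveFile(
            tableRoot + "/symlink.txt",
            format("hdfs://%s/original_data.avro", dataDir));
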