diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystem.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystem.java
index 670da1f2fea3..af04fb7338bf 100644
--- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystem.java
+++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/SwitchingFileSystem.java
@@ -32,7 +32,7 @@
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.groupingBy;
 
-final class SwitchingFileSystem
+public final class SwitchingFileSystem
         implements TrinoFileSystem
 {
     private final Optional<ConnectorSession> session;
@@ -130,7 +130,7 @@ public void renameDirectory(Location source, Location target)
         fileSystem(source).renameDirectory(source, target);
     }
 
-    private TrinoFileSystem fileSystem(Location location)
+    public TrinoFileSystem fileSystem(Location location)
     {
         return createFileSystem(determineFactory(location));
     }
diff --git a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java
index 904b237078f0..72a236e3ea7a 100644
--- a/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java
+++ b/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java
@@ -20,6 +20,7 @@
 import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoInputFile;
 import io.trino.filesystem.TrinoOutputFile;
+import jakarta.annotation.Nullable;
 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
@@ -44,7 +45,7 @@
 import static com.google.common.collect.Multimaps.toMultimap;
 import static java.util.Objects.requireNonNull;
 
-final class S3FileSystem
+public final class S3FileSystem
         implements TrinoFileSystem
 {
     private final S3Client client;
@@ -164,6 +165,12 @@ public void renameFile(Location source, Location target)
     @Override
     public FileIterator listFiles(Location location)
             throws IOException
+    {
+        return listFiles(location, null);
+    }
+
+    public FileIterator listFiles(Location location, @Nullable String startAfter)
+            throws IOException
     {
         S3Location s3Location = new S3Location(location);
 
@@ -175,6 +182,7 @@ public FileIterator listFiles(Location location)
         ListObjectsV2Request request = ListObjectsV2Request.builder()
                 .bucket(s3Location.bucket())
                 .prefix(key)
+                .startAfter(startAfter)
                 .build();
 
         try {
diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java
index a1b69b42199d..bd558bdf6d82 100644
--- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java
+++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystem.java
@@ -28,7 +28,7 @@
 import static io.trino.filesystem.tracing.Tracing.withTracing;
 import static java.util.Objects.requireNonNull;
 
-final class TracingFileSystem
+public final class TracingFileSystem
         implements TrinoFileSystem
 {
     private final Tracer tracer;
@@ -137,4 +137,10 @@ public void renameDirectory(Location source, Location target)
                 .startSpan();
         withTracing(span, () -> delegate.renameDirectory(source, target));
     }
+
+    @Deprecated // Marking as deprecated to discourage using this method
+    public TrinoFileSystem getDelegate()
+    {
+        return delegate;
+    }
 }
diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml
index 2e0030234377..dfee2339a8f2 100644
--- a/plugin/trino-delta-lake/pom.xml
+++ b/plugin/trino-delta-lake/pom.xml
@@ -134,6 +134,11 @@
             <artifactId>trino-filesystem-manager</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-filesystem-s3</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-hdfs</artifactId>
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java
index 134daf28960c..ef2be8bc8930 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java
@@ -20,9 +20,14 @@
 import dev.failsafe.RetryPolicy;
 import io.airlift.json.ObjectMapperProvider;
 import io.airlift.log.Logger;
+import io.trino.filesystem.FileEntry;
+import io.trino.filesystem.FileIterator;
 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.manager.SwitchingFileSystem;
+import io.trino.filesystem.s3.S3FileSystem;
+import io.trino.filesystem.tracing.TracingFileSystem;
 import io.trino.plugin.base.util.JsonUtils;
 import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
 import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint;
@@ -273,6 +278,28 @@ public static long getMandatoryCurrentVersion(TrinoFileSystem fileSystem, String
         long version = readLastCheckpoint(fileSystem, tableLocation).map(LastCheckpoint::getVersion).orElse(0L);
 
         String transactionLogDir = getTransactionLogDir(tableLocation);
+
+        if (fileSystem instanceof TracingFileSystem tracingFileSystem &&
+                tracingFileSystem.getDelegate() instanceof SwitchingFileSystem switchingFileSystem &&
+                switchingFileSystem.fileSystem(Location.of(transactionLogDir)) instanceof S3FileSystem s3FileSystem) {
+            String startAfter = getTransactionLogJsonEntryPath(transactionLogDir, version).path();
+            FileIterator files = s3FileSystem.listFiles(Location.of(transactionLogDir), startAfter);
+
+            while (files.hasNext()) {
+                FileEntry file = files.next();
+                if (!file.location().fileName().endsWith(".json")) {
+                    // _delta_log directory may contain unrelated files, e.g. crc file for json
+                    continue;
+                }
+                Location entryPath = getTransactionLogJsonEntryPath(transactionLogDir, version + 1);
+                if (!entryPath.equals(file.location())) {
+                    return version;
+                }
+                version++;
+            }
+            return version;
+        }
+
         while (true) {
             Location entryPath = getTransactionLogJsonEntryPath(transactionLogDir, version + 1);
             if (!fileSystem.newInputFile(entryPath).exists()) {
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-delta-lake-databricks/delta.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-delta-lake-databricks/delta.properties
index db7aeba4742f..6ead7f5b4079 100644
--- a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-delta-lake-databricks/delta.properties
+++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-delta-lake-databricks/delta.properties
@@ -1,7 +1,6 @@
 connector.name=delta_lake
 hive.metastore=glue
 hive.metastore.glue.region=${ENV:AWS_REGION}
-# We need to give access to bucket owner (the AWS account integrated with Databricks), otherwise files won't be readable from Databricks
-hive.s3.upload-acl-type=BUCKET_OWNER_FULL_CONTROL
 delta.enable-non-concurrent-writes=true
 delta.hive-catalog-name=hive
+fs.native-s3.enabled=true