Test: Use ListObjectsV2Request with startAfter in Delta Lake #18787

Closed · wants to merge 1 commit
io/trino/filesystem/manager/SwitchingFileSystem.java
@@ -32,7 +32,7 @@
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.groupingBy;

-final class SwitchingFileSystem
+public final class SwitchingFileSystem
         implements TrinoFileSystem
 {
     private final Optional<ConnectorSession> session;
@@ -130,7 +130,7 @@ public void renameDirectory(Location source, Location target)
         fileSystem(source).renameDirectory(source, target);
     }

-    private TrinoFileSystem fileSystem(Location location)
+    public TrinoFileSystem fileSystem(Location location)
     {
         return createFileSystem(determineFactory(location));
     }
io/trino/filesystem/s3/S3FileSystem.java
@@ -20,6 +20,7 @@
 import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoInputFile;
 import io.trino.filesystem.TrinoOutputFile;
+import jakarta.annotation.Nullable;
 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
@@ -44,7 +45,7 @@
 import static com.google.common.collect.Multimaps.toMultimap;
 import static java.util.Objects.requireNonNull;

-final class S3FileSystem
+public final class S3FileSystem
         implements TrinoFileSystem
 {
     private final S3Client client;
@@ -164,6 +165,12 @@ public void renameFile(Location source, Location target)
     @Override
     public FileIterator listFiles(Location location)
             throws IOException
     {
+        return listFiles(location, null);
+    }
+
+    public FileIterator listFiles(Location location, @Nullable String startAfter)
+            throws IOException
+    {
         S3Location s3Location = new S3Location(location);

@@ -175,6 +182,7 @@ public FileIterator listFiles(Location location)
         ListObjectsV2Request request = ListObjectsV2Request.builder()
                 .bucket(s3Location.bucket())
                 .prefix(key)
+                .startAfter(startAfter)
                 .build();

         try {
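
For context, the new startAfter argument is passed straight through to the ListObjectsV2 StartAfter field, so S3 returns only keys that sort strictly after the given key. A minimal usage sketch, assuming an already-constructed S3FileSystem; the bucket, paths, and the listLogEntriesAfter helper are hypothetical, not part of the PR:

import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.s3.S3FileSystem;

import java.io.IOException;

final class StartAfterExample
{
    // Hypothetical helper: list transaction-log objects strictly after version 10.
    static void listLogEntriesAfter(S3FileSystem fileSystem)
            throws IOException
    {
        Location logDir = Location.of("s3://example-bucket/table/_delta_log/");
        // startAfter is an S3 object key (no scheme or bucket), compared lexicographically by S3
        String startAfter = "table/_delta_log/00000000000000000010.json";

        FileIterator files = fileSystem.listFiles(logDir, startAfter);
        while (files.hasNext()) {
            FileEntry entry = files.next();
            System.out.println(entry.location()); // only keys sorting after startAfter
        }
    }
}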
io/trino/filesystem/tracing/TracingFileSystem.java
@@ -28,7 +28,7 @@
 import static io.trino.filesystem.tracing.Tracing.withTracing;
 import static java.util.Objects.requireNonNull;

-final class TracingFileSystem
+public final class TracingFileSystem
         implements TrinoFileSystem
 {
     private final Tracer tracer;
@@ -137,4 +137,10 @@ public void renameDirectory(Location source, Location target)
                 .startSpan();
         withTracing(span, () -> delegate.renameDirectory(source, target));
     }
+
+    @Deprecated // Marking as deprecated to discourage using this method
+    public TrinoFileSystem getDelegate()
+    {
+        return delegate;
+    }
 }
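
getDelegate() exists so a caller can peel the tracing layer off and reach the underlying filesystem. A sketch of that unwrap chain, mirroring the instanceof cascade in the Delta Lake change below; the tryUnwrapS3 helper is illustrative, not part of the PR:

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.manager.SwitchingFileSystem;
import io.trino.filesystem.s3.S3FileSystem;
import io.trino.filesystem.tracing.TracingFileSystem;

import java.util.Optional;

final class UnwrapExample
{
    // Unwrap TracingFileSystem -> SwitchingFileSystem -> S3FileSystem when that is
    // what the stack actually is; otherwise return empty so callers can fall back.
    static Optional<S3FileSystem> tryUnwrapS3(TrinoFileSystem fileSystem, Location location)
    {
        if (fileSystem instanceof TracingFileSystem tracing &&
                tracing.getDelegate() instanceof SwitchingFileSystem switching &&
                switching.fileSystem(location) instanceof S3FileSystem s3) {
            return Optional.of(s3);
        }
        return Optional.empty();
    }
}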
plugin/trino-delta-lake/pom.xml (5 additions, 0 deletions)
@@ -134,6 +134,11 @@
             <artifactId>trino-filesystem-manager</artifactId>
         </dependency>

+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-filesystem-s3</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-hdfs</artifactId>
@@ -20,9 +20,14 @@
 import dev.failsafe.RetryPolicy;
 import io.airlift.json.ObjectMapperProvider;
 import io.airlift.log.Logger;
+import io.trino.filesystem.FileEntry;
+import io.trino.filesystem.FileIterator;
 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.manager.SwitchingFileSystem;
+import io.trino.filesystem.s3.S3FileSystem;
+import io.trino.filesystem.tracing.TracingFileSystem;
 import io.trino.plugin.base.util.JsonUtils;
 import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
 import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint;
@@ -273,6 +278,28 @@ public static long getMandatoryCurrentVersion(TrinoFileSystem fileSystem, String
         long version = readLastCheckpoint(fileSystem, tableLocation).map(LastCheckpoint::getVersion).orElse(0L);

         String transactionLogDir = getTransactionLogDir(tableLocation);
+
+        if (fileSystem instanceof TracingFileSystem tracingFileSystem &&
+                tracingFileSystem.getDelegate() instanceof SwitchingFileSystem switchingFileSystem &&
+                switchingFileSystem.fileSystem(Location.of(transactionLogDir)) instanceof S3FileSystem s3FileSystem) {
+            String startAfter = getTransactionLogJsonEntryPath(transactionLogDir, version).path();
+            FileIterator files = s3FileSystem.listFiles(Location.of(transactionLogDir), startAfter);
+
+            while (files.hasNext()) {
+                FileEntry file = files.next();
+                if (!file.location().fileName().endsWith(".json")) {
+                    // The _delta_log directory may contain unrelated files, e.g. a .crc file for each .json entry
+                    continue;
+                }
+                Location entryPath = getTransactionLogJsonEntryPath(transactionLogDir, version + 1);
+                if (!entryPath.equals(file.location())) {
+                    return version;
+                }
+                version++;
+            }
+            return version;
+        }
+
         while (true) {
             Location entryPath = getTransactionLogJsonEntryPath(transactionLogDir, version + 1);
             if (!fileSystem.newInputFile(entryPath).exists()) {
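
Why the listing approach works: Delta Lake names transaction-log entries with zero-padded 20-digit version numbers, so S3's lexicographic key order coincides with numeric version order, and a single ListObjectsV2 call starting after the checkpointed entry replaces one exists() probe per version. A small illustration of the padding assumption (not code from the PR):

// Zero padding is what makes string order agree with numeric order.
String v9 = String.format("%020d.json", 9);   // "00000000000000000009.json"
String v10 = String.format("%020d.json", 10); // "00000000000000000010.json"
assert v9.compareTo(v10) < 0; // without padding, "9.json" would sort after "10.json"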
@@ -1,7 +1,6 @@
 connector.name=delta_lake
 hive.metastore=glue
 hive.metastore.glue.region=${ENV:AWS_REGION}
-# We need to give access to bucket owner (the AWS account integrated with Databricks), otherwise files won't be readable from Databricks
-hive.s3.upload-acl-type=BUCKET_OWNER_FULL_CONTROL
 delta.enable-non-concurrent-writes=true
 delta.hive-catalog-name=hive
+fs.native-s3.enabled=true
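
Setting fs.native-s3.enabled=true switches this catalog to Trino's native S3 filesystem (the S3FileSystem changed above), which is presumably why the legacy hive.s3.upload-acl-type setting and its comment are dropped in the same change.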