Skip to content

Commit

Permalink
Use ListObjectsV2Request with startAfter in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Aug 24, 2023
1 parent 1bc7e85 commit 9a8da99
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.groupingBy;

final class SwitchingFileSystem
public final class SwitchingFileSystem
implements TrinoFileSystem
{
private final Optional<ConnectorSession> session;
Expand Down Expand Up @@ -130,7 +130,7 @@ public void renameDirectory(Location source, Location target)
fileSystem(source).renameDirectory(source, target);
}

private TrinoFileSystem fileSystem(Location location)
public TrinoFileSystem fileSystem(Location location)
{
return createFileSystem(determineFactory(location));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoOutputFile;
import jakarta.annotation.Nullable;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
Expand All @@ -44,7 +45,7 @@
import static com.google.common.collect.Multimaps.toMultimap;
import static java.util.Objects.requireNonNull;

final class S3FileSystem
public final class S3FileSystem
implements TrinoFileSystem
{
private final S3Client client;
Expand Down Expand Up @@ -164,6 +165,12 @@ public void renameFile(Location source, Location target)
/**
 * Lists all files under {@code location}, starting from the beginning of the
 * listing. Equivalent to calling {@code listFiles(location, null)}.
 *
 * @throws IOException if the underlying listing request fails
 */
@Override
public FileIterator listFiles(Location location)
        throws IOException
{
    // No lower bound on the listing: begin at the first key under the prefix
    String startAfter = null;
    return listFiles(location, startAfter);
}

public FileIterator listFiles(Location location, @Nullable String startAfter)
throws IOException
{
S3Location s3Location = new S3Location(location);

Expand All @@ -175,6 +182,7 @@ public FileIterator listFiles(Location location)
ListObjectsV2Request request = ListObjectsV2Request.builder()
.bucket(s3Location.bucket())
.prefix(key)
.startAfter(startAfter)
.build();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import static io.trino.filesystem.tracing.Tracing.withTracing;
import static java.util.Objects.requireNonNull;

final class TracingFileSystem
public final class TracingFileSystem
implements TrinoFileSystem
{
private final Tracer tracer;
Expand Down Expand Up @@ -137,4 +137,10 @@ public void renameDirectory(Location source, Location target)
.startSpan();
withTracing(span, () -> delegate.renameDirectory(source, target));
}

/**
 * Returns the file system wrapped by this tracing decorator.
 *
 * @deprecated exposed only so callers can unwrap the delegate (e.g. to reach an
 * implementation-specific file system such as {@code S3FileSystem}); prefer the
 * {@link TrinoFileSystem} interface methods wherever possible
 * @return the wrapped delegate file system
 */
@Deprecated // Marking as deprecated to discourage using this method
public TrinoFileSystem getDelegate()
{
    return delegate;
}
}
5 changes: 5 additions & 0 deletions plugin/trino-delta-lake/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@
<artifactId>trino-filesystem-manager</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem-s3</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hdfs</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@
import dev.failsafe.RetryPolicy;
import io.airlift.json.ObjectMapperProvider;
import io.airlift.log.Logger;
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.manager.SwitchingFileSystem;
import io.trino.filesystem.s3.S3FileSystem;
import io.trino.filesystem.tracing.TracingFileSystem;
import io.trino.plugin.base.util.JsonUtils;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint;
Expand Down Expand Up @@ -273,6 +278,28 @@ public static long getMandatoryCurrentVersion(TrinoFileSystem fileSystem, String
long version = readLastCheckpoint(fileSystem, tableLocation).map(LastCheckpoint::getVersion).orElse(0L);

String transactionLogDir = getTransactionLogDir(tableLocation);

if (fileSystem instanceof TracingFileSystem tracingFileSystem &&
tracingFileSystem.getDelegate() instanceof SwitchingFileSystem switchingFileSystem &&
switchingFileSystem.fileSystem(Location.of(transactionLogDir)) instanceof S3FileSystem s3FileSystem) {
String startAfter = getTransactionLogJsonEntryPath(transactionLogDir, version).path();
FileIterator files = s3FileSystem.listFiles(Location.of(transactionLogDir), startAfter);

while (files.hasNext()) {
FileEntry file = files.next();
if (!file.location().fileName().endsWith(".json")) {
// _delta_log directory may contain unrelated files, e.g. crc file for json
continue;
}
Location entryPath = getTransactionLogJsonEntryPath(transactionLogDir, version + 1);
if (!entryPath.equals(file.location())) {
return version;
}
version++;
}
return version;
}

while (true) {
Location entryPath = getTransactionLogJsonEntryPath(transactionLogDir, version + 1);
if (!fileSystem.newInputFile(entryPath).exists()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
connector.name=delta_lake
hive.metastore=glue
hive.metastore.glue.region=${ENV:AWS_REGION}
# We need to give access to bucket owner (the AWS account integrated with Databricks), otherwise files won't be readable from Databricks
hive.s3.upload-acl-type=BUCKET_OWNER_FULL_CONTROL
delta.enable-non-concurrent-writes=true
delta.hive-catalog-name=hive
fs.native-s3.enabled=true

0 comments on commit 9a8da99

Please sign in to comment.