Skip to content

Commit

Permalink
Add vacuum and table changes test case for shallow cloned tables
Browse files Browse the repository at this point in the history
  • Loading branch information
vinay-kl committed Mar 21, 2024
1 parent 4d0d903 commit 12102c7
Show file tree
Hide file tree
Showing 8 changed files with 446 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,13 @@ private List<DeltaLakeSplit> splitsForFile(

/**
 * Resolves the location of the data file referenced by an add-file entry.
 *
 * <p>The entry path is parsed as an RFC 2396 URI. A path without a scheme is treated as
 * relative and appended (in its decoded form) to {@code tableLocation}. A path with a scheme,
 * as can occur for shallow cloned tables, is used on its own and {@code tableLocation} is ignored.
 *
 * @param tableLocation root location of the table, used only for relative paths
 * @param addAction add-file entry whose path should be resolved
 * @return location of the data file
 * @see <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file">Delta Lake protocol</a>
 */
public static Location buildSplitPath(Location tableLocation, AddFileEntry addAction)
{
    URI parsedPath = URI.create(addAction.getPath());

    // Common case: the path is relative to the table location
    if (!parsedPath.isAbsolute()) {
        return tableLocation.appendPath(parsedPath.getPath());
    }

    // Absolute path: rebuild "<scheme>:<scheme-specific-part>" from the decoded URI components
    String absolutePath = parsedPath.getScheme() + ":" + parsedPath.getSchemeSpecificPart();
    return Location.of(absolutePath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ private Stream<ConnectorSplit> prepareSplits(long currentVersion, long tableRead
if (!containsRemoveEntry) {
for (DeltaLakeTransactionLogEntry entry : entries) {
if (entry.getAdd() != null && entry.getAdd().isDataChange()) {
// paths can be absolute as well in case of shallow-cloned tables
AddFileEntry addEntry = entry.getAdd();
splits.add(mapToDeltaLakeTableChangesSplit(
commitInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ private void doVacuum(
alwaysFalse())) {
retainedPaths = Stream.concat(
activeAddEntries
// paths can be absolute as well in case of shallow-cloned tables, but they shouldn't and aren't being deleted as part of vacuum
.map(AddFileEntry::getPath),
transactionLogAccess.getJsonEntries(
fileSystem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ else if (stats.isPresent()) {
}

/**
* paths are relative to the table location or absolute in case of shallow cloned table and are RFC 2396 URIs
* @see <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file">Delta Lake protocol</a>
*/
@JsonProperty
public String getPath()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testInitialSplits()
throws ExecutionException, InterruptedException
{
long fileSize = 20_000;
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(fileSize));
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(FILE_PATH, fileSize));
DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
.setMaxInitialSplits(1000)
.setMaxInitialSplitSize(DataSize.ofBytes(5_000));
Expand All @@ -116,20 +116,55 @@ public void testInitialSplits()
List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);

List<DeltaLakeSplit> expected = ImmutableList.of(
makeSplit(0, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(5_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(10_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(15_000, 5_000, fileSize, minimumAssignedSplitWeight));
makeSplit(FULL_PATH, 0, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 5_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 10_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 15_000, 5_000, fileSize, minimumAssignedSplitWeight));

assertThat(splits).isEqualTo(expected);
}

/**
 * Verifies that add-file entries carrying absolute paths produce splits that point at that
 * absolute path, for several file system schemes.
 *
 * <p>Each case passes the raw (URI-encoded) path stored in the entry and the path expected on
 * the resulting splits. The last case covers percent-encoding: one level of encoding is decoded
 * ({@code a%25} becomes {@code a%}, {@code a%2525} becomes {@code a%25}) while {@code +} is kept as is.
 */
@Test
public void testAbsolutePathSplits()
        throws Exception
{
    testAbsolutePathSplits("file://path/to/file", "file://path/to/file");
    testAbsolutePathSplits("abfs://container@account.dfs.core.windows.net/path/to/file", "abfs://container@account.dfs.core.windows.net/path/to/file");
    testAbsolutePathSplits("hdfs://path/to/file", "hdfs://path/to/file");
    // consecutive and trailing slashes must be preserved
    testAbsolutePathSplits("s3://my-s3-bucket/path/to//file", "s3://my-s3-bucket/path/to//file");
    testAbsolutePathSplits("s3://my-s3-bucket/path/to//file/", "s3://my-s3-bucket/path/to//file/");
    testAbsolutePathSplits("gs://my-gcp-bucket/path/to/file", "gs://my-gcp-bucket/path/to/file");
    testAbsolutePathSplits("abfs://container@account.dfs.core.windows.net/+ab+/a%25/a%2525/path/to/file", "abfs://container@account.dfs.core.windows.net/+ab+/a%/a%25/path/to/file");
}

/**
 * Generates splits for a single 20,000-byte file registered under the given absolute path and
 * checks that four 5,000-byte splits are produced, all pointing at the expected decoded path.
 *
 * @param absoluteRawEncodedFilePath path as stored in the add-file entry (URI-encoded)
 * @param absoluteDecodedParsedFilePath path expected on every generated split
 */
private void testAbsolutePathSplits(String absoluteRawEncodedFilePath, String absoluteDecodedParsedFilePath)
        throws Exception
{
    long fileSize = 20_000;
    List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(absoluteRawEncodedFilePath, fileSize));
    DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
            .setMaxInitialSplits(1000)
            .setMaxInitialSplitSize(DataSize.ofBytes(5_000));
    double minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();

    DeltaLakeSplitManager splitManager = setupSplitManager(addFileEntries, deltaLakeConfig);
    List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
    List<DeltaLakeSplit> expected = ImmutableList.of(
            makeSplit(absoluteDecodedParsedFilePath, 0, 5_000, fileSize, minimumAssignedSplitWeight),
            makeSplit(absoluteDecodedParsedFilePath, 5_000, 5_000, fileSize, minimumAssignedSplitWeight),
            makeSplit(absoluteDecodedParsedFilePath, 10_000, 5_000, fileSize, minimumAssignedSplitWeight),
            makeSplit(absoluteDecodedParsedFilePath, 15_000, 5_000, fileSize, minimumAssignedSplitWeight));

    assertThat(splits).isEqualTo(expected);
    // Explicit path check: value under test goes in assertThat(), expectation in isEqualTo(),
    // so that a failure message reports actual and expected the right way round
    assertThat(splits.get(0).getPath()).isEqualTo(absoluteDecodedParsedFilePath);
}

@Test
public void testNonInitialSplits()
throws ExecutionException, InterruptedException
{
long fileSize = 50_000;
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(fileSize));
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(FILE_PATH, fileSize));
DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
.setMaxInitialSplits(5)
.setMaxInitialSplitSize(DataSize.ofBytes(5_000))
Expand All @@ -140,13 +175,13 @@ public void testNonInitialSplits()
List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);

List<DeltaLakeSplit> expected = ImmutableList.of(
makeSplit(0, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(5_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(10_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(15_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(20_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(25_000, 20_000, fileSize, minimumAssignedSplitWeight),
makeSplit(45_000, 5_000, fileSize, minimumAssignedSplitWeight));
makeSplit(FULL_PATH, 0, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 5_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 10_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 15_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 20_000, 5_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 25_000, 20_000, fileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 45_000, 5_000, fileSize, minimumAssignedSplitWeight));

assertThat(splits).isEqualTo(expected);
}
Expand All @@ -157,7 +192,7 @@ public void testSplitsFromMultipleFiles()
{
long firstFileSize = 1_000;
long secondFileSize = 20_000;
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(firstFileSize), addFileEntryOfSize(secondFileSize));
List<AddFileEntry> addFileEntries = ImmutableList.of(addFileEntryOfSize(FILE_PATH, firstFileSize), addFileEntryOfSize(FILE_PATH, secondFileSize));
DeltaLakeConfig deltaLakeConfig = new DeltaLakeConfig()
.setMaxInitialSplits(3)
.setMaxInitialSplitSize(DataSize.ofBytes(2_000))
Expand All @@ -168,11 +203,11 @@ public void testSplitsFromMultipleFiles()

List<DeltaLakeSplit> splits = getSplits(splitManager, deltaLakeConfig);
List<DeltaLakeSplit> expected = ImmutableList.of(
makeSplit(0, 1_000, firstFileSize, minimumAssignedSplitWeight),
makeSplit(0, 2_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(2_000, 2_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(4_000, 10_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(14_000, 6_000, secondFileSize, minimumAssignedSplitWeight));
makeSplit(FULL_PATH, 0, 1_000, firstFileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 0, 2_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 2_000, 2_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 4_000, 10_000, secondFileSize, minimumAssignedSplitWeight),
makeSplit(FULL_PATH, 14_000, 6_000, secondFileSize, minimumAssignedSplitWeight));
assertThat(splits).isEqualTo(expected);
}

Expand Down Expand Up @@ -244,15 +279,15 @@ public Stream<AddFileEntry> getActiveFiles(
new DefaultCachingHostAddressProvider());
}

// Test helper: builds an AddFileEntry with the given path and file size.
// Every other constructor argument is a fixed placeholder (empty maps, empty optionals, 0 and false).
private AddFileEntry addFileEntryOfSize(long fileSize)
private AddFileEntry addFileEntryOfSize(String path, long fileSize)
{
return new AddFileEntry(FILE_PATH, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of(), Optional.empty());
return new AddFileEntry(path, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of(), Optional.empty());
}

// Test helper: builds the DeltaLakeSplit expected for the byte range [start, start + splitSize) of a file.
// The split weight is derived from fileSize / splitSize, clamped using minimumAssignedSplitWeight and 1.0;
// the remaining constructor arguments are fixed placeholders (empty optionals, 0, TupleDomain.all(), empty map).
private DeltaLakeSplit makeSplit(long start, long splitSize, long fileSize, double minimumAssignedSplitWeight)
private DeltaLakeSplit makeSplit(String path, long start, long splitSize, long fileSize, double minimumAssignedSplitWeight)
{
SplitWeight splitWeight = SplitWeight.fromProportion(clamp((double) fileSize / splitSize, minimumAssignedSplitWeight, 1.0));
return new DeltaLakeSplit(FULL_PATH, start, splitSize, fileSize, Optional.empty(), 0, Optional.empty(), splitWeight, TupleDomain.all(), ImmutableMap.of());
return new DeltaLakeSplit(path, start, splitSize, fileSize, Optional.empty(), 0, Optional.empty(), splitWeight, TupleDomain.all(), ImmutableMap.of());
}

private List<DeltaLakeSplit> getSplits(DeltaLakeSplitManager splitManager, DeltaLakeConfig deltaLakeConfig)
Expand Down
Loading

0 comments on commit 12102c7

Please sign in to comment.