Skip to content

Commit

Permalink
Core: Optimize check for referenced data files in BaseRowDelta (#3071)
Browse files Browse the repository at this point in the history
This change optimizes our check for referenced data files in BaseRowDelta by pushing down the conflict detection filter. Previously, we would open manifests even though they belonged to partitions out of our interest.
  • Loading branch information
aokolnychyi authored Sep 13, 2021
1 parent 0d76982 commit 838cc65
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 2 deletions.
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public RowDelta validateNoConflictingAppends(Expression newConflictDetectionFilt
protected void validate(TableMetadata base) {
if (base.currentSnapshot() != null) {
if (!referencedDataFiles.isEmpty()) {
validateDataFilesExist(base, startingSnapshotId, referencedDataFiles, !validateDeletes);
validateDataFilesExist(
base, startingSnapshotId, referencedDataFiles, !validateDeletes, conflictDetectionFilter);
}

// TODO: does this need to check new delete files?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin

@SuppressWarnings("CollectionUndefinedEquality")
protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId,
CharSequenceSet requiredDataFiles, boolean skipDeletes) {
CharSequenceSet requiredDataFiles, boolean skipDeletes,
Expression conflictDetectionFilter) {
// if there is no current table state, no files have been removed
if (base.currentSnapshot() == null) {
return;
Expand All @@ -339,6 +340,10 @@ protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotI
.specsById(base.specsById())
.ignoreExisting();

if (conflictDetectionFilter != null) {
matchingDeletesGroup.filterData(conflictDetectionFilter);
}

try (CloseableIterator<ManifestEntry<DataFile>> deletes = matchingDeletesGroup.entries().iterator()) {
if (deletes.hasNext()) {
throw new ValidationException("Cannot commit, missing data files: %s",
Expand Down
121 changes: 121 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -629,4 +630,124 @@ public void testFastAppendDoesNotRemoveStaleDeleteFiles() {
files(FILE_A_DELETES),
statuses(Status.ADDED));
}

@Test
public void testValidateDataFilesExistWithConflictDetectionFilter() {
// change the spec to be partitioned by data
table.updateSpec()
.removeField(Expressions.bucket("data", 16))
.addField(Expressions.ref("data"))
.commit();

// add a data file to partition A
DataFile dataFile1 = DataFiles.builder(table.spec())
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data=a")
.withRecordCount(1)
.build();

table.newAppend()
.appendFile(dataFile1)
.commit();

// add a data file to partition B
DataFile dataFile2 = DataFiles.builder(table.spec())
.withPath("/path/to/data-b.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data=b")
.withRecordCount(1)
.build();

table.newAppend()
.appendFile(dataFile2)
.commit();

// use this snapshot as the starting snapshot in rowDelta
Snapshot baseSnapshot = table.currentSnapshot();

// add a delete file for partition A
DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()
.withPath("/path/to/data-a-deletes.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data=a")
.withRecordCount(1)
.build();

Expression conflictDetectionFilter = Expressions.equal("data", "a");
RowDelta rowDelta = table.newRowDelta()
.addDeletes(deleteFile)
.validateDataFilesExist(ImmutableList.of(dataFile1.path()))
.validateDeletedFiles()
.validateFromSnapshot(baseSnapshot.snapshotId())
.validateNoConflictingAppends(conflictDetectionFilter);

// concurrently delete the file for partition B
table.newDelete()
.deleteFile(dataFile2)
.commit();

// commit the delta for partition A
rowDelta.commit();

Assert.assertEquals("Table should have one new delete manifest",
1, table.currentSnapshot().deleteManifests().size());
ManifestFile deletes = table.currentSnapshot().deleteManifests().get(0);
validateDeleteManifest(deletes,
seqs(4),
ids(table.currentSnapshot().snapshotId()),
files(deleteFile),
statuses(Status.ADDED));
}

@Test
public void testValidateDataFilesDoNotExistWithConflictDetectionFilter() {
// change the spec to be partitioned by data
table.updateSpec()
.removeField(Expressions.bucket("data", 16))
.addField(Expressions.ref("data"))
.commit();

// add a data file to partition A
DataFile dataFile1 = DataFiles.builder(table.spec())
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data=a")
.withRecordCount(1)
.build();

table.newAppend()
.appendFile(dataFile1)
.commit();

// use this snapshot as the starting snapshot in rowDelta
Snapshot baseSnapshot = table.currentSnapshot();

// add a delete file for partition A
DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()
.withPath("/path/to/data-a-deletes.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data=a")
.withRecordCount(1)
.build();

Expression conflictDetectionFilter = Expressions.equal("data", "a");
RowDelta rowDelta = table.newRowDelta()
.addDeletes(deleteFile)
.validateDataFilesExist(ImmutableList.of(dataFile1.path()))
.validateDeletedFiles()
.validateFromSnapshot(baseSnapshot.snapshotId())
.validateNoConflictingAppends(conflictDetectionFilter);

// concurrently delete the file for partition A
table.newDelete()
.deleteFile(dataFile1)
.commit();

AssertHelpers.assertThrows("Should fail to add deletes because data file is missing",
ValidationException.class, "Cannot commit, missing data files",
rowDelta::commit);
}
}

0 comments on commit 838cc65

Please sign in to comment.