From 838cc652273c1444155bec2e1d6029cfbdbf3ea3 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Mon, 13 Sep 2021 11:25:49 -1000 Subject: [PATCH] Core: Optimize check for referenced data files in BaseRowDelta (#3071) This change optimizes our check for referenced data files in BaseRowDelta by pushing down the conflict detection filter. Previously, we would open manifests even though they belonged to partitions out of our interest. --- .../java/org/apache/iceberg/BaseRowDelta.java | 3 +- .../iceberg/MergingSnapshotProducer.java | 7 +- .../java/org/apache/iceberg/TestRowDelta.java | 121 ++++++++++++++++++ 3 files changed, 129 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java index 8a1371311b30..4d80b01a6324 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java +++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java @@ -91,7 +91,8 @@ public RowDelta validateNoConflictingAppends(Expression newConflictDetectionFilt protected void validate(TableMetadata base) { if (base.currentSnapshot() != null) { if (!referencedDataFiles.isEmpty()) { - validateDataFilesExist(base, startingSnapshotId, referencedDataFiles, !validateDeletes); + validateDataFilesExist( + base, startingSnapshotId, referencedDataFiles, !validateDeletes, conflictDetectionFilter); } // TODO: does this need to check new delete files? diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index f26412b27bbd..f907314dad69 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -318,7 +318,8 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin @SuppressWarnings("CollectionUndefinedEquality") protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId, - CharSequenceSet requiredDataFiles, boolean skipDeletes) { + CharSequenceSet requiredDataFiles, boolean skipDeletes, + Expression conflictDetectionFilter) { // if there is no current table state, no files have been removed if (base.currentSnapshot() == null) { return; @@ -339,6 +340,10 @@ protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotI .specsById(base.specsById()) .ignoreExisting(); + if (conflictDetectionFilter != null) { + matchingDeletesGroup.filterData(conflictDetectionFilter); + } + try (CloseableIterator> deletes = matchingDeletesGroup.entries().iterator()) { if (deletes.hasNext()) { throw new ValidationException("Cannot commit, missing data files: %s", diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 6a5c43cd25ee..c1dae7da46ee 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -21,6 +21,7 @@ import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -629,4 +630,124 @@ public void testFastAppendDoesNotRemoveStaleDeleteFiles() { files(FILE_A_DELETES), statuses(Status.ADDED)); } + + @Test + public void testValidateDataFilesExistWithConflictDetectionFilter() { + // change the spec to be partitioned by data + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .addField(Expressions.ref("data")) + .commit(); + + // add a data file to partition A + DataFile dataFile1 = DataFiles.builder(table.spec()) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + table.newAppend() + .appendFile(dataFile1) + .commit(); + + // add a data file to partition B + DataFile dataFile2 = DataFiles.builder(table.spec()) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=b") + .withRecordCount(1) + .build(); + + table.newAppend() + .appendFile(dataFile2) + .commit(); + + // use this snapshot as the starting snapshot in rowDelta + Snapshot baseSnapshot = table.currentSnapshot(); + + // add a delete file for partition A + DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec()) + .ofPositionDeletes() + .withPath("/path/to/data-a-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + Expression conflictDetectionFilter = Expressions.equal("data", "a"); + RowDelta rowDelta = table.newRowDelta() + .addDeletes(deleteFile) + .validateDataFilesExist(ImmutableList.of(dataFile1.path())) + .validateDeletedFiles() + .validateFromSnapshot(baseSnapshot.snapshotId()) + .validateNoConflictingAppends(conflictDetectionFilter); + + // concurrently delete the file for partition B + table.newDelete() + .deleteFile(dataFile2) + .commit(); + + // commit the delta for partition A + rowDelta.commit(); + + Assert.assertEquals("Table should have one new delete manifest", + 1, table.currentSnapshot().deleteManifests().size()); + ManifestFile deletes = table.currentSnapshot().deleteManifests().get(0); + validateDeleteManifest(deletes, + seqs(4), + ids(table.currentSnapshot().snapshotId()), + files(deleteFile), + statuses(Status.ADDED)); + } + + @Test + public void testValidateDataFilesDoNotExistWithConflictDetectionFilter() { + // change the spec to be partitioned by data + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .addField(Expressions.ref("data")) + .commit(); + + // add a data file to partition A + DataFile dataFile1 = DataFiles.builder(table.spec()) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + table.newAppend() + .appendFile(dataFile1) + .commit(); + + // use this snapshot as the starting snapshot in rowDelta + Snapshot baseSnapshot = table.currentSnapshot(); + + // add a delete file for partition A + DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec()) + .ofPositionDeletes() + .withPath("/path/to/data-a-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + Expression conflictDetectionFilter = Expressions.equal("data", "a"); + RowDelta rowDelta = table.newRowDelta() + .addDeletes(deleteFile) + .validateDataFilesExist(ImmutableList.of(dataFile1.path())) + .validateDeletedFiles() + .validateFromSnapshot(baseSnapshot.snapshotId()) + .validateNoConflictingAppends(conflictDetectionFilter); + + // concurrently delete the file for partition A + table.newDelete() + .deleteFile(dataFile1) + .commit(); + + AssertHelpers.assertThrows("Should fail to add deletes because data file is missing", + ValidationException.class, "Cannot commit, missing data files", + rowDelta::commit); + } }