From b1995367eea100b42f3987425ff37886939c65dd Mon Sep 17 00:00:00 2001
From: Anton Okolnychyi
Date: Mon, 16 Aug 2021 17:17:18 -0700
Subject: [PATCH] Core: Support committing delete files with multiple specs

---
 .../apache/iceberg/BaseOverwriteFiles.java    |   2 +-
 .../apache/iceberg/BaseReplacePartitions.java |   2 +-
 .../iceberg/MergingSnapshotProducer.java      | 101 ++++---
 .../org/apache/iceberg/TableTestBase.java     |  21 ++
 .../java/org/apache/iceberg/TestRowDelta.java | 253 ++++++++++++++++++
 5 files changed, 333 insertions(+), 46 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
index e1fbb0f942c5..609e55fbbe75 100644
--- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
+++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
@@ -103,7 +103,7 @@ public OverwriteFiles validateNoConflictingAppends(Expression newConflictDetecti
   @Override
   protected void validate(TableMetadata base) {
     if (validateAddedFilesMatchOverwriteFilter) {
-      PartitionSpec spec = writeSpec();
+      PartitionSpec spec = dataSpec();
       Expression rowFilter = rowFilter();
       Expression inclusiveExpr = Projections.inclusive(spec).project(rowFilter);
 
diff --git a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
index 7f57195edc25..826bc2ae94ec 100644
--- a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
+++ b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
@@ -55,7 +55,7 @@ public ReplacePartitions validateAppendOnly() {
 
   @Override
   public List<ManifestFile> apply(TableMetadata base) {
-    if (writeSpec().fields().size() <= 0) {
+    if (dataSpec().fields().size() <= 0) {
       // replace all data in an unpartitioned table
       deleteByRowFilter(Expressions.alwaysTrue());
     }
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index f907314dad69..667c9fae1ca4 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.events.CreateSnapshotEvent;
@@ -39,6 +40,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.Pair;
@@ -78,25 +80,27 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
 
   // update data
   private final List<DataFile> newFiles = Lists.newArrayList();
-  private final List<DeleteFile> newDeleteFiles = Lists.newArrayList();
+  private final Map<Integer, List<DeleteFile>> newDeleteFilesBySpec = Maps.newHashMap();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
   private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
   private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder();
   private Expression deleteExpression = Expressions.alwaysFalse();
-  private PartitionSpec spec;
+  private PartitionSpec dataSpec;
 
   // cache new manifests after writing
   private ManifestFile cachedNewManifest = null;
   private boolean hasNewFiles = false;
-  private ManifestFile cachedNewDeleteManifest = null;
+
+  // cache new manifests for delete files
+  private final List<ManifestFile> cachedNewDeleteManifests = Lists.newLinkedList();
   private boolean hasNewDeleteFiles = false;
 
   MergingSnapshotProducer(String tableName, TableOperations ops) {
     super(ops);
     this.tableName = tableName;
     this.ops = ops;
-    this.spec = null;
+    this.dataSpec = null;
     long targetSizeBytes = ops.current()
         .propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
     int minCountToMerge = ops.current()
@@ -117,11 +121,10 @@ public ThisT set(String property, String value) {
     return self();
   }
 
-  protected PartitionSpec writeSpec() {
-    Preconditions.checkState(spec != null,
-        "Cannot determine partition spec: no data or delete files have been added");
+  protected PartitionSpec dataSpec() {
+    Preconditions.checkState(dataSpec != null, "Cannot determine partition spec: no data files have been added");
     // the spec is set when the write is started
-    return spec;
+    return dataSpec;
   }
 
   protected Expression rowFilter() {
@@ -190,8 +193,9 @@ protected void delete(CharSequence path) {
    * Add a data file to the new snapshot.
    */
   protected void add(DataFile file) {
-    setWriteSpec(file);
-    addedFilesSummary.addedFile(writeSpec(), file);
+    Preconditions.checkNotNull(file, "Invalid data file: null");
+    setDataSpec(file);
+    addedFilesSummary.addedFile(dataSpec(), file);
     hasNewFiles = true;
     newFiles.add(file);
   }
 
@@ -200,21 +204,21 @@ protected void add(DataFile file) {
    * Add a delete file to the new snapshot.
    */
   protected void add(DeleteFile file) {
-    setWriteSpec(file);
-    addedFilesSummary.addedFile(writeSpec(), file);
+    Preconditions.checkNotNull(file, "Invalid delete file: null");
+    PartitionSpec fileSpec = ops.current().spec(file.specId());
+    List<DeleteFile> deleteFiles = newDeleteFilesBySpec.computeIfAbsent(file.specId(), specId -> Lists.newArrayList());
+    deleteFiles.add(file);
+    addedFilesSummary.addedFile(fileSpec, file);
     hasNewDeleteFiles = true;
-    newDeleteFiles.add(file);
   }
 
-  private void setWriteSpec(ContentFile<?> file) {
-    Preconditions.checkNotNull(file, "Invalid content file: null");
-    PartitionSpec writeSpec = ops.current().spec(file.specId());
-    Preconditions.checkNotNull(writeSpec,
-        "Cannot find partition spec for file: %s", file.path());
-    if (spec == null) {
-      spec = writeSpec;
-    } else if (spec.specId() != file.specId()) {
-      throw new ValidationException("Invalid file, expected spec id: %d", spec.specId());
+  private void setDataSpec(DataFile file) {
+    PartitionSpec fileSpec = ops.current().spec(file.specId());
+    Preconditions.checkNotNull(fileSpec, "Cannot find partition spec for data file: %s", file.path());
+    if (dataSpec == null) {
+      dataSpec = fileSpec;
+    } else if (dataSpec.specId() != file.specId()) {
+      throw new ValidationException("Invalid data file, expected spec id: %d", dataSpec.specId());
     }
   }
 
@@ -463,9 +467,13 @@ private void cleanUncommittedAppends(Set<ManifestFile> committed) {
       this.cachedNewManifest = null;
     }
 
-    if (cachedNewDeleteManifest != null && !committed.contains(cachedNewDeleteManifest)) {
-      deleteFile(cachedNewDeleteManifest.path());
-      this.cachedNewDeleteManifest = null;
+    ListIterator<ManifestFile> deleteManifestsIterator = cachedNewDeleteManifests.listIterator();
+    while (deleteManifestsIterator.hasNext()) {
+      ManifestFile deleteManifest = deleteManifestsIterator.next();
+      if (!committed.contains(deleteManifest)) {
+        deleteFile(deleteManifest.path());
+        deleteManifestsIterator.remove();
+      }
     }
 
     // rewritten manifests are always owned by the table
@@ -518,7 +526,7 @@ private ManifestFile newFilesAsManifest() {
 
     if (cachedNewManifest == null) {
       try {
-        ManifestWriter<DataFile> writer = newManifestWriter(writeSpec());
+        ManifestWriter<DataFile> writer = newManifestWriter(dataSpec());
         try {
           writer.addAll(newFiles);
         } finally {
@@ -536,36 +544,41 @@ private ManifestFile newFilesAsManifest() {
   }
 
   private Iterable<ManifestFile> prepareDeleteManifests() {
-    if (newDeleteFiles.isEmpty()) {
+    if (newDeleteFilesBySpec.isEmpty()) {
       return ImmutableList.of();
     }
 
-    return ImmutableList.of(newDeleteFilesAsManifest());
+    return newDeleteFilesAsManifests();
   }
 
-  private ManifestFile newDeleteFilesAsManifest() {
-    if (hasNewDeleteFiles && cachedNewDeleteManifest != null) {
-      deleteFile(cachedNewDeleteManifest.path());
-      cachedNewDeleteManifest = null;
+  private List<ManifestFile> newDeleteFilesAsManifests() {
+    if (hasNewDeleteFiles && cachedNewDeleteManifests.size() > 0) {
+      for (ManifestFile cachedNewDeleteManifest : cachedNewDeleteManifests) {
+        deleteFile(cachedNewDeleteManifest.path());
+      }
+      cachedNewDeleteManifests.clear();
     }
 
-    if (cachedNewDeleteManifest == null) {
-      try {
-        ManifestWriter<DeleteFile> writer = newDeleteManifestWriter(writeSpec());
+    if (cachedNewDeleteManifests.isEmpty()) {
+      newDeleteFilesBySpec.forEach((specId, deleteFiles) -> {
+        PartitionSpec spec = ops.current().spec(specId);
         try {
-          writer.addAll(newDeleteFiles);
-        } finally {
-          writer.close();
+          ManifestWriter<DeleteFile> writer = newDeleteManifestWriter(spec);
+          try {
+            writer.addAll(deleteFiles);
+          } finally {
+            writer.close();
+          }
+          cachedNewDeleteManifests.add(writer.toManifestFile());
+        } catch (IOException e) {
+          throw new RuntimeIOException(e, "Failed to close manifest writer");
         }
+      });
 
-        this.cachedNewDeleteManifest = writer.toManifestFile();
-        this.hasNewDeleteFiles = false;
-      } catch (IOException e) {
-        throw new RuntimeIOException(e, "Failed to close manifest writer");
-      }
+      this.hasNewDeleteFiles = false;
     }
 
-    return cachedNewDeleteManifest;
+    return cachedNewDeleteManifests;
   }
 
   private class DataFileFilterManager extends ManifestFilterManager<DataFile> {
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 16a6bfd7aee5..f271f8ecd38d 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -24,6 +24,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.LongStream;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
@@ -433,6 +434,26 @@ void validateDeleteManifest(ManifestFile manifest,
     Assert.assertFalse("Should find all files in the manifest", expectedFiles.hasNext());
   }
 
+  protected DataFile newDataFile(String partitionPath) {
+    return DataFiles.builder(table.spec())
+        .withPath("/path/to/data-" + UUID.randomUUID() + ".parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath(partitionPath)
+        .withRecordCount(1)
+        .build();
+  }
+
+  protected DeleteFile newDeleteFile(int specId, String partitionPath) {
+    PartitionSpec spec = table.specs().get(specId);
+    return FileMetadata.deleteFileBuilder(spec)
+        .ofPositionDeletes()
+        .withPath("/path/to/delete-" + UUID.randomUUID() + ".parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath(partitionPath)
+        .withRecordCount(1)
+        .build();
+  }
+
   static void validateManifestEntries(ManifestFile manifest,
                                       Iterator<Long> ids,
                                       Iterator<DataFile> expectedFiles,
diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
index c1dae7da46ee..0675b45dc342 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg;
 
+import java.util.Map;
+import java.util.Set;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
@@ -28,6 +30,15 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.iceberg.SnapshotSummary.ADDED_DELETE_FILES_PROP;
+import static org.apache.iceberg.SnapshotSummary.ADDED_FILES_PROP;
+import static org.apache.iceberg.SnapshotSummary.ADDED_POS_DELETES_PROP;
+import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_COUNT_PROP;
+import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_PREFIX;
+import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP;
+import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP;
+import static org.apache.iceberg.SnapshotSummary.TOTAL_POS_DELETES_PROP;
+
 public class TestRowDelta extends V2TableTestBase {
   @Test
   public void testAddDeleteFile() {
@@ -750,4 +761,246 @@ public void testValidateDataFilesDoNotExistWithConflictDetectionFilter() {
         ValidationException.class, "Cannot commit, missing data files",
         rowDelta::commit);
   }
+
+  @Test
+  public void testAddDeleteFilesMultipleSpecs() {
+    // enable partition summaries
+    table.updateProperties()
+        .set(TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, "10")
+        .commit();
+
+    // append a partitioned data file
+    DataFile firstSnapshotDataFile = newDataFile("data_bucket=0");
+    table.newAppend()
+        .appendFile(firstSnapshotDataFile)
+        .commit();
+
+    // remove the only partition field to make the spec unpartitioned
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .commit();
+
+    Assert.assertTrue("Spec must be unpartitioned", table.spec().isUnpartitioned());
+
+    // append an unpartitioned data file
+    DataFile secondSnapshotDataFile = newDataFile("");
+    table.newAppend()
+        .appendFile(secondSnapshotDataFile)
+        .commit();
+
+    // evolve the spec and add a new partition field
+    table.updateSpec()
+        .addField("data")
+        .commit();
+
+    // append a data file with the new spec
+    DataFile thirdSnapshotDataFile = newDataFile("data=abc");
+    table.newAppend()
+        .appendFile(thirdSnapshotDataFile)
+        .commit();
+
+    Assert.assertEquals("Should have 3 specs", 3, table.specs().size());
+
+    // commit a row delta with 1 data file and 3 delete files where delete files have different specs
+    DataFile dataFile = newDataFile("data=xyz");
+    DeleteFile firstDeleteFile = newDeleteFile(firstSnapshotDataFile.specId(), "data_bucket=0");
+    DeleteFile secondDeleteFile = newDeleteFile(secondSnapshotDataFile.specId(), "");
+    DeleteFile thirdDeleteFile = newDeleteFile(thirdSnapshotDataFile.specId(), "data=abc");
+
+    table.newRowDelta()
+        .addRows(dataFile)
+        .addDeletes(firstDeleteFile)
+        .addDeletes(secondDeleteFile)
+        .addDeletes(thirdDeleteFile)
+        .commit();
+
+    Snapshot snapshot = table.currentSnapshot();
+    Assert.assertEquals("Commit should produce sequence number 4", 4, snapshot.sequenceNumber());
+    Assert.assertEquals("Last sequence number should be 4", 4, table.ops().current().lastSequenceNumber());
+    Assert.assertEquals("Delta commit should be 'overwrite'", DataOperations.OVERWRITE, snapshot.operation());
+
+    Map<String, String> summary = snapshot.summary();
+
+    Assert.assertEquals("Should change 4 partitions", "4", summary.get(CHANGED_PARTITION_COUNT_PROP));
+    Assert.assertEquals("Should add 1 data file", "1", summary.get(ADDED_FILES_PROP));
+    Assert.assertEquals("Should have 4 data files", "4", summary.get(TOTAL_DATA_FILES_PROP));
+    Assert.assertEquals("Should add 3 delete files", "3", summary.get(ADDED_DELETE_FILES_PROP));
+    Assert.assertEquals("Should have 3 delete files", "3", summary.get(TOTAL_DELETE_FILES_PROP));
+    Assert.assertEquals("Should add 3 position deletes", "3", summary.get(ADDED_POS_DELETES_PROP));
+    Assert.assertEquals("Should have 3 position deletes", "3", summary.get(TOTAL_POS_DELETES_PROP));
+
+    Assert.assertTrue("Partition metrics must be correct",
+        summary.get(CHANGED_PARTITION_PREFIX).contains(ADDED_DELETE_FILES_PROP + "=1"));
+    Assert.assertTrue("Partition metrics must be correct",
+        summary.get(CHANGED_PARTITION_PREFIX + "data_bucket=0").contains(ADDED_DELETE_FILES_PROP + "=1"));
+    Assert.assertTrue("Partition metrics must be correct",
+        summary.get(CHANGED_PARTITION_PREFIX + "data=abc").contains(ADDED_DELETE_FILES_PROP + "=1"));
+    Assert.assertTrue("Partition metrics must be correct",
+        summary.get(CHANGED_PARTITION_PREFIX + "data=xyz").contains(ADDED_FILES_PROP + "=1"));
+
+    // 3 appends + 1 row delta
+    Assert.assertEquals("Should have 4 data manifests", 4, snapshot.dataManifests().size());
+    validateManifest(
+        snapshot.dataManifests().get(0),
+        seqs(4),
+        ids(snapshot.snapshotId()),
+        files(dataFile),
+        statuses(Status.ADDED));
+
+    // each delete file goes into a separate manifest as the specs are different
+    Assert.assertEquals("Should produce 3 delete manifests", 3, snapshot.deleteManifests().size());
+
+    ManifestFile firstDeleteManifest = snapshot.deleteManifests().get(2);
+    Assert.assertEquals("Spec must match", firstSnapshotDataFile.specId(), firstDeleteManifest.partitionSpecId());
+    validateDeleteManifest(
+        firstDeleteManifest,
+        seqs(4),
+        ids(snapshot.snapshotId()),
+        files(firstDeleteFile),
+        statuses(Status.ADDED));
+
+    ManifestFile secondDeleteManifest = snapshot.deleteManifests().get(1);
+    Assert.assertEquals("Spec must match", secondSnapshotDataFile.specId(), secondDeleteManifest.partitionSpecId());
+    validateDeleteManifest(
+        secondDeleteManifest,
+        seqs(4),
+        ids(snapshot.snapshotId()),
+        files(secondDeleteFile),
+        statuses(Status.ADDED));
+
+    ManifestFile thirdDeleteManifest = snapshot.deleteManifests().get(0);
+    Assert.assertEquals("Spec must match", thirdSnapshotDataFile.specId(), thirdDeleteManifest.partitionSpecId());
+    validateDeleteManifest(
+        thirdDeleteManifest,
+        seqs(4),
+        ids(snapshot.snapshotId()),
+        files(thirdDeleteFile),
+        statuses(Status.ADDED));
+  }
+
+  @Test
+  public void testManifestMergingMultipleSpecs() {
+    // make sure we enable manifest merging
+    table.updateProperties()
+        .set(TableProperties.MANIFEST_MERGE_ENABLED, "true")
+        .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "2")
+        .commit();
+
+    // append a partitioned data file
+    DataFile firstSnapshotDataFile = newDataFile("data_bucket=0");
+    table.newAppend()
+        .appendFile(firstSnapshotDataFile)
+        .commit();
+
+    // remove the only partition field to make the spec unpartitioned
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .commit();
+
+    Assert.assertTrue("Spec must be unpartitioned", table.spec().isUnpartitioned());
+
+    // append an unpartitioned data file
+    DataFile secondSnapshotDataFile = newDataFile("");
+    table.newAppend()
+        .appendFile(secondSnapshotDataFile)
+        .commit();
+
+    // commit two delete files to two specs in a single operation
+    DeleteFile firstDeleteFile = newDeleteFile(firstSnapshotDataFile.specId(), "data_bucket=0");
+    DeleteFile secondDeleteFile = newDeleteFile(secondSnapshotDataFile.specId(), "");
+
+    table.newRowDelta()
+        .addDeletes(firstDeleteFile)
+        .addDeletes(secondDeleteFile)
+        .commit();
+
+    Snapshot thirdSnapshot = table.currentSnapshot();
+
+    // 2 appends and 1 row delta where delete files belong to different specs
+    Assert.assertEquals("Should have 2 data manifests", 2, thirdSnapshot.dataManifests().size());
+    Assert.assertEquals("Should have 2 delete manifests", 2, thirdSnapshot.deleteManifests().size());
+
+    // commit two more delete files to the same specs to trigger merging
+    DeleteFile thirdDeleteFile = newDeleteFile(firstSnapshotDataFile.specId(), "data_bucket=0");
+    DeleteFile fourthDeleteFile = newDeleteFile(secondSnapshotDataFile.specId(), "");
+
+    table.newRowDelta()
+        .addDeletes(thirdDeleteFile)
+        .addDeletes(fourthDeleteFile)
+        .commit();
+
+    Snapshot fourthSnapshot = table.currentSnapshot();
+
+    // make sure merging respects spec boundaries
+    Assert.assertEquals("Should have 2 data manifests", 2, fourthSnapshot.dataManifests().size());
+    Assert.assertEquals("Should have 2 delete manifests", 2, fourthSnapshot.deleteManifests().size());
+
+    ManifestFile firstDeleteManifest = fourthSnapshot.deleteManifests().get(1);
+    Assert.assertEquals("Spec must match", firstSnapshotDataFile.specId(), firstDeleteManifest.partitionSpecId());
+    validateDeleteManifest(
+        firstDeleteManifest,
+        seqs(4, 3),
+        ids(fourthSnapshot.snapshotId(), thirdSnapshot.snapshotId()),
+        files(thirdDeleteFile, firstDeleteFile),
+        statuses(Status.ADDED, Status.EXISTING));
+
+    ManifestFile secondDeleteManifest = fourthSnapshot.deleteManifests().get(0);
+    Assert.assertEquals("Spec must match", secondSnapshotDataFile.specId(), secondDeleteManifest.partitionSpecId());
+    validateDeleteManifest(
+        secondDeleteManifest,
+        seqs(4, 3),
+        ids(fourthSnapshot.snapshotId(), thirdSnapshot.snapshotId()),
+        files(fourthDeleteFile, secondDeleteFile),
+        statuses(Status.ADDED, Status.EXISTING));
+  }
+
+  @Test
+  public void testAbortMultipleSpecs() {
+    // append a partitioned data file
+    DataFile firstSnapshotDataFile = newDataFile("data_bucket=0");
+    table.newAppend()
+        .appendFile(firstSnapshotDataFile)
+        .commit();
+
+    // remove the only partition field to make the spec unpartitioned
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .commit();
+
+    Assert.assertTrue("Spec must be unpartitioned", table.spec().isUnpartitioned());
+
+    // append an unpartitioned data file
+    DataFile secondSnapshotDataFile = newDataFile("");
+    table.newAppend()
+        .appendFile(secondSnapshotDataFile)
+        .commit();
+
+    // prepare two delete files that belong to different specs
+    DeleteFile firstDeleteFile = newDeleteFile(firstSnapshotDataFile.specId(), "data_bucket=0");
+    DeleteFile secondDeleteFile = newDeleteFile(secondSnapshotDataFile.specId(), "");
+
+    // capture all deletes
+    Set<String> deletedFiles = Sets.newHashSet();
+
+    RowDelta rowDelta = table.newRowDelta()
+        .addDeletes(firstDeleteFile)
+        .addDeletes(secondDeleteFile)
+        .deleteWith(deletedFiles::add)
+        .validateDeletedFiles()
+        .validateDataFilesExist(ImmutableList.of(firstSnapshotDataFile.path()));
+
+    rowDelta.apply();
+
+    // perform a conflicting concurrent operation
+    table.newDelete()
+        .deleteFile(firstSnapshotDataFile)
+        .commit();
+
+    AssertHelpers.assertThrows("Should fail to commit row delta",
+        ValidationException.class, "Cannot commit, missing data files",
+        rowDelta::commit);
+
+    // we should clean up 1 manifest list and 2 delete manifests
+    Assert.assertEquals("Should delete 3 files", 3, deletedFiles.size());
+  }
 }
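
A minimal usage sketch of what this patch enables, mirroring the tests above. It assumes a V2 test table `table` whose original spec is bucket("data", 16), plus the newDataFile/newDeleteFile helpers added to TableTestBase in this patch; the local variable names and the spec id 0 for the original spec are illustrative only:

    // evolve the table so two partition specs exist side by side
    table.updateSpec()
        .removeField(Expressions.bucket("data", 16))
        .commit();

    // one position-delete file per spec; specId records which spec each file was written under
    DeleteFile bucketedDeleteFile = newDeleteFile(0, "data_bucket=0");              // original spec
    DeleteFile unpartitionedDeleteFile = newDeleteFile(table.spec().specId(), "");  // current spec

    // before this patch, setWriteSpec rejected content files whose spec id differed
    // from the first file added; now MergingSnapshotProducer groups delete files by
    // spec id and writes one delete manifest per spec, committed in a single snapshot
    table.newRowDelta()
        .addDeletes(bucketedDeleteFile)
        .addDeletes(unpartitionedDeleteFile)
        .commit();

Note that data files are still restricted to a single spec per operation: add(DataFile) goes through setDataSpec(file), which throws a ValidationException when a data file's spec id differs from that of the first data file added.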