From 8320a0ed23cdc4a756c4a60f756dbd1ba10e1715 Mon Sep 17 00:00:00 2001
From: Farooq Qaiser
Date: Sat, 2 Mar 2024 21:52:56 -0500
Subject: [PATCH] Core: Support appending files with different specs

---
 .../apache/iceberg/BaseOverwriteFiles.java    | 50 ++++++-----
 .../apache/iceberg/BaseReplacePartitions.java | 51 ++++++-----
 .../iceberg/MergingSnapshotProducer.java      | 85 ++++++++++---------
 .../org/apache/iceberg/TestMergeAppend.java   | 72 ++++++++++++++++
 4 files changed, 172 insertions(+), 86 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
index a994eaf44d9a..9b9891b616e8 100644
--- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
+++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
@@ -113,29 +113,33 @@ public BaseOverwriteFiles toBranch(String branch) {
   @Override
   protected void validate(TableMetadata base, Snapshot parent) {
     if (validateAddedFilesMatchOverwriteFilter) {
-      PartitionSpec spec = dataSpec();
-      Expression rowFilter = rowFilter();
-
-      Expression inclusiveExpr = Projections.inclusive(spec).project(rowFilter);
-      Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr);
-
-      Expression strictExpr = Projections.strict(spec).project(rowFilter);
-      Evaluator strict = new Evaluator(spec.partitionType(), strictExpr);
-
-      StrictMetricsEvaluator metrics =
-          new StrictMetricsEvaluator(base.schema(), rowFilter, isCaseSensitive());
-
-      for (DataFile file : addedDataFiles()) {
-        // the real test is that the strict or metrics test matches the file, indicating that all
-        // records in the file match the filter. inclusive is used to avoid testing the metrics,
-        // which is more complicated
-        ValidationException.check(
-            inclusive.eval(file.partition())
-                && (strict.eval(file.partition()) || metrics.eval(file)),
-            "Cannot append file with rows that do not match filter: %s: %s",
-            rowFilter,
-            file.path());
-      }
+      addedDataFilesBySpec()
+          .forEach(
+              (specId, addedDataFiles) -> {
+                PartitionSpec spec = base.spec(specId);
+                Expression rowFilter = rowFilter();
+
+                Expression inclusiveExpr = Projections.inclusive(spec).project(rowFilter);
+                Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr);
+
+                Expression strictExpr = Projections.strict(spec).project(rowFilter);
+                Evaluator strict = new Evaluator(spec.partitionType(), strictExpr);
+
+                StrictMetricsEvaluator metrics =
+                    new StrictMetricsEvaluator(base.schema(), rowFilter, isCaseSensitive());
+
+                for (DataFile file : addedDataFiles) {
+                  // the real test is that the strict or metrics test matches the file, indicating
+                  // that all records in the file match the filter. inclusive is used to avoid
+                  // testing the metrics, which is more complicated
+                  ValidationException.check(
+                      inclusive.eval(file.partition())
+                          && (strict.eval(file.partition()) || metrics.eval(file)),
+                      "Cannot append file with rows that do not match filter: %s: %s",
+                      rowFilter,
+                      file.path());
+                }
+              });
     }
 
     if (validateNewDataFiles) {
diff --git a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
index d3a8edbc7cdd..975ff0023f2f 100644
--- a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
+++ b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
@@ -87,31 +87,40 @@ public BaseReplacePartitions toBranch(String branch) {
 
   @Override
   public void validate(TableMetadata currentMetadata, Snapshot parent) {
-    if (validateConflictingData) {
-      if (dataSpec().isUnpartitioned()) {
-        validateAddedDataFiles(
-            currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
-      } else {
-        validateAddedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
-      }
-    }
-
-    if (validateConflictingDeletes) {
-      if (dataSpec().isUnpartitioned()) {
-        validateDeletedDataFiles(
-            currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
-        validateNoNewDeleteFiles(
-            currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
-      } else {
-        validateDeletedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
-        validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
-      }
-    }
+    dataSpecs()
+        .forEach(
+            dataSpec -> {
+              if (validateConflictingData) {
+                if (dataSpec.isUnpartitioned()) {
+                  validateAddedDataFiles(
+                      currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
+                } else {
+                  validateAddedDataFiles(
+                      currentMetadata, startingSnapshotId, replacedPartitions, parent);
+                }
+              }
+
+              if (validateConflictingDeletes) {
+                if (dataSpec.isUnpartitioned()) {
+                  validateDeletedDataFiles(
+                      currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
+                  validateNoNewDeleteFiles(
+                      currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
+                } else {
+                  validateDeletedDataFiles(
+                      currentMetadata, startingSnapshotId, replacedPartitions, parent);
+                  validateNoNewDeleteFiles(
+                      currentMetadata, startingSnapshotId, replacedPartitions, parent);
+                }
+              }
+            });
   }
 
   @Override
   public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
-    if (dataSpec().fields().size() <= 0) {
+    // TODO: I don't understand this: why delete all data? What if only part of the table is
+    // unpartitioned?
+    if (dataSpecs().stream().anyMatch(spec -> spec.fields().size() <= 0)) {
       // replace all data in an unpartitioned table
       deleteByRowFilter(Expressions.alwaysTrue());
     }
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 5d3ec6e35f0d..c6b30fcc1609 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -42,6 +42,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Predicate;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
@@ -79,7 +80,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   private final ManifestFilterManager<DeleteFile> deleteFilterManager;
 
   // update data
-  private final List<DataFile> newDataFiles = Lists.newArrayList();
+  private final Map<Integer, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
   private Long newDataFilesDataSequenceNumber;
   private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
@@ -87,10 +88,10 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
   private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder();
   private Expression deleteExpression = Expressions.alwaysFalse();
-  private PartitionSpec dataSpec;
+  private final Set<PartitionSpec> dataSpecs = Sets.newHashSet();
 
   // cache new data manifests after writing
-  private List<ManifestFile> cachedNewDataManifests = null;
+  private final List<ManifestFile> cachedNewDataManifests = Lists.newLinkedList();
   private boolean hasNewDataFiles = false;
 
   // cache new manifests for delete files
@@ -103,7 +104,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
     super(ops);
     this.tableName = tableName;
    this.ops = ops;
-    this.dataSpec = null;
     long targetSizeBytes =
         ops.current()
             .propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
@@ -138,19 +138,18 @@ protected boolean isCaseSensitive() {
     return caseSensitive;
   }
 
-  protected PartitionSpec dataSpec() {
+  protected Set<PartitionSpec> dataSpecs() {
     Preconditions.checkState(
-        dataSpec != null, "Cannot determine partition spec: no data files have been added");
-    // the spec is set when the write is started
-    return dataSpec;
+        !dataSpecs.isEmpty(), "Cannot determine partition specs: no data files have been added");
+    return dataSpecs;
   }
 
   protected Expression rowFilter() {
     return deleteExpression;
   }
 
-  protected List<DataFile> addedDataFiles() {
-    return ImmutableList.copyOf(newDataFiles);
+  protected Map<Integer, List<DataFile>> addedDataFilesBySpec() {
+    return ImmutableMap.copyOf(newDataFilesBySpec);
   }
 
   protected void failAnyDelete() {
@@ -210,7 +209,7 @@ protected boolean deletesDeleteFiles() {
   }
 
   protected boolean addsDataFiles() {
-    return !newDataFiles.isEmpty();
+    return !newDataFilesBySpec.isEmpty();
   }
 
   protected boolean addsDeleteFiles() {
@@ -220,9 +219,11 @@ protected boolean addsDeleteFiles() {
 
   /** Add a data file to the new snapshot. */
   protected void add(DataFile file) {
     Preconditions.checkNotNull(file, "Invalid data file: null");
-    setDataSpec(file);
-    addedFilesSummary.addedFile(dataSpec(), file);
+    PartitionSpec dataSpec = addDataSpec(file);
+    addedFilesSummary.addedFile(dataSpec, file);
     hasNewDataFiles = true;
+    List<DataFile> newDataFiles =
+        newDataFilesBySpec.computeIfAbsent(dataSpec.specId(), ignored -> Lists.newArrayList());
     newDataFiles.add(file);
   }
 
@@ -248,15 +249,12 @@ private void add(DeleteFileHolder fileHolder) {
     hasNewDeleteFiles = true;
   }
 
-  private void setDataSpec(DataFile file) {
+  private PartitionSpec addDataSpec(DataFile file) {
     PartitionSpec fileSpec = ops.current().spec(file.specId());
     Preconditions.checkNotNull(
         fileSpec, "Cannot find partition spec for data file: %s", file.path());
-    if (dataSpec == null) {
-      dataSpec = fileSpec;
-    } else if (dataSpec.specId() != file.specId()) {
-      throw new ValidationException("Invalid data file, expected spec id: %d", dataSpec.specId());
-    }
+    dataSpecs.add(fileSpec);
+    return fileSpec;
   }
 
   /** Add all files in a manifest to the new snapshot. */
@@ -878,7 +876,7 @@ public Object updateEvent() {
 
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   private void cleanUncommittedAppends(Set<ManifestFile> committed) {
-    if (cachedNewDataManifests != null) {
+    if (!cachedNewDataManifests.isEmpty()) {
       boolean hasDeletes = false;
       for (ManifestFile manifest : cachedNewDataManifests) {
         if (!committed.contains(manifest)) {
@@ -888,7 +886,7 @@ private void cleanUncommittedAppends(Set<ManifestFile> committed) {
       }
 
       if (hasDeletes) {
-        this.cachedNewDataManifests = null;
+        this.cachedNewDataManifests.clear();
       }
     }
 
@@ -934,7 +932,7 @@ protected void cleanUncommitted(Set<ManifestFile> committed) {
 
   private Iterable<ManifestFile> prepareNewDataManifests() {
     Iterable<ManifestFile> newManifests;
-    if (!newDataFiles.isEmpty()) {
+    if (!newDataFilesBySpec.isEmpty()) {
       List<ManifestFile> dataFileManifests = newDataFilesAsManifests();
       newManifests = Iterables.concat(dataFileManifests, appendManifests, rewrittenAppendManifests);
     } else {
@@ -947,29 +945,32 @@ private Iterable<ManifestFile> prepareNewDataManifests() {
   }
 
   private List<ManifestFile> newDataFilesAsManifests() {
-    if (hasNewDataFiles && cachedNewDataManifests != null) {
+    if (hasNewDataFiles && !cachedNewDataManifests.isEmpty()) {
       cachedNewDataManifests.forEach(file -> deleteFile(file.path()));
-      cachedNewDataManifests = null;
+      cachedNewDataManifests.clear();
     }
 
-    if (cachedNewDataManifests == null) {
-      try {
-        RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec());
-        try {
-          if (newDataFilesDataSequenceNumber == null) {
-            newDataFiles.forEach(writer::add);
-          } else {
-            newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
-          }
-        } finally {
-          writer.close();
-        }
-
-        this.cachedNewDataManifests = writer.toManifestFiles();
-        this.hasNewDataFiles = false;
-      } catch (IOException e) {
-        throw new RuntimeIOException(e, "Failed to close manifest writer");
-      }
+    if (cachedNewDataManifests.isEmpty()) {
+      newDataFilesBySpec.forEach(
+          (specId, newDataFiles) -> {
+            PartitionSpec dataSpec = ops.current().spec(specId);
+            try {
+              RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec);
+              try {
+                if (newDataFilesDataSequenceNumber == null) {
+                  newDataFiles.forEach(writer::add);
+                } else {
+                  newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
+                }
+              } finally {
+                writer.close();
+              }
+              this.cachedNewDataManifests.addAll(writer.toManifestFiles());
+              this.hasNewDataFiles = false;
+            } catch (IOException e) {
+              throw new RuntimeIOException(e, "Failed to close manifest writer");
+            }
+          });
     }
 
     return cachedNewDataManifests;
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index 39c9ac4b6c21..c1e740c83eff 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -24,6 +24,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -92,6 +93,77 @@ public void testEmptyTableAppend() {
         statuses(Status.ADDED, Status.ADDED));
   }
 
+  @Test
+  public void testEmptyTableAppendFilesWithDifferentSpecs() {
+    Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
+
+    TableMetadata base = readMetadata();
+    Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
+    Assert.assertEquals("Last sequence number should be 0", 0, base.lastSequenceNumber());
+
+    table.updateSpec().addField("id").commit();
+    PartitionSpec newSpec = table.spec();
+
+    Assert.assertEquals("Table should have 2 specs", 2, table.specs().size());
+
+    DataFile fileOriginalSpec =
+        DataFiles.builder(SPEC)
+            .withPath("/path/to/data-a.parquet")
+            .withPartitionPath("data_bucket=0")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+
+    DataFile fileNewSpec =
+        DataFiles.builder(newSpec)
+            .withPath("/path/to/data-b.parquet")
+            .withPartitionPath("data_bucket=0/id=0")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+
+    Snapshot committedSnapshot =
+        commit(
+            table,
+            table.newAppend().appendFile(fileOriginalSpec).appendFile(fileNewSpec),
+            branch);
+
+    Assert.assertNotNull("Should create a snapshot", committedSnapshot);
+    V1Assert.assertEquals(
+        "Last sequence number should be 0", 0, table.ops().current().lastSequenceNumber());
+    V2Assert.assertEquals(
+        "Last sequence number should be 1", 1, table.ops().current().lastSequenceNumber());
+
+    Assert.assertEquals(
+        "Should create 2 manifests for initial write, 1 manifest per spec",
+        2,
+        committedSnapshot.allManifests(table.io()).size());
+
+    long snapshotId = committedSnapshot.snapshotId();
+
+    validateManifest(
+        committedSnapshot.allManifests(table.io()).stream()
+            .filter(m -> Objects.equals(m.partitionSpecId(), SPEC.specId()))
+            .findAny()
+            .get(),
+        dataSeqs(1L),
+        fileSeqs(1L),
+        ids(snapshotId),
+        files(fileOriginalSpec),
+        statuses(Status.ADDED));
+
+    validateManifest(
+        committedSnapshot.allManifests(table.io()).stream()
+            .filter(m -> Objects.equals(m.partitionSpecId(), newSpec.specId()))
+            .findAny()
+            .get(),
+        dataSeqs(1L),
+        fileSeqs(1L),
+        ids(snapshotId),
+        files(fileNewSpec),
+        statuses(Status.ADDED));
+  }
+
   @Test
   public void testEmptyTableAppendManifest() throws IOException {
     Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
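
Usage sketch: from the public API side, this patch lets a single append carry data files written against different partition specs, with one manifest written per spec. Previously, the second appendFile below would fail with ValidationException ("Invalid data file, expected spec id: %d") because MergingSnapshotProducer tracked only a single data spec. This is a minimal sketch mirroring the new test; the `table` handle and file paths are hypothetical, while `updateSpec`, `DataFiles.builder`, and `newAppend` are the Iceberg APIs exercised by the test above.

    // Hypothetical table whose original spec buckets "data" into "data_bucket";
    // evolve the spec to also partition by "id".
    PartitionSpec originalSpec = table.spec();
    table.updateSpec().addField("id").commit();
    PartitionSpec evolvedSpec = table.spec();

    DataFile fileWithOriginalSpec =
        DataFiles.builder(originalSpec)
            .withPath("/path/to/data-a.parquet") // hypothetical path
            .withPartitionPath("data_bucket=0")
            .withFileSizeInBytes(10)
            .withRecordCount(1)
            .build();

    DataFile fileWithEvolvedSpec =
        DataFiles.builder(evolvedSpec)
            .withPath("/path/to/data-b.parquet") // hypothetical path
            .withPartitionPath("data_bucket=0/id=0")
            .withFileSizeInBytes(10)
            .withRecordCount(1)
            .build();

    // With this patch, both files land in one snapshot; the commit writes one
    // manifest per partition spec instead of rejecting the mixed append.
    table.newAppend().appendFile(fileWithOriginalSpec).appendFile(fileWithEvolvedSpec).commit();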