Skip to content

Commit

Permalink
Core: Support appending files with different specs
Browse files Browse the repository at this point in the history
  • Loading branch information
Farooq Qaiser committed Mar 3, 2024
1 parent 08e31ce commit 8320a0e
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 86 deletions.
50 changes: 27 additions & 23 deletions core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,29 +113,33 @@ public BaseOverwriteFiles toBranch(String branch) {
@Override
protected void validate(TableMetadata base, Snapshot parent) {
if (validateAddedFilesMatchOverwriteFilter) {
PartitionSpec spec = dataSpec();
Expression rowFilter = rowFilter();

Expression inclusiveExpr = Projections.inclusive(spec).project(rowFilter);
Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr);

Expression strictExpr = Projections.strict(spec).project(rowFilter);
Evaluator strict = new Evaluator(spec.partitionType(), strictExpr);

StrictMetricsEvaluator metrics =
new StrictMetricsEvaluator(base.schema(), rowFilter, isCaseSensitive());

for (DataFile file : addedDataFiles()) {
// the real test is that the strict or metrics test matches the file, indicating that all
// records in the file match the filter. inclusive is used to avoid testing the metrics,
// which is more complicated
ValidationException.check(
inclusive.eval(file.partition())
&& (strict.eval(file.partition()) || metrics.eval(file)),
"Cannot append file with rows that do not match filter: %s: %s",
rowFilter,
file.path());
}
addedDataFilesBySpec()
.forEach(
(specId, addedDataFiles) -> {
PartitionSpec spec = base.spec(specId);
Expression rowFilter = rowFilter();

Expression inclusiveExpr = Projections.inclusive(spec).project(rowFilter);
Evaluator inclusive = new Evaluator(spec.partitionType(), inclusiveExpr);

Expression strictExpr = Projections.strict(spec).project(rowFilter);
Evaluator strict = new Evaluator(spec.partitionType(), strictExpr);

StrictMetricsEvaluator metrics =
new StrictMetricsEvaluator(base.schema(), rowFilter, isCaseSensitive());

for (DataFile file : addedDataFiles) {
// the real test is that the strict or metrics test matches the file, indicating
// that all records in the file match the filter. inclusive is used to avoid
// testing the metrics, which is more complicated
ValidationException.check(
inclusive.eval(file.partition())
&& (strict.eval(file.partition()) || metrics.eval(file)),
"Cannot append file with rows that do not match filter: %s: %s",
rowFilter,
file.path());
}
});
}

if (validateNewDataFiles) {
Expand Down
51 changes: 30 additions & 21 deletions core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,31 +87,40 @@ public BaseReplacePartitions toBranch(String branch) {

// Runs the conflict checks that were enabled for this operation. validateConflictingData guards
// the added-data-file check; validateConflictingDeletes guards the deleted-data-file and
// new-delete-file checks. Each check uses Expressions.alwaysTrue() when the spec is
// unpartitioned and replacedPartitions otherwise.
// NOTE(review): this span looks like a rendered diff without +/- markers, so the two top-level
// `if` blocks (which use dataSpec()) and the dataSpecs().forEach(...) call are presumably the
// before and after bodies of one method rather than code meant to run together -- confirm.
@Override
public void validate(TableMetadata currentMetadata, Snapshot parent) {
if (validateConflictingData) {
if (dataSpec().isUnpartitioned()) {
validateAddedDataFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
} else {
validateAddedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
}
}

if (validateConflictingDeletes) {
if (dataSpec().isUnpartitioned()) {
validateDeletedDataFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
validateNoNewDeleteFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
} else {
validateDeletedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
}
}
dataSpecs()
.forEach(
dataSpec -> {
// NOTE(review): dataSpec only selects the filter; none of the calls below receive it, so
// specs with the same isUnpartitioned() result appear to repeat an identical check --
// confirm the helpers do not depend on per-spec state that is not visible here.
if (validateConflictingData) {
if (dataSpec.isUnpartitioned()) {
validateAddedDataFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
} else {
validateAddedDataFiles(
currentMetadata, startingSnapshotId, replacedPartitions, parent);
}
}

if (validateConflictingDeletes) {
if (dataSpec.isUnpartitioned()) {
validateDeletedDataFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
validateNoNewDeleteFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
} else {
validateDeletedDataFiles(
currentMetadata, startingSnapshotId, replacedPartitions, parent);
validateNoNewDeleteFiles(
currentMetadata, startingSnapshotId, replacedPartitions, parent);
}
}
});
}

@Override
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
if (dataSpec().fields().size() <= 0) {
// TODO: I don't understand this, why delete all data? what if only part of the table is
// unpartitioned?
if (dataSpecs().stream().anyMatch(spec -> spec.fields().size() <= 0)) {
// replace all data in an unpartitioned table
deleteByRowFilter(Expressions.alwaysTrue());
}
Expand Down
85 changes: 43 additions & 42 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Predicate;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
Expand Down Expand Up @@ -79,18 +80,18 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private final ManifestFilterManager<DeleteFile> deleteFilterManager;

// update data
// NOTE(review): this span looks like a rendered diff without +/- markers; newDataFiles,
// dataSpec and the nullable cachedNewDataManifests are presumably the declarations being
// replaced by newDataFilesBySpec, dataSpecs and the final cachedNewDataManifests -- confirm.
private final List<DataFile> newDataFiles = Lists.newArrayList();
// data files passed to add(DataFile), grouped by the spec id of the spec resolved for each file
private final Map<Integer, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
// when non-null, newDataFilesAsManifests passes this value to the manifest writer with each file
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder();
// returned as-is by rowFilter()
private Expression deleteExpression = Expressions.alwaysFalse();
private PartitionSpec dataSpec;
// every spec resolved for a file passed to add(DataFile)
private final Set<PartitionSpec> dataSpecs = Sets.newHashSet();

// cache new data manifests after writing
private List<ManifestFile> cachedNewDataManifests = null;
// an empty list takes over the "nothing cached" role that null plays in the declaration above
private final List<ManifestFile> cachedNewDataManifests = Lists.newLinkedList();
// set by add(DataFile); reset once newDataFilesAsManifests has written manifests
private boolean hasNewDataFiles = false;

// cache new manifests for delete files
Expand All @@ -103,7 +104,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
super(ops);
this.tableName = tableName;
this.ops = ops;
this.dataSpec = null;
long targetSizeBytes =
ops.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
Expand Down Expand Up @@ -138,19 +138,18 @@ protected boolean isCaseSensitive() {
return caseSensitive;
}

// Returns the partition spec(s) recorded for the data files added so far. The checkState guard
// rejects the call while nothing has been recorded.
// NOTE(review): this span looks like a rendered diff without +/- markers; the dataSpec() header
// and the lines reading the dataSpec field are presumably the version being replaced by
// dataSpecs() -- confirm. dataSpecs() hands back the internal set itself, not a copy.
protected PartitionSpec dataSpec() {
protected Set<PartitionSpec> dataSpecs() {
Preconditions.checkState(
dataSpec != null, "Cannot determine partition spec: no data files have been added");
// the spec is set when the write is started
return dataSpec;
!dataSpecs.isEmpty(), "Cannot determine partition specs: no data files have been added");
return dataSpecs;
}

/** Returns the current delete expression, which subclasses use as this operation's row filter. */
protected Expression rowFilter() {
  return this.deleteExpression;
}

// Returns an immutable copy of the data files added so far.
// NOTE(review): this span looks like a rendered diff without +/- markers; addedDataFiles() is
// presumably the version being replaced by addedDataFilesBySpec(), which is keyed by spec id --
// confirm. ImmutableMap.copyOf copies the map only, so the value lists are presumably still
// the mutable lists held in newDataFilesBySpec.
protected List<DataFile> addedDataFiles() {
return ImmutableList.copyOf(newDataFiles);
protected Map<Integer, List<DataFile>> addedDataFilesBySpec() {
return ImmutableMap.copyOf(newDataFilesBySpec);
}

protected void failAnyDelete() {
Expand Down Expand Up @@ -210,7 +209,7 @@ protected boolean deletesDeleteFiles() {
}

// Reports whether any data file has been added to this operation.
// NOTE(review): this span looks like a rendered diff without +/- markers; the two return
// statements are presumably the before (newDataFiles) and after (newDataFilesBySpec) versions
// of a single line -- confirm.
protected boolean addsDataFiles() {
return !newDataFiles.isEmpty();
return !newDataFilesBySpec.isEmpty();
}

protected boolean addsDeleteFiles() {
Expand All @@ -220,9 +219,11 @@ protected boolean addsDeleteFiles() {
/** Add a data file to the new snapshot. */
// NOTE(review): this span looks like a rendered diff without +/- markers; the setDataSpec(file)
// and addedFilesSummary.addedFile(dataSpec(), file) lines are presumably the version being
// replaced by the addDataSpec(file) lines that follow them -- confirm.
protected void add(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
setDataSpec(file);
addedFilesSummary.addedFile(dataSpec(), file);
// resolve the file's spec from current table metadata and record it in dataSpecs
PartitionSpec dataSpec = addDataSpec(file);
addedFilesSummary.addedFile(dataSpec, file);
hasNewDataFiles = true;
// group the file with the other added files that share its spec id
List<DataFile> newDataFiles =
newDataFilesBySpec.computeIfAbsent(dataSpec.specId(), ignored -> Lists.newArrayList());
newDataFiles.add(file);
}

Expand All @@ -248,15 +249,12 @@ private void add(DeleteFileHolder fileHolder) {
hasNewDeleteFiles = true;
}

// Looks up the partition spec for the file's spec id in the current table metadata and fails
// via checkNotNull when no such spec exists.
// NOTE(review): this span looks like a rendered diff without +/- markers; setDataSpec and the
// `if (dataSpec == null) ... else if ... throw` branch (single spec enforced) are presumably the
// version being replaced by addDataSpec, which records the spec in dataSpecs and returns it --
// confirm.
private void setDataSpec(DataFile file) {
private PartitionSpec addDataSpec(DataFile file) {
PartitionSpec fileSpec = ops.current().spec(file.specId());
Preconditions.checkNotNull(
fileSpec, "Cannot find partition spec for data file: %s", file.path());
if (dataSpec == null) {
dataSpec = fileSpec;
} else if (dataSpec.specId() != file.specId()) {
throw new ValidationException("Invalid data file, expected spec id: %d", dataSpec.specId());
}
dataSpecs.add(fileSpec);
return fileSpec;
}

/** Add all files in a manifest to the new snapshot. */
Expand Down Expand Up @@ -878,7 +876,7 @@ public Object updateEvent() {

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private void cleanUncommittedAppends(Set<ManifestFile> committed) {
if (cachedNewDataManifests != null) {
if (!cachedNewDataManifests.isEmpty()) {
boolean hasDeletes = false;
for (ManifestFile manifest : cachedNewDataManifests) {
if (!committed.contains(manifest)) {
Expand All @@ -888,7 +886,7 @@ private void cleanUncommittedAppends(Set<ManifestFile> committed) {
}

if (hasDeletes) {
this.cachedNewDataManifests = null;
this.cachedNewDataManifests.clear();
}
}

Expand Down Expand Up @@ -934,7 +932,7 @@ protected void cleanUncommitted(Set<ManifestFile> committed) {

private Iterable<ManifestFile> prepareNewDataManifests() {
Iterable<ManifestFile> newManifests;
if (!newDataFiles.isEmpty()) {
if (!newDataFilesBySpec.isEmpty()) {
List<ManifestFile> dataFileManifests = newDataFilesAsManifests();
newManifests = Iterables.concat(dataFileManifests, appendManifests, rewrittenAppendManifests);
} else {
Expand All @@ -947,29 +945,32 @@ private Iterable<ManifestFile> prepareNewDataManifests() {
}

private List<ManifestFile> newDataFilesAsManifests() {
if (hasNewDataFiles && cachedNewDataManifests != null) {
if (hasNewDataFiles && !cachedNewDataManifests.isEmpty()) {
cachedNewDataManifests.forEach(file -> deleteFile(file.path()));
cachedNewDataManifests = null;
cachedNewDataManifests.clear();
}

if (cachedNewDataManifests == null) {
try {
RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec());
try {
if (newDataFilesDataSequenceNumber == null) {
newDataFiles.forEach(writer::add);
} else {
newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
}
} finally {
writer.close();
}

this.cachedNewDataManifests = writer.toManifestFiles();
this.hasNewDataFiles = false;
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest writer");
}
if (cachedNewDataManifests.isEmpty()) {
newDataFilesBySpec.forEach(
(specId, newDataFiles) -> {
PartitionSpec dataSpec = ops.current().spec(specId);
try {
RollingManifestWriter<DataFile> writer = newRollingManifestWriter(dataSpec);
try {
if (newDataFilesDataSequenceNumber == null) {
newDataFiles.forEach(writer::add);
} else {
newDataFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber));
}
} finally {
writer.close();
}
this.cachedNewDataManifests.addAll(writer.toManifestFiles());
this.hasNewDataFiles = false;
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest writer");
}
});
}

return cachedNewDataManifests;
Expand Down
72 changes: 72 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestMergeAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -92,6 +93,77 @@ public void testEmptyTableAppend() {
statuses(Status.ADDED, Status.ADDED));
}

/**
 * Appends two data files that were written with different partition specs to an empty table in
 * a single commit, then checks that the snapshot holds one manifest per spec and that each
 * manifest contains only the file written with that spec.
 */
@Test
public void testEmptyTableAppendFilesWithDifferentSpecs() {
  Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());

  TableMetadata base = readMetadata();
  Assert.assertNull("Should not have a current snapshot", base.currentSnapshot());
  Assert.assertEquals("Last sequence number should be 0", 0, base.lastSequenceNumber());

  // evolve the spec so the table knows about both the original spec and a new one
  table.updateSpec().addField("id").commit();
  PartitionSpec newSpec = table.spec();

  // assertEquals takes (message, expected, actual); keeping that order makes failures readable
  Assert.assertEquals("Table should have 2 specs", 2, table.specs().size());

  DataFile fileOriginalSpec =
      DataFiles.builder(SPEC)
          .withPath("/path/to/data-a.parquet")
          .withPartitionPath("data_bucket=0")
          .withFileSizeInBytes(10)
          .withRecordCount(1)
          .build();

  DataFile fileNewSpec =
      DataFiles.builder(newSpec)
          .withPath("/path/to/data-b.parquet")
          .withPartitionPath("data_bucket=0/id=0")
          .withFileSizeInBytes(10)
          .withRecordCount(1)
          .build();

  Snapshot committedSnapshot =
      commit(
          table, table.newAppend().appendFile(fileOriginalSpec).appendFile(fileNewSpec), branch);

  Assert.assertNotNull("Should create a snapshot", committedSnapshot);
  V1Assert.assertEquals(
      "Last sequence number should be 0", 0, table.ops().current().lastSequenceNumber());
  V2Assert.assertEquals(
      "Last sequence number should be 1", 1, table.ops().current().lastSequenceNumber());

  // read the manifest list once and reuse it for every check below
  List<ManifestFile> manifests = committedSnapshot.allManifests(table.io());
  Assert.assertEquals(
      "Should create 2 manifests for initial write, 1 manifest per spec", 2, manifests.size());

  long snapshotId = committedSnapshot.snapshotId();

  // fail with a descriptive message instead of a bare NoSuchElementException from Optional.get()
  ManifestFile originalSpecManifest =
      manifests.stream()
          .filter(m -> Objects.equals(m.partitionSpecId(), SPEC.specId()))
          .findAny()
          .orElseThrow(
              () -> new AssertionError("No manifest found for spec id " + SPEC.specId()));

  validateManifest(
      originalSpecManifest,
      dataSeqs(1L),
      fileSeqs(1L),
      ids(snapshotId),
      files(fileOriginalSpec),
      statuses(Status.ADDED));

  ManifestFile newSpecManifest =
      manifests.stream()
          .filter(m -> Objects.equals(m.partitionSpecId(), newSpec.specId()))
          .findAny()
          .orElseThrow(
              () -> new AssertionError("No manifest found for spec id " + newSpec.specId()));

  validateManifest(
      newSpecManifest,
      dataSeqs(1L),
      fileSeqs(1L),
      ids(snapshotId),
      files(fileNewSpec),
      statuses(Status.ADDED));
}

@Test
public void testEmptyTableAppendManifest() throws IOException {
Assert.assertEquals("Table should start empty", 0, listManifestFiles().size());
Expand Down

0 comments on commit 8320a0e

Please sign in to comment.