Skip to content

Commit

Permalink
Core: Support committing delete files with multiple specs
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi committed Sep 13, 2021
1 parent 838cc65 commit b199536
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public OverwriteFiles validateNoConflictingAppends(Expression newConflictDetecti
@Override
protected void validate(TableMetadata base) {
if (validateAddedFilesMatchOverwriteFilter) {
PartitionSpec spec = writeSpec();
PartitionSpec spec = dataSpec();
Expression rowFilter = rowFilter();

Expression inclusiveExpr = Projections.inclusive(spec).project(rowFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public ReplacePartitions validateAppendOnly() {

@Override
public List<ManifestFile> apply(TableMetadata base) {
if (writeSpec().fields().size() <= 0) {
if (dataSpec().fields().size() <= 0) {
// replace all data in an unpartitioned table
deleteByRowFilter(Expressions.alwaysTrue());
}
Expand Down
101 changes: 57 additions & 44 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.events.CreateSnapshotEvent;
Expand All @@ -39,6 +40,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
Expand Down Expand Up @@ -78,25 +80,27 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {

// update data
private final List<DataFile> newFiles = Lists.newArrayList();
private final List<DeleteFile> newDeleteFiles = Lists.newArrayList();
private final Map<Integer, List<DeleteFile>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder();
private Expression deleteExpression = Expressions.alwaysFalse();
private PartitionSpec spec;
private PartitionSpec dataSpec;

// cache new manifests after writing
private ManifestFile cachedNewManifest = null;
private boolean hasNewFiles = false;
private ManifestFile cachedNewDeleteManifest = null;

// cache new manifests for delete files
private final List<ManifestFile> cachedNewDeleteManifests = Lists.newLinkedList();
private boolean hasNewDeleteFiles = false;

MergingSnapshotProducer(String tableName, TableOperations ops) {
super(ops);
this.tableName = tableName;
this.ops = ops;
this.spec = null;
this.dataSpec = null;
long targetSizeBytes = ops.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
int minCountToMerge = ops.current()
Expand All @@ -117,11 +121,10 @@ public ThisT set(String property, String value) {
return self();
}

protected PartitionSpec writeSpec() {
Preconditions.checkState(spec != null,
"Cannot determine partition spec: no data or delete files have been added");
protected PartitionSpec dataSpec() {
Preconditions.checkState(dataSpec != null, "Cannot determine partition spec: no data files have been added");
// the spec is set when the write is started
return spec;
return dataSpec;
}

protected Expression rowFilter() {
Expand Down Expand Up @@ -190,8 +193,9 @@ protected void delete(CharSequence path) {
* Add a data file to the new snapshot.
*/
protected void add(DataFile file) {
setWriteSpec(file);
addedFilesSummary.addedFile(writeSpec(), file);
Preconditions.checkNotNull(file, "Invalid data file: null");
setDataSpec(file);
addedFilesSummary.addedFile(dataSpec(), file);
hasNewFiles = true;
newFiles.add(file);
}
Expand All @@ -200,21 +204,21 @@ protected void add(DataFile file) {
* Add a delete file to the new snapshot.
*/
protected void add(DeleteFile file) {
setWriteSpec(file);
addedFilesSummary.addedFile(writeSpec(), file);
Preconditions.checkNotNull(file, "Invalid delete file: null");
PartitionSpec fileSpec = ops.current().spec(file.specId());
List<DeleteFile> deleteFiles = newDeleteFilesBySpec.computeIfAbsent(file.specId(), specId -> Lists.newArrayList());
deleteFiles.add(file);
addedFilesSummary.addedFile(fileSpec, file);
hasNewDeleteFiles = true;
newDeleteFiles.add(file);
}

private void setWriteSpec(ContentFile<?> file) {
Preconditions.checkNotNull(file, "Invalid content file: null");
PartitionSpec writeSpec = ops.current().spec(file.specId());
Preconditions.checkNotNull(writeSpec,
"Cannot find partition spec for file: %s", file.path());
if (spec == null) {
spec = writeSpec;
} else if (spec.specId() != file.specId()) {
throw new ValidationException("Invalid file, expected spec id: %d", spec.specId());
/**
 * Records the partition spec that data files in this operation are written with.
 *
 * <p>The spec is looked up from the current table metadata using the file's spec ID. The first
 * data file seen fixes {@code dataSpec}; any later file whose spec ID differs is rejected.
 *
 * @param file the data file being added
 * @throws NullPointerException if the current metadata has no spec for the file's spec ID
 * @throws ValidationException if the file's spec ID differs from the already recorded spec
 */
private void setDataSpec(DataFile file) {
  PartitionSpec resolvedSpec = ops.current().spec(file.specId());
  Preconditions.checkNotNull(resolvedSpec, "Cannot find partition spec for data file: %s", file.path());

  // first data file: remember its spec and stop
  if (dataSpec == null) {
    dataSpec = resolvedSpec;
    return;
  }

  // every later data file must share the spec of the first one
  boolean sameSpec = dataSpec.specId() == file.specId();
  if (!sameSpec) {
    throw new ValidationException("Invalid data file, expected spec id: %d", dataSpec.specId());
  }
}

Expand Down Expand Up @@ -463,9 +467,13 @@ private void cleanUncommittedAppends(Set<ManifestFile> committed) {
this.cachedNewManifest = null;
}

if (cachedNewDeleteManifest != null && !committed.contains(cachedNewDeleteManifest)) {
deleteFile(cachedNewDeleteManifest.path());
this.cachedNewDeleteManifest = null;
ListIterator<ManifestFile> deleteManifestsIterator = cachedNewDeleteManifests.listIterator();
while (deleteManifestsIterator.hasNext()) {
ManifestFile deleteManifest = deleteManifestsIterator.next();
if (!committed.contains(deleteManifest)) {
deleteFile(deleteManifest.path());
deleteManifestsIterator.remove();
}
}

// rewritten manifests are always owned by the table
Expand Down Expand Up @@ -518,7 +526,7 @@ private ManifestFile newFilesAsManifest() {

if (cachedNewManifest == null) {
try {
ManifestWriter<DataFile> writer = newManifestWriter(writeSpec());
ManifestWriter<DataFile> writer = newManifestWriter(dataSpec());
try {
writer.addAll(newFiles);
} finally {
Expand All @@ -536,36 +544,41 @@ private ManifestFile newFilesAsManifest() {
}

private Iterable<ManifestFile> prepareDeleteManifests() {
if (newDeleteFiles.isEmpty()) {
if (newDeleteFilesBySpec.isEmpty()) {
return ImmutableList.of();
}

return ImmutableList.of(newDeleteFilesAsManifest());
return newDeleteFilesAsManifests();
}

private ManifestFile newDeleteFilesAsManifest() {
if (hasNewDeleteFiles && cachedNewDeleteManifest != null) {
deleteFile(cachedNewDeleteManifest.path());
cachedNewDeleteManifest = null;
private List<ManifestFile> newDeleteFilesAsManifests() {
if (hasNewDeleteFiles && cachedNewDeleteManifests.size() > 0) {
for (ManifestFile cachedNewDeleteManifest : cachedNewDeleteManifests) {
deleteFile(cachedNewDeleteManifest.path());
}
cachedNewDeleteManifests.clear();
}

if (cachedNewDeleteManifest == null) {
try {
ManifestWriter<DeleteFile> writer = newDeleteManifestWriter(writeSpec());
if (cachedNewDeleteManifests.isEmpty()) {
newDeleteFilesBySpec.forEach((specId, deleteFiles) -> {
PartitionSpec spec = ops.current().spec(specId);
try {
writer.addAll(newDeleteFiles);
} finally {
writer.close();
ManifestWriter<DeleteFile> writer = newDeleteManifestWriter(spec);
try {
writer.addAll(deleteFiles);
} finally {
writer.close();
}
cachedNewDeleteManifests.add(writer.toManifestFile());
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest writer");
}
});

this.cachedNewDeleteManifest = writer.toManifestFile();
this.hasNewDeleteFiles = false;
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest writer");
}
this.hasNewDeleteFiles = false;
}

return cachedNewDeleteManifest;
return cachedNewDeleteManifests;
}

private class DataFileFilterManager extends ManifestFilterManager<DataFile> {
Expand Down
21 changes: 21 additions & 0 deletions core/src/test/java/org/apache/iceberg/TableTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.LongStream;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
Expand Down Expand Up @@ -433,6 +434,26 @@ void validateDeleteManifest(ManifestFile manifest,
Assert.assertFalse("Should find all files in the manifest", expectedFiles.hasNext());
}

/**
 * Creates a data file for tests in the given partition, using the spec returned by
 * {@code table.spec()}.
 *
 * <p>The file gets a unique path of the form {@code /path/to/data-<uuid>.parquet}, a size of
 * 10 bytes and a record count of 1.
 *
 * @param partitionPath partition path passed to the builder's {@code withPartitionPath}
 * @return the built data file
 */
protected DataFile newDataFile(String partitionPath) {
  PartitionSpec currentSpec = table.spec();
  String location = "/path/to/data-" + UUID.randomUUID() + ".parquet";

  DataFile dataFile = DataFiles.builder(currentSpec)
      .withPath(location)
      .withFileSizeInBytes(10)
      .withPartitionPath(partitionPath)
      .withRecordCount(1)
      .build();
  return dataFile;
}

/**
 * Creates a position delete file for tests in the given partition of the spec registered under
 * {@code specId} in {@code table.specs()}.
 *
 * <p>The file gets a unique path of the form {@code /path/to/delete-<uuid>.parquet}, a size of
 * 10 bytes and a record count of 1.
 *
 * @param specId ID of the partition spec the delete file belongs to
 * @param partitionPath partition path passed to the builder's {@code withPartitionPath}
 * @return the built delete file
 * @throws IllegalArgumentException if {@code table.specs()} has no entry for {@code specId}
 */
protected DeleteFile newDeleteFile(int specId, String partitionPath) {
  PartitionSpec spec = table.specs().get(specId);
  if (spec == null) {
    // fail fast with the offending ID instead of handing a null spec to the builder
    throw new IllegalArgumentException("Cannot find partition spec with id: " + specId);
  }

  return FileMetadata.deleteFileBuilder(spec)
      .ofPositionDeletes()
      // same "<kind>-<uuid>" naming scheme as newDataFile
      .withPath("/path/to/delete-" + UUID.randomUUID() + ".parquet")
      .withFileSizeInBytes(10)
      .withPartitionPath(partitionPath)
      .withRecordCount(1)
      .build();
}

static void validateManifestEntries(ManifestFile manifest,
Iterator<Long> ids,
Iterator<DataFile> expectedFiles,
Expand Down
Loading

0 comments on commit b199536

Please sign in to comment.