Core: Support committing delete files with multiple specs #2985

Merged: merged 2 commits on Sep 21, 2021
Changes from 1 commit
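
For context before the diff: the change below lets a single commit carry delete files whose partition specs differ, for example after a partition evolution, instead of pinning one write spec for every file in the commit. A minimal, hypothetical usage sketch follows; it assumes a format v2 table (delete files require v2) and uses the existing public RowDelta API, while the class and variable names are invented.

```java
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;

// Hypothetical sketch of what the change enables. `table` is an existing format v2
// Iceberg table; `deletesOldSpec` and `deletesNewSpec` are DeleteFiles whose specId()
// values point at two different partition specs of that table (e.g. before and after
// a partition evolution).
public class CommitDeletesAcrossSpecs {
  static void commit(Table table, DeleteFile deletesOldSpec, DeleteFile deletesNewSpec) {
    // Previously MergingSnapshotProducer pinned a single write spec, so mixing spec ids
    // in one commit failed validation. With this change the delete files are grouped by
    // spec id and written into one delete manifest per spec.
    table.newRowDelta()
        .addDeletes(deletesOldSpec)
        .addDeletes(deletesNewSpec)
        .commit();
  }
}
```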
@@ -103,7 +103,7 @@ public OverwriteFiles validateNoConflictingAppends(Expression newConflictDetecti
@Override
protected void validate(TableMetadata base) {
if (validateAddedFilesMatchOverwriteFilter) {
PartitionSpec spec = writeSpec();
PartitionSpec spec = dataSpec();
Expression rowFilter = rowFilter();

Expression inclusiveExpr = Projections.inclusive(spec).project(rowFilter);
@@ -55,7 +55,7 @@ public ReplacePartitions validateAppendOnly() {

@Override
public List<ManifestFile> apply(TableMetadata base) {
if (writeSpec().fields().size() <= 0) {
if (dataSpec().fields().size() <= 0) {
// replace all data in an unpartitioned table
deleteByRowFilter(Expressions.alwaysTrue());
}
101 changes: 57 additions & 44 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.events.CreateSnapshotEvent;
@@ -39,6 +40,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
@@ -78,25 +80,27 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {

// update data
private final List<DataFile> newFiles = Lists.newArrayList();
private final List<DeleteFile> newDeleteFiles = Lists.newArrayList();
private final Map<Integer, List<DeleteFile>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder();
private Expression deleteExpression = Expressions.alwaysFalse();
private PartitionSpec spec;
private PartitionSpec dataSpec;

// cache new manifests after writing
private ManifestFile cachedNewManifest = null;
private boolean hasNewFiles = false;
private ManifestFile cachedNewDeleteManifest = null;

// cache new manifests for delete files
private final List<ManifestFile> cachedNewDeleteManifests = Lists.newLinkedList();
private boolean hasNewDeleteFiles = false;

MergingSnapshotProducer(String tableName, TableOperations ops) {
super(ops);
this.tableName = tableName;
this.ops = ops;
this.spec = null;
this.dataSpec = null;
long targetSizeBytes = ops.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
int minCountToMerge = ops.current()
@@ -117,11 +121,10 @@ public ThisT set(String property, String value) {
return self();
}

protected PartitionSpec writeSpec() {
Preconditions.checkState(spec != null,
"Cannot determine partition spec: no data or delete files have been added");
protected PartitionSpec dataSpec() {
Contributor Author: Renaming this does require touching more places, but I think keeping it as writeSpec would be confusing.

Preconditions.checkState(dataSpec != null, "Cannot determine partition spec: no data files have been added");
// the spec is set when the write is started
return spec;
return dataSpec;
}

protected Expression rowFilter() {
@@ -190,8 +193,9 @@ protected void delete(CharSequence path) {
* Add a data file to the new snapshot.
*/
protected void add(DataFile file) {
setWriteSpec(file);
addedFilesSummary.addedFile(writeSpec(), file);
Preconditions.checkNotNull(file, "Invalid data file: null");
setDataSpec(file);
addedFilesSummary.addedFile(dataSpec(), file);
hasNewFiles = true;
newFiles.add(file);
}
@@ -200,21 +204,21 @@ protected void add(DataFile file) {
* Add a delete file to the new snapshot.
*/
protected void add(DeleteFile file) {
setWriteSpec(file);
addedFilesSummary.addedFile(writeSpec(), file);
Preconditions.checkNotNull(file, "Invalid delete file: null");
PartitionSpec fileSpec = ops.current().spec(file.specId());
List<DeleteFile> deleteFiles = newDeleteFilesBySpec.computeIfAbsent(file.specId(), specId -> Lists.newArrayList());
deleteFiles.add(file);
addedFilesSummary.addedFile(fileSpec, file);
Contributor Author: The file spec is only used for partition summaries. I added a test that shows it works as expected.

hasNewDeleteFiles = true;
newDeleteFiles.add(file);
}

private void setWriteSpec(ContentFile<?> file) {
Preconditions.checkNotNull(file, "Invalid content file: null");
PartitionSpec writeSpec = ops.current().spec(file.specId());
Preconditions.checkNotNull(writeSpec,
"Cannot find partition spec for file: %s", file.path());
if (spec == null) {
spec = writeSpec;
} else if (spec.specId() != file.specId()) {
throw new ValidationException("Invalid file, expected spec id: %d", spec.specId());
private void setDataSpec(DataFile file) {
PartitionSpec fileSpec = ops.current().spec(file.specId());
Preconditions.checkNotNull(fileSpec, "Cannot find partition spec for data file: %s", file.path());
if (dataSpec == null) {
dataSpec = fileSpec;
} else if (dataSpec.specId() != file.specId()) {
throw new ValidationException("Invalid data file, expected spec id: %d", dataSpec.specId());
}
}

@@ -463,9 +467,13 @@ private void cleanUncommittedAppends(Set<ManifestFile> committed) {
this.cachedNewManifest = null;
}

if (cachedNewDeleteManifest != null && !committed.contains(cachedNewDeleteManifest)) {
deleteFile(cachedNewDeleteManifest.path());
this.cachedNewDeleteManifest = null;
ListIterator<ManifestFile> deleteManifestsIterator = cachedNewDeleteManifests.listIterator();
while (deleteManifestsIterator.hasNext()) {
ManifestFile deleteManifest = deleteManifestsIterator.next();
if (!committed.contains(deleteManifest)) {
deleteFile(deleteManifest.path());
deleteManifestsIterator.remove();
}
}

// rewritten manifests are always owned by the table
@@ -518,7 +526,7 @@ private ManifestFile newFilesAsManifest() {

if (cachedNewManifest == null) {
try {
ManifestWriter<DataFile> writer = newManifestWriter(writeSpec());
ManifestWriter<DataFile> writer = newManifestWriter(dataSpec());
try {
writer.addAll(newFiles);
} finally {
@@ -536,36 +544,41 @@
}

private Iterable<ManifestFile> prepareDeleteManifests() {
if (newDeleteFiles.isEmpty()) {
if (newDeleteFilesBySpec.isEmpty()) {
return ImmutableList.of();
}

return ImmutableList.of(newDeleteFilesAsManifest());
return newDeleteFilesAsManifests();
}

private ManifestFile newDeleteFilesAsManifest() {
if (hasNewDeleteFiles && cachedNewDeleteManifest != null) {
deleteFile(cachedNewDeleteManifest.path());
cachedNewDeleteManifest = null;
private List<ManifestFile> newDeleteFilesAsManifests() {
if (hasNewDeleteFiles && cachedNewDeleteManifests.size() > 0) {
for (ManifestFile cachedNewDeleteManifest : cachedNewDeleteManifests) {
deleteFile(cachedNewDeleteManifest.path());
}
cachedNewDeleteManifests.clear();
Contributor: Minor: this will rewrite all delete manifests even if there is only one new delete file. I think it's fine to simplify it for now since we don't expect this case very often, but it would be good to note in a code comment that this is something we can improve.

Contributor Author: Added a comment. I think this will be rare enough in the real world that it should be fine to optimize later.

}

if (cachedNewDeleteManifest == null) {
try {
ManifestWriter<DeleteFile> writer = newDeleteManifestWriter(writeSpec());
if (cachedNewDeleteManifests.isEmpty()) {
newDeleteFilesBySpec.forEach((specId, deleteFiles) -> {
PartitionSpec spec = ops.current().spec(specId);
try {
writer.addAll(newDeleteFiles);
} finally {
writer.close();
ManifestWriter<DeleteFile> writer = newDeleteManifestWriter(spec);
try {
writer.addAll(deleteFiles);
} finally {
writer.close();
}
cachedNewDeleteManifests.add(writer.toManifestFile());
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest writer");
}
});

this.cachedNewDeleteManifest = writer.toManifestFile();
this.hasNewDeleteFiles = false;
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest writer");
}
this.hasNewDeleteFiles = false;
}

return cachedNewDeleteManifest;
return cachedNewDeleteManifests;
}

private class DataFileFilterManager extends ManifestFilterManager<DataFile> {
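The interleaved diff above can be hard to follow, so here is a self-contained sketch of the pattern it introduces: delete files are buffered per spec id, and flushing produces one delete manifest per spec, rebuilding all cached manifests whenever new delete files arrive (the simplification discussed in the review thread). Plain Java types stand in for the Iceberg classes; this is not the committed code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: String stands in for DeleteFile and List<String> stands in for a
// delete manifest. This mirrors the control flow of add(DeleteFile) and
// newDeleteFilesAsManifests() without any Iceberg dependencies.
public class DeleteFilesBySpecSketch {
  private final Map<Integer, List<String>> newDeleteFilesBySpec = new HashMap<>();
  private final List<List<String>> cachedNewDeleteManifests = new ArrayList<>();
  private boolean hasNewDeleteFiles = false;

  // mirrors add(DeleteFile): buffer the file under its spec id
  public void add(int specId, String deleteFile) {
    newDeleteFilesBySpec
        .computeIfAbsent(specId, id -> new ArrayList<>())
        .add(deleteFile);
    hasNewDeleteFiles = true;
  }

  // mirrors newDeleteFilesAsManifests(): produce one "manifest" per spec id and cache
  // the result; if new files arrived since the last flush, rebuild all cached manifests
  public List<List<String>> flush() {
    if (hasNewDeleteFiles && !cachedNewDeleteManifests.isEmpty()) {
      cachedNewDeleteManifests.clear();
    }
    if (cachedNewDeleteManifests.isEmpty()) {
      newDeleteFilesBySpec.forEach((specId, files) ->
          cachedNewDeleteManifests.add(new ArrayList<>(files)));
      hasNewDeleteFiles = false;
    }
    return cachedNewDeleteManifests;
  }
}
```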
21 changes: 21 additions & 0 deletions core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.LongStream;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
@@ -433,6 +434,26 @@ void validateDeleteManifest(ManifestFile manifest,
Assert.assertFalse("Should find all files in the manifest", expectedFiles.hasNext());
}

protected DataFile newDataFile(String partitionPath) {
return DataFiles.builder(table.spec())
.withPath("/path/to/data-" + UUID.randomUUID() + ".parquet")
.withFileSizeInBytes(10)
.withPartitionPath(partitionPath)
.withRecordCount(1)
.build();
}

protected DeleteFile newDeleteFile(int specId, String partitionPath) {
PartitionSpec spec = table.specs().get(specId);
return FileMetadata.deleteFileBuilder(spec)
.ofPositionDeletes()
.withPath("/path/to/delete" + UUID.randomUUID() + ".parquet")
.withFileSizeInBytes(10)
.withPartitionPath(partitionPath)
.withRecordCount(1)
.build();
}

static void validateManifestEntries(ManifestFile manifest,
Iterator<Long> ids,
Iterator<DataFile> expectedFiles,
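The author mentions adding a test that exercises delete files across specs. That test is not part of this commit's diff, but a hypothetical sketch of how the new helpers could be used might look like the following; the spec evolution, partition paths, and assertion are assumptions, not the PR's actual test.

```java
import org.junit.Assert;
import org.junit.Test;

// Hypothetical test sketch, not the test added in this PR. Assumes a format v2 table
// (delete files require v2) and TableTestBase's standard test table, whose initial
// spec is bucket(data, 16) with the partition field named "data_bucket".
public class TestMultiSpecDeleteFiles extends TableTestBase {

  public TestMultiSpecDeleteFiles() {
    super(2); // format version 2
  }

  @Test
  public void testDeleteFilesWithTwoSpecs() {
    // seed the table with one data file under the original spec
    table.newAppend()
        .appendFile(newDataFile("data_bucket=0"))
        .commit();
    int oldSpecId = table.spec().specId();

    // evolve the partition spec; adding an identity partition on "id" is an assumption
    table.updateSpec()
        .addField("id")
        .commit();
    int newSpecId = table.spec().specId();

    // commit delete files written against both specs in a single row delta
    table.newRowDelta()
        .addDeletes(newDeleteFile(oldSpecId, "data_bucket=0"))
        .addDeletes(newDeleteFile(newSpecId, "data_bucket=0/id=1"))
        .commit();

    // each spec should end up with its own delete manifest
    Assert.assertEquals("Should write one delete manifest per spec",
        2, table.currentSnapshot().deleteManifests().size());
  }
}
```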