-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Core: Support committing delete files with multiple specs #2985
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
import java.util.List; | ||
import java.util.ListIterator; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import org.apache.iceberg.events.CreateSnapshotEvent; | ||
|
@@ -39,6 +40,7 @@ | |
import org.apache.iceberg.relocated.com.google.common.collect.Iterables; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Iterators; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Sets; | ||
import org.apache.iceberg.util.CharSequenceSet; | ||
import org.apache.iceberg.util.Pair; | ||
|
@@ -78,25 +80,27 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> { | |
|
||
// update data | ||
private final List<DataFile> newFiles = Lists.newArrayList(); | ||
private final List<DeleteFile> newDeleteFiles = Lists.newArrayList(); | ||
private final Map<Integer, List<DeleteFile>> newDeleteFilesBySpec = Maps.newHashMap(); | ||
private final List<ManifestFile> appendManifests = Lists.newArrayList(); | ||
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList(); | ||
private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder(); | ||
private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder(); | ||
private Expression deleteExpression = Expressions.alwaysFalse(); | ||
private PartitionSpec spec; | ||
private PartitionSpec dataSpec; | ||
|
||
// cache new manifests after writing | ||
private ManifestFile cachedNewManifest = null; | ||
private boolean hasNewFiles = false; | ||
private ManifestFile cachedNewDeleteManifest = null; | ||
|
||
// cache new manifests for delete files | ||
private final List<ManifestFile> cachedNewDeleteManifests = Lists.newLinkedList(); | ||
private boolean hasNewDeleteFiles = false; | ||
|
||
MergingSnapshotProducer(String tableName, TableOperations ops) { | ||
super(ops); | ||
this.tableName = tableName; | ||
this.ops = ops; | ||
this.spec = null; | ||
this.dataSpec = null; | ||
long targetSizeBytes = ops.current() | ||
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT); | ||
int minCountToMerge = ops.current() | ||
|
@@ -117,11 +121,10 @@ public ThisT set(String property, String value) { | |
return self(); | ||
} | ||
|
||
protected PartitionSpec writeSpec() { | ||
Preconditions.checkState(spec != null, | ||
"Cannot determine partition spec: no data or delete files have been added"); | ||
protected PartitionSpec dataSpec() { | ||
Preconditions.checkState(dataSpec != null, "Cannot determine partition spec: no data files have been added"); | ||
// the spec is set when the write is started | ||
return spec; | ||
return dataSpec; | ||
} | ||
|
||
protected Expression rowFilter() { | ||
|
@@ -190,8 +193,9 @@ protected void delete(CharSequence path) { | |
* Add a data file to the new snapshot. | ||
*/ | ||
protected void add(DataFile file) { | ||
setWriteSpec(file); | ||
addedFilesSummary.addedFile(writeSpec(), file); | ||
Preconditions.checkNotNull(file, "Invalid data file: null"); | ||
setDataSpec(file); | ||
addedFilesSummary.addedFile(dataSpec(), file); | ||
hasNewFiles = true; | ||
newFiles.add(file); | ||
} | ||
|
@@ -200,21 +204,21 @@ protected void add(DataFile file) { | |
* Add a delete file to the new snapshot. | ||
*/ | ||
protected void add(DeleteFile file) { | ||
setWriteSpec(file); | ||
addedFilesSummary.addedFile(writeSpec(), file); | ||
Preconditions.checkNotNull(file, "Invalid delete file: null"); | ||
PartitionSpec fileSpec = ops.current().spec(file.specId()); | ||
List<DeleteFile> deleteFiles = newDeleteFilesBySpec.computeIfAbsent(file.specId(), specId -> Lists.newArrayList()); | ||
deleteFiles.add(file); | ||
addedFilesSummary.addedFile(fileSpec, file); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The file spec is only used for partition summaries. I added a test that shows it works as expected. |
||
hasNewDeleteFiles = true; | ||
newDeleteFiles.add(file); | ||
} | ||
|
||
private void setWriteSpec(ContentFile<?> file) { | ||
Preconditions.checkNotNull(file, "Invalid content file: null"); | ||
PartitionSpec writeSpec = ops.current().spec(file.specId()); | ||
Preconditions.checkNotNull(writeSpec, | ||
"Cannot find partition spec for file: %s", file.path()); | ||
if (spec == null) { | ||
spec = writeSpec; | ||
} else if (spec.specId() != file.specId()) { | ||
throw new ValidationException("Invalid file, expected spec id: %d", spec.specId()); | ||
private void setDataSpec(DataFile file) { | ||
PartitionSpec fileSpec = ops.current().spec(file.specId()); | ||
Preconditions.checkNotNull(fileSpec, "Cannot find partition spec for data file: %s", file.path()); | ||
if (dataSpec == null) { | ||
dataSpec = fileSpec; | ||
} else if (dataSpec.specId() != file.specId()) { | ||
throw new ValidationException("Invalid data file, expected spec id: %d", dataSpec.specId()); | ||
} | ||
} | ||
|
||
|
@@ -463,9 +467,13 @@ private void cleanUncommittedAppends(Set<ManifestFile> committed) { | |
this.cachedNewManifest = null; | ||
} | ||
|
||
if (cachedNewDeleteManifest != null && !committed.contains(cachedNewDeleteManifest)) { | ||
deleteFile(cachedNewDeleteManifest.path()); | ||
this.cachedNewDeleteManifest = null; | ||
ListIterator<ManifestFile> deleteManifestsIterator = cachedNewDeleteManifests.listIterator(); | ||
while (deleteManifestsIterator.hasNext()) { | ||
ManifestFile deleteManifest = deleteManifestsIterator.next(); | ||
if (!committed.contains(deleteManifest)) { | ||
deleteFile(deleteManifest.path()); | ||
deleteManifestsIterator.remove(); | ||
} | ||
} | ||
|
||
// rewritten manifests are always owned by the table | ||
|
@@ -518,7 +526,7 @@ private ManifestFile newFilesAsManifest() { | |
|
||
if (cachedNewManifest == null) { | ||
try { | ||
ManifestWriter<DataFile> writer = newManifestWriter(writeSpec()); | ||
ManifestWriter<DataFile> writer = newManifestWriter(dataSpec()); | ||
try { | ||
writer.addAll(newFiles); | ||
} finally { | ||
|
@@ -536,36 +544,43 @@ private ManifestFile newFilesAsManifest() { | |
} | ||
|
||
private Iterable<ManifestFile> prepareDeleteManifests() { | ||
if (newDeleteFiles.isEmpty()) { | ||
if (newDeleteFilesBySpec.isEmpty()) { | ||
return ImmutableList.of(); | ||
} | ||
|
||
return ImmutableList.of(newDeleteFilesAsManifest()); | ||
return newDeleteFilesAsManifests(); | ||
} | ||
|
||
private ManifestFile newDeleteFilesAsManifest() { | ||
if (hasNewDeleteFiles && cachedNewDeleteManifest != null) { | ||
deleteFile(cachedNewDeleteManifest.path()); | ||
cachedNewDeleteManifest = null; | ||
private List<ManifestFile> newDeleteFilesAsManifests() { | ||
if (hasNewDeleteFiles && cachedNewDeleteManifests.size() > 0) { | ||
for (ManifestFile cachedNewDeleteManifest : cachedNewDeleteManifests) { | ||
deleteFile(cachedNewDeleteManifest.path()); | ||
} | ||
// this triggers a rewrite of all delete manifests even if there is only one new delete file | ||
// if there is a relevant use case in the future, the behavior can be optimized | ||
cachedNewDeleteManifests.clear(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Minor: this will rewrite all delete manifests even if there is only one new delete file. I think it's fine to simplify it right now since we don't expect this case very often. But it would be good to add a comment noting that this is something we can improve. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added a comment. I think this will be rare enough in the real world that it should be fine to optimize later. |
||
} | ||
|
||
if (cachedNewDeleteManifest == null) { | ||
try { | ||
ManifestWriter<DeleteFile> writer = newDeleteManifestWriter(writeSpec()); | ||
if (cachedNewDeleteManifests.isEmpty()) { | ||
newDeleteFilesBySpec.forEach((specId, deleteFiles) -> { | ||
PartitionSpec spec = ops.current().spec(specId); | ||
try { | ||
writer.addAll(newDeleteFiles); | ||
} finally { | ||
writer.close(); | ||
ManifestWriter<DeleteFile> writer = newDeleteManifestWriter(spec); | ||
try { | ||
writer.addAll(deleteFiles); | ||
} finally { | ||
writer.close(); | ||
} | ||
cachedNewDeleteManifests.add(writer.toManifestFile()); | ||
} catch (IOException e) { | ||
throw new RuntimeIOException(e, "Failed to close manifest writer"); | ||
} | ||
}); | ||
|
||
this.cachedNewDeleteManifest = writer.toManifestFile(); | ||
this.hasNewDeleteFiles = false; | ||
} catch (IOException e) { | ||
throw new RuntimeIOException(e, "Failed to close manifest writer"); | ||
} | ||
this.hasNewDeleteFiles = false; | ||
} | ||
|
||
return cachedNewDeleteManifest; | ||
return cachedNewDeleteManifests; | ||
} | ||
|
||
private class DataFileFilterManager extends ManifestFilterManager<DataFile> { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renaming this does require touching more places, but I think keeping it `writeSpec` will be confusing.