Skip to content

Commit

Permalink
Undo BaseReplacePartitions changes, just throw if multiple specs adde…
Browse files Browse the repository at this point in the history
…d, unsure about this
  • Loading branch information
fqaiser94 committed Mar 11, 2024
1 parent 90f7a7e commit 724a1d8
Showing 1 changed file with 26 additions and 28 deletions.
54 changes: 26 additions & 28 deletions core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,38 +87,36 @@ public BaseReplacePartitions toBranch(String branch) {

@Override
public void validate(TableMetadata currentMetadata, Snapshot parent) {
dataSpecs()
.forEach(
dataSpec -> {
if (validateConflictingData) {
if (dataSpec.isUnpartitioned()) {
validateAddedDataFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
} else {
validateAddedDataFiles(
currentMetadata, startingSnapshotId, replacedPartitions, parent);
}
}

if (validateConflictingDeletes) {
if (dataSpec.isUnpartitioned()) {
validateDeletedDataFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
validateNoNewDeleteFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
} else {
validateDeletedDataFiles(
currentMetadata, startingSnapshotId, replacedPartitions, parent);
validateNoNewDeleteFiles(
currentMetadata, startingSnapshotId, replacedPartitions, parent);
}
}
});
if (validateConflictingData) {
// TODO: Still using `dataSpec` method (rather than `dataSpecs`)
// which will throw if files with multiple specs are added
// Not sure what the logic should be now if we allow files with multiple specs to be added
if (dataSpec().isUnpartitioned()) {
validateAddedDataFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
} else {
validateAddedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
}
}

if (validateConflictingDeletes) {
// TODO: same here
if (dataSpec().isUnpartitioned()) {
validateDeletedDataFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
validateNoNewDeleteFiles(
currentMetadata, startingSnapshotId, Expressions.alwaysTrue(), parent);
} else {
validateDeletedDataFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
validateNoNewDeleteFiles(currentMetadata, startingSnapshotId, replacedPartitions, parent);
}
}
}

@Override
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
if (dataSpecs().stream().anyMatch(spec -> spec.fields().size() <= 0)) {
// TODO: same here
if (dataSpec().fields().size() <= 0) {
// replace all data in an unpartitioned table
deleteByRowFilter(Expressions.alwaysTrue());
}
Expand Down

0 comments on commit 724a1d8

Please sign in to comment.