From 486c7316ff78517fa61e4f7f77220b0a18016ebd Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Fri, 6 May 2022 14:16:11 +0200 Subject: [PATCH] Add statistics information in table snapshot --- .palantir/revapi.yml | 3 + .../java/org/apache/iceberg/Snapshot.java | 9 ++ .../org/apache/iceberg/StatisticsFile.java | 36 ++++++ .../java/org/apache/iceberg/Transaction.java | 8 ++ .../apache/iceberg/UpdateTableStatistics.java | 44 +++++++ .../java/org/apache/iceberg/BaseSnapshot.java | 18 ++- .../org/apache/iceberg/BaseTransaction.java | 8 ++ .../iceberg/BaseUpdateTableStatistics.java | 117 ++++++++++++++++++ .../iceberg/CommitCallbackTransaction.java | 5 + .../apache/iceberg/GenericStatisticsFile.java | 78 ++++++++++++ .../org/apache/iceberg/ReachableFileUtil.java | 20 +++ .../org/apache/iceberg/SnapshotParser.java | 93 +++++++++++++- .../org/apache/iceberg/SnapshotProducer.java | 10 +- .../iceberg/TestMetadataUpdateParser.java | 4 +- .../org/apache/iceberg/TestSnapshotJson.java | 6 +- .../org/apache/iceberg/TestTableMetadata.java | 30 ++--- .../spark/actions/BaseSparkAction.java | 1 + .../actions/TestRemoveOrphanFilesAction.java | 55 ++++++++ .../TestRemoveOrphanFilesProcedure.java | 87 +++++++++++++ .../spark/actions/BaseSparkAction.java | 1 + .../actions/TestRemoveOrphanFilesAction.java | 55 ++++++++ .../TestRemoveOrphanFilesProcedure.java | 87 +++++++++++++ .../spark/actions/BaseSparkAction.java | 1 + .../actions/TestRemoveOrphanFilesAction.java | 55 ++++++++ .../TestRemoveOrphanFilesProcedure.java | 58 +++++++++ .../spark/actions/BaseSparkAction.java | 1 + .../actions/TestRemoveOrphanFilesAction.java | 55 ++++++++ 27 files changed, 915 insertions(+), 30 deletions(-) create mode 100644 api/src/main/java/org/apache/iceberg/StatisticsFile.java create mode 100644 api/src/main/java/org/apache/iceberg/UpdateTableStatistics.java create mode 100644 core/src/main/java/org/apache/iceberg/BaseUpdateTableStatistics.java create mode 100644 core/src/main/java/org/apache/iceberg/GenericStatisticsFile.java diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index a992ca3f0449..3007b4bd975a 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -91,3 +91,6 @@ acceptedBreaks: - code: "java.method.addedToInterface" new: "method org.apache.iceberg.ReplacePartitions org.apache.iceberg.ReplacePartitions::validateNoConflictingDeletes()" justification: "Accept all changes prior to introducing API compatibility checks" + - code: "java.method.addedToInterface" + new: "method org.apache.iceberg.UpdateTableStatistics org.apache.iceberg.Transaction::newUpdateTableStatistics()" + justification: "new API method" diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java index d10a6c3a49d1..2d9f34e27fe7 100644 --- a/api/src/main/java/org/apache/iceberg/Snapshot.java +++ b/api/src/main/java/org/apache/iceberg/Snapshot.java @@ -135,4 +135,13 @@ public interface Snapshot extends Serializable { default Integer schemaId() { return null; } + + /** + * Return all statistics files associated with the table. + * + * @return all statistics files associated with the table. 
+ */ + default Iterable statisticsFiles() { + return null; + } } diff --git a/api/src/main/java/org/apache/iceberg/StatisticsFile.java b/api/src/main/java/org/apache/iceberg/StatisticsFile.java new file mode 100644 index 000000000000..677b464d8d49 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/StatisticsFile.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public interface StatisticsFile { + String location(); + + long fileSizeInBytes(); + + long fileFooterSizeInBytes(); + + long sequenceNumber(); + + Map>> statisticsFieldsSets(); +} diff --git a/api/src/main/java/org/apache/iceberg/Transaction.java b/api/src/main/java/org/apache/iceberg/Transaction.java index 609f86d1a098..e616666de5f0 100644 --- a/api/src/main/java/org/apache/iceberg/Transaction.java +++ b/api/src/main/java/org/apache/iceberg/Transaction.java @@ -137,6 +137,14 @@ default AppendFiles newFastAppend() { */ DeleteFiles newDelete(); + /** + * Create a new {@link UpdateTableStatistics update table statistics API} to add or remove statistics + * files in this table. + * + * @return a new {@link UpdateTableStatistics} + */ + UpdateTableStatistics newUpdateTableStatistics(); + /** * Create a new {@link ExpireSnapshots expire API} to manage snapshots in this table. * diff --git a/api/src/main/java/org/apache/iceberg/UpdateTableStatistics.java b/api/src/main/java/org/apache/iceberg/UpdateTableStatistics.java new file mode 100644 index 000000000000..9599ae3ea7c1 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/UpdateTableStatistics.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +/** + * API for adding or removing statistics files from a table. + */ +public interface UpdateTableStatistics extends PendingUpdate { + /** + * Delete a statistics file path from the underlying table. + *
<p>
+ * To remove a statistics file from the table, this path must equal a path in the table's metadata. Paths + * that are different but equivalent will not be removed. For example, file:/path/file.avro is + * equivalent to file:///path/file.avro, but would not remove the latter path from the table. + * + * @param path a fully-qualified file path to remove from the table + * @return this for method chaining + */ + UpdateTableStatistics deleteStatisticsFile(CharSequence path); + + /** + * Add a statistics file to the underlying table. + * + * @return this for method chaining + */ + UpdateTableStatistics addStatisticsFile(StatisticsFile statisticsFile); +} diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index 7626d4d2b6c7..6a49efeda0ba 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -44,6 +44,7 @@ class BaseSnapshot implements Snapshot { private final String operation; private final Map summary; private final Integer schemaId; + private final List statisticsFiles; // lazily initialized private transient List allManifests = null; @@ -61,7 +62,7 @@ class BaseSnapshot implements Snapshot { String... manifestFiles) { this(io, snapshotId, null, System.currentTimeMillis(), null, null, schemaId, Lists.transform(Arrays.asList(manifestFiles), - path -> new GenericManifestFile(io.newInputFile(path), 0))); + path -> new GenericManifestFile(io.newInputFile(path), 0)), null); } BaseSnapshot(FileIO io, @@ -72,7 +73,8 @@ class BaseSnapshot implements Snapshot { String operation, Map summary, Integer schemaId, - String manifestList) { + String manifestList, + List statisticsFiles) { this.io = io; this.sequenceNumber = sequenceNumber; this.snapshotId = snapshotId; @@ -82,6 +84,7 @@ class BaseSnapshot implements Snapshot { this.summary = summary; this.schemaId = schemaId; this.manifestListLocation = manifestList; + this.statisticsFiles = statisticsFiles; } BaseSnapshot(FileIO io, @@ -91,8 +94,10 @@ class BaseSnapshot implements Snapshot { String operation, Map summary, Integer schemaId, - List dataManifests) { - this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, schemaId, null); + List dataManifests, + List statisticsFiles) { + this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, schemaId, null, + statisticsFiles); this.allManifests = dataManifests; } @@ -131,6 +136,11 @@ public Integer schemaId() { return schemaId; } + @Override + public Iterable statisticsFiles() { + return statisticsFiles; + } + private void cacheManifests() { if (io == null) { throw new IllegalStateException("Cannot cache changes: FileIO is null"); diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 70410b682434..3a5c5e1d424c 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -215,6 +215,14 @@ public DeleteFiles newDelete() { return delete; } + @Override + public UpdateTableStatistics newUpdateTableStatistics() { + checkLastOperationCommitted("UpdateTableStatistics"); + BaseUpdateTableStatistics updateStatistics = new BaseUpdateTableStatistics(transactionOps); + updates.add(updateStatistics); + return updateStatistics; + } + @Override public ExpireSnapshots expireSnapshots() { checkLastOperationCommitted("ExpireSnapshots"); 
diff --git a/core/src/main/java/org/apache/iceberg/BaseUpdateTableStatistics.java b/core/src/main/java/org/apache/iceberg/BaseUpdateTableStatistics.java new file mode 100644 index 000000000000..d75a87e98db5 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BaseUpdateTableStatistics.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; + +/** + * {@link UpdateTableStatistics Update table statistics} implementation. + */ +class BaseUpdateTableStatistics extends SnapshotProducer implements UpdateTableStatistics { + + private final TableOperations ops; + private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder(); + private final Set removedStatisticsFiles = Sets.newHashSet(); + private final Set addedStatisticsFiles = Sets.newHashSet(); + + protected BaseUpdateTableStatistics(TableOperations ops) { + super(ops); + this.ops = ops; + } + + @Override + protected BaseUpdateTableStatistics self() { + return this; + } + + @Override + public UpdateTableStatistics deleteStatisticsFile(CharSequence path) { + Preconditions.checkNotNull(path, "path is null"); + removedStatisticsFiles.add(path.toString()); + return this; + } + + @Override + public UpdateTableStatistics addStatisticsFile(StatisticsFile statisticsFile) { + Preconditions.checkNotNull(statisticsFile, "statisticsFile is null"); + addedStatisticsFiles.add(statisticsFile); + return this; + } + + @Override + public BaseUpdateTableStatistics set(String property, String value) { + summaryBuilder.set(property, value); + return this; + } + + @Override + protected void cleanUncommitted(Set committed) { + // TODO should i delete files pointed to by addedStatisticsFiles? + } + + @Override + protected String operation() { + return "update statistics"; + } + + @Override + public Snapshot apply() { + TableMetadata base = refresh(); + Long parentSnapshotId = base.currentSnapshot() != null ? 
+ base.currentSnapshot().snapshotId() : null; + long sequenceNumber = base.nextSequenceNumber(); + + validate(base); + + List statisticsFiles = Lists.newArrayList(); + if (base.currentSnapshot() != null && base.currentSnapshot().statisticsFiles() != null) { + base.currentSnapshot().statisticsFiles().forEach(statisticsFiles::add); + } + statisticsFiles.removeIf(statisticsFile -> removedStatisticsFiles.contains(statisticsFile.location())); + statisticsFiles.addAll(addedStatisticsFiles); + + return new BaseSnapshot(ops.io(), + sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base), + base.currentSchemaId(), base.currentSnapshot().manifestListLocation(), statisticsFiles); + } + + @Override + protected void validate(TableMetadata currentMetadata) { + ValidationException.check(currentMetadata.formatVersion() >= 2, + "Table statistics are not supported in format version %s", currentMetadata.formatVersion()); + } + + @Override + protected List apply(TableMetadata metadataToUpdate) { + // The method is not invoked, since apply() is overridden. + throw new UnsupportedOperationException(); + } + + @Override + protected Map summary() { + return summaryBuilder.build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java index bd15744a87ad..233c730b1c4d 100644 --- a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java +++ b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java @@ -102,6 +102,11 @@ public DeleteFiles newDelete() { return wrapped.newDelete(); } + @Override + public UpdateTableStatistics newUpdateTableStatistics() { + return wrapped.newUpdateTableStatistics(); + } + @Override public ExpireSnapshots expireSnapshots() { return wrapped.expireSnapshots(); diff --git a/core/src/main/java/org/apache/iceberg/GenericStatisticsFile.java b/core/src/main/java/org/apache/iceberg/GenericStatisticsFile.java new file mode 100644 index 000000000000..61ad06798d4c --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/GenericStatisticsFile.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; + +public class GenericStatisticsFile implements StatisticsFile { + private final String location; + private final long fileSizeInBytes; + private final long fileFooterSizeInBytes; + private final long sequenceNumber; + private final Map>> statisticsFieldsSets; + + public GenericStatisticsFile( + String location, + long fileSizeInBytes, + long fileFooterSizeInBytes, + long sequenceNumber, + Map>> statisticsFieldsSets) { + this.location = Preconditions.checkNotNull(location, "location is null"); + this.fileSizeInBytes = fileSizeInBytes; + this.fileFooterSizeInBytes = fileFooterSizeInBytes; + this.sequenceNumber = sequenceNumber; + Preconditions.checkNotNull(statisticsFieldsSets, "statistics is null"); + this.statisticsFieldsSets = statisticsFieldsSets.entrySet().stream() + .collect(ImmutableMap.toImmutableMap( + Map.Entry::getKey, + entry -> entry.getValue().stream().map(ImmutableList::copyOf).collect(ImmutableSet.toImmutableSet()))); + } + + @Override + public String location() { + return location; + } + + @Override + public long fileSizeInBytes() { + return fileSizeInBytes; + } + + @Override + public long fileFooterSizeInBytes() { + return fileFooterSizeInBytes; + } + + @Override + public long sequenceNumber() { + return sequenceNumber; + } + + @Override + public Map>> statisticsFieldsSets() { + return statisticsFieldsSets; + } +} diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java index 004e31c33504..74a78d5a8893 100644 --- a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java +++ b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java @@ -114,4 +114,24 @@ public static List manifestListLocations(Table table) { } return manifestListLocations; } + + /** + * Returns locations of statistics files in a table. 
+ * + * @param table table for which statistics files needs to be listed + * @return the location of statistics files + */ + public static List statisticsFilesLocations(Table table) { + Iterable snapshots = table.snapshots(); + List statisticsFilesLocations = Lists.newArrayList(); + for (Snapshot snapshot : snapshots) { + Iterable statisticsFiles = snapshot.statisticsFiles(); + if (statisticsFiles != null) { + for (StatisticsFile statisticsFile : statisticsFiles) { + statisticsFilesLocations.add(statisticsFile.location()); + } + } + } + return statisticsFilesLocations; + } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java b/core/src/main/java/org/apache/iceberg/SnapshotParser.java index e4a4428b49af..83502be16c50 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java @@ -26,11 +26,15 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.util.JsonUtil; public class SnapshotParser { @@ -47,6 +51,11 @@ private SnapshotParser() { private static final String MANIFESTS = "manifests"; private static final String MANIFEST_LIST = "manifest-list"; private static final String SCHEMA_ID = "schema-id"; + private static final String STATISTICS = "statistics"; + private static final String STATISTICS_FIELDS_SETS = "statistics-fields-sets"; + private static final String LOCATION = "location"; + private static final String FILE_SIZE_IN_BYTES = "file_size_in_bytes"; + private static final String FILE_FOOTER_SIZE_IN_BYTES = "file_footer_size_in_bytes"; static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException { @@ -94,6 +103,36 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) generator.writeNumberField(SCHEMA_ID, snapshot.schemaId()); } + if (snapshot.statisticsFiles() != null) { + generator.writeArrayFieldStart(STATISTICS); + for (StatisticsFile statisticsFile : snapshot.statisticsFiles()) { + toJson(statisticsFile, generator); + } + generator.writeEndArray(); + } + + generator.writeEndObject(); + } + + private static void toJson(StatisticsFile statisticsFile, JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeStringField(LOCATION, statisticsFile.location()); + generator.writeNumberField(FILE_SIZE_IN_BYTES, statisticsFile.fileSizeInBytes()); + generator.writeNumberField(FILE_FOOTER_SIZE_IN_BYTES, statisticsFile.fileFooterSizeInBytes()); + generator.writeNumberField(SEQUENCE_NUMBER, statisticsFile.sequenceNumber()); + generator.writeObjectFieldStart(STATISTICS_FIELDS_SETS); + for (Map.Entry>> entry : statisticsFile.statisticsFieldsSets().entrySet()) { + generator.writeArrayFieldStart(entry.getKey()); + for (List columnIds : entry.getValue()) { + generator.writeStartArray(); + for (Integer columnId : columnIds) { + generator.writeNumber(columnId); + } + generator.writeEndArray(); + } + generator.writeEndArray(); + } + 
generator.writeEndObject(); generator.writeEndObject(); } @@ -154,21 +193,69 @@ static Snapshot fromJson(FileIO io, JsonNode node) { Integer schemaId = JsonUtil.getIntOrNull(SCHEMA_ID, node); + List statistics = null; + if (node.has(STATISTICS)) { + JsonNode statisticsNode = node.get(STATISTICS); + Preconditions.checkArgument(statisticsNode != null && !statisticsNode.isNull() && statisticsNode.isArray(), + "Cannot parse statistics from non-array value: %s", statisticsNode); + ImmutableList.Builder builder = ImmutableList.builderWithExpectedSize(statisticsNode.size()); + statisticsNode.elements().forEachRemaining(element -> builder.add(statisticsFileFromJson(element))); + statistics = builder.build(); + } + if (node.has(MANIFEST_LIST)) { // the manifest list is stored in a manifest list file String manifestList = JsonUtil.getString(MANIFEST_LIST, node); return new BaseSnapshot( - io, sequenceNumber, snapshotId, parentId, timestamp, operation, summary, schemaId, manifestList); - + io, sequenceNumber, snapshotId, parentId, timestamp, operation, summary, schemaId, manifestList, statistics); } else { // fall back to an embedded manifest list. pass in the manifest's InputFile so length can be // loaded lazily, if it is needed List manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node), location -> new GenericManifestFile(io.newInputFile(location), 0)); - return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, summary, schemaId, manifests); + return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, summary, schemaId, manifests, statistics); } } + private static StatisticsFile statisticsFileFromJson(JsonNode json) { + Preconditions.checkArgument(json != null && !json.isNull() && json.isObject(), + "Cannot parse statistics from non-object value: %s", json); + + String location = JsonUtil.getString(LOCATION, json); + long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE_IN_BYTES, json); + long fileFooterSizeInBytes = JsonUtil.getLong(FILE_FOOTER_SIZE_IN_BYTES, json); + long sequenceNumber = JsonUtil.getLong(SEQUENCE_NUMBER, json); + JsonNode statisticsNode = json.get(STATISTICS_FIELDS_SETS); + Preconditions.checkArgument(statisticsNode != null && !statisticsNode.isNull() && statisticsNode.isObject(), + "Cannot parse statistics from non-object value: %s", statisticsNode); + ImmutableMap.Builder>> statistics = ImmutableMap.builder(); + statisticsNode.fieldNames().forEachRemaining(fieldName -> { + JsonNode columnSetsNode = statisticsNode.get(fieldName); + Preconditions.checkArgument(columnSetsNode != null && !columnSetsNode.isNull() && columnSetsNode.isArray(), + "Cannot parse column sets from non-array value: %s", columnSetsNode); + Set> columnSets = Streams.stream(columnSetsNode.elements()) + .map(columnIdsNode -> { + Preconditions.checkArgument(columnIdsNode != null && !columnIdsNode.isNull() && columnIdsNode.isArray(), + "Cannot parse column IDs from non-array value: %s", columnIdsNode); + return Streams.stream(columnIdsNode.elements()) + .map(columnId -> { + Preconditions.checkArgument(columnId.isInt(), + "Cannot parse integer from non-int value: %s", columnId); + return columnId.asInt(); + }) + .collect(ImmutableList.toImmutableList()); + }) + .collect(ImmutableSet.toImmutableSet()); + statistics.put(fieldName, columnSets); + }); + return new GenericStatisticsFile( + location, + fileSizeInBytes, + fileFooterSizeInBytes, + sequenceNumber, + statistics.buildOrThrow()); + } + public static Snapshot fromJson(FileIO io, String json) { try { return fromJson(io, 
JsonUtil.mapper().readValue(json, JsonNode.class)); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 0f7eed5e491e..a1d3ebf6b716 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -37,6 +37,7 @@ import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -170,6 +171,9 @@ public Snapshot apply() { Long parentSnapshotId = base.currentSnapshot() != null ? base.currentSnapshot().snapshotId() : null; long sequenceNumber = base.nextSequenceNumber(); + List statisticsFiles = + base.currentSnapshot() != null && base.currentSnapshot().statisticsFiles() != null ? + ImmutableList.copyOf(base.currentSnapshot().statisticsFiles()) : null; // run validations from the child operation validate(base); @@ -201,12 +205,12 @@ public Snapshot apply() { return new BaseSnapshot(ops.io(), sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base), - base.currentSchemaId(), manifestList.location()); + base.currentSchemaId(), manifestList.location(), statisticsFiles); } else { return new BaseSnapshot(ops.io(), snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base), - base.currentSchemaId(), manifests); + base.currentSchemaId(), manifests, statisticsFiles); } } @@ -215,7 +219,7 @@ sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), oper /** * Returns the snapshot summary from the implementation and updates totals. 
*/ - private Map summary(TableMetadata previous) { + protected Map summary(TableMetadata previous) { Map summary = summary(); if (summary == null) { diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java index 4a06512bc47f..f98d54efaca4 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java @@ -325,7 +325,7 @@ public void testAddSnapshotToJson() { Snapshot snapshot = new BaseSnapshot(null, snapshotId, parentId, System.currentTimeMillis(), DataOperations.REPLACE, ImmutableMap.of("files-added", "4", "files-deleted", "100"), - schemaId, manifests); + schemaId, manifests, null); String snapshotJson = SnapshotParser.toJson(snapshot, /* pretty */ false); String expected = String.format("{\"action\":\"%s\",\"snapshot\":%s}", action, snapshotJson); MetadataUpdate update = new MetadataUpdate.AddSnapshot(snapshot); @@ -344,7 +344,7 @@ public void testAddSnapshotFromJson() { new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0)); Map summary = ImmutableMap.of("files-added", "4", "files-deleted", "100"); Snapshot snapshot = new BaseSnapshot(null, snapshotId, parentId, System.currentTimeMillis(), - DataOperations.REPLACE, summary, schemaId, manifests); + DataOperations.REPLACE, summary, schemaId, manifests, null); String snapshotJson = SnapshotParser.toJson(snapshot, /* pretty */ false); String json = String.format("{\"action\":\"%s\",\"snapshot\":%s}", action, snapshotJson); MetadataUpdate expected = new MetadataUpdate.AddSnapshot(snapshot); diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java index 22f0b0ccdbc6..6ff5f8cdbab3 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java @@ -79,7 +79,7 @@ public void testJsonConversionWithOperation() { Snapshot expected = new BaseSnapshot(ops.io(), id, parentId, System.currentTimeMillis(), DataOperations.REPLACE, ImmutableMap.of("files-added", "4", "files-deleted", "100"), - 3, manifests); + 3, manifests, null); String json = SnapshotParser.toJson(expected); Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json); @@ -122,9 +122,9 @@ public void testJsonConversionWithManifestList() throws IOException { Snapshot expected = new BaseSnapshot( ops.io(), id, 34, parentId, System.currentTimeMillis(), - null, null, 4, localInput(manifestList).location()); + null, null, 4, localInput(manifestList).location(), null); Snapshot inMemory = new BaseSnapshot( - ops.io(), id, parentId, expected.timestampMillis(), null, null, 4, manifests); + ops.io(), id, parentId, expected.timestampMillis(), null, null, 4, manifests, null); Assert.assertEquals("Files should match in memory list", inMemory.allManifests(), expected.allManifests()); diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index 51484bdddc98..6181bb366ae7 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -89,11 +89,11 @@ public void testJsonConversion() throws Exception { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( ops.io(), previousSnapshotId, null, previousSnapshotId, null, 
null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), null); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, 7, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())), null); List snapshotLog = ImmutableList.builder() .add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId())) @@ -179,11 +179,11 @@ public void testBackwardCompat() throws Exception { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())), null); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())), null); TableMetadata expected = new TableMetadata(null, 1, null, TEST_LOCATION, 0, System.currentTimeMillis(), 3, TableMetadata.INITIAL_SCHEMA_ID, @@ -250,11 +250,11 @@ public void testInvalidMainBranch() { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, null, - ImmutableList.of(new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); + ImmutableList.of(new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), null); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, 7, - ImmutableList.of(new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + ImmutableList.of(new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())), null); List snapshotLog = ImmutableList.builder() .add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId())) @@ -284,7 +284,7 @@ public void testMainWithoutCurrent() { long snapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot snapshot = new BaseSnapshot( ops.io(), snapshotId, null, snapshotId, null, null, null, - ImmutableList.of(new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); + ImmutableList.of(new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), null); Schema schema = new Schema(6, Types.NestedField.required(10, "x", Types.StringType.get())); @@ -376,11 +376,11 @@ public void testJsonWithPreviousMetadataLog() throws Exception { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( ops.io(), previousSnapshotId, null, previousSnapshotId, null, 
null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), null); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())), null); List reversedSnapshotLog = Lists.newArrayList(); long currentTimestamp = System.currentTimeMillis(); @@ -406,11 +406,11 @@ public void testAddPreviousMetadataRemoveNone() { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), null); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())), null); List reversedSnapshotLog = Lists.newArrayList(); reversedSnapshotLog.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshotId)); @@ -448,11 +448,11 @@ public void testAddPreviousMetadataRemoveOne() { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), null); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())), null); List reversedSnapshotLog = Lists.newArrayList(); reversedSnapshotLog.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshotId)); @@ -502,11 +502,11 @@ public void testAddPreviousMetadataRemoveMultiple() { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), null); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), 
SPEC_5.specId())), null); List reversedSnapshotLog = Lists.newArrayList(); reversedSnapshotLog.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshotId)); diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index 42c54679b669..9ac7ec1a9095 100644 --- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -137,6 +137,7 @@ protected Dataset buildOtherMetadataFileDF(Table table) { List otherMetadataFiles = Lists.newArrayList(); otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, false)); otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table)); + otherMetadataFiles.addAll(ReachableFileUtil.statisticsFilesLocations(table)); return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path"); } diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java index 7cf2e1f295a5..b201df90fc42 100644 --- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java +++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java @@ -21,6 +21,9 @@ import java.io.File; import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -28,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -36,11 +40,15 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Files; +import org.apache.iceberg.GenericStatisticsFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.Transaction; import org.apache.iceberg.actions.DeleteOrphanFiles; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; @@ -48,16 +56,22 @@ import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.hadoop.HiddenPathFilter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkTestBase; import org.apache.iceberg.spark.source.ThreeColumnRecord; +import org.apache.iceberg.stats.Blob; +import org.apache.iceberg.stats.StatsFiles; +import org.apache.iceberg.stats.StatsWriter; import org.apache.iceberg.types.Types; import 
org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -703,4 +717,45 @@ public void testGarbageCollectionDisabled() { ValidationException.class, "Cannot remove orphan files: GC is disabled", () -> SparkActions.get().deleteOrphanFiles(table).execute()); } + + @Test + public void testRemoveOrphanFilesWithStatisticFiles() throws Exception { + Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), + ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"), tableLocation); + + List records = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation); + + File statsLocation = new File(new URI(tableLocation)).toPath().resolve("data").resolve("some-stats-file").toFile(); + StatisticsFile statisticsFile; + try (StatsWriter statsWriter = StatsFiles.write(Files.localOutput(statsLocation)) + .build()) { + statsWriter.add(new Blob("some-blob-type", ImmutableList.of(1), + ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8)))); + statsWriter.finish(); + statisticsFile = new GenericStatisticsFile( + statsLocation.toString(), + statsWriter.fileSize(), + statsWriter.footerSize(), + 0, + statsWriter.statisticsFieldsSets()); + } + + Transaction transaction = table.newTransaction(); + transaction.newUpdateTableStatistics() + .addStatisticsFile(statisticsFile) + .commit(); + transaction.commitTransaction(); + + SparkActions actions = SparkActions.get(); + actions.deleteOrphanFiles(table) + .olderThan(System.currentTimeMillis() + TimeUnit.DAYS.toMillis(365)) + .execute(); + + Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue(); + Assertions.assertThat(statsLocation.length()).as("stats file length") + .isEqualTo(statisticsFile.fileSizeInBytes()); + } } diff --git a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java index 724e17e50a2c..584859b90cfb 100644 --- a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java +++ b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java @@ -19,17 +19,33 @@ package org.apache.iceberg.spark.extensions; +import java.io.File; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.sql.Timestamp; import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Files; +import org.apache.iceberg.GenericStatisticsFile; +import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.stats.Blob; +import org.apache.iceberg.stats.StatsFiles; +import 
org.apache.iceberg.stats.StatsWriter; import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException; +import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Assert; import org.junit.Rule; @@ -303,4 +319,75 @@ public void testConcurrentRemoveOrphanFilesWithInvalidInput() { "CALL %s.system.remove_orphan_files(table => '%s', max_concurrent_deletes => %s)", catalogName, tableIdent, -1)); } + + @Test + public void testRemoveOrphanFilesWithStatisticFiles() throws Exception { + if (!catalogName.equals("spark_catalog")) { + sql("CREATE TABLE %s USING iceberg " + + "TBLPROPERTIES('format-version'='2') " + + "AS SELECT 10 int, 'abc' data", tableName); + } else { + // give a fresh location to Hive tables as Spark will not clean up the table location + // correctly while dropping tables through spark_catalog + sql("CREATE TABLE %s USING iceberg LOCATION '%s' " + + "TBLPROPERTIES('format-version'='2') " + + "AS SELECT 10 int, 'abc' data", tableName, temp.newFolder()); + } + + Table table = Spark3Util.loadIcebergTable(spark, tableName); + + String statsFileName = "stats-file-" + UUID.randomUUID(); + File statsLocation = tableLocation(table).toPath().resolve("data").resolve(statsFileName).toFile(); + StatisticsFile statisticsFile; + try (StatsWriter statsWriter = StatsFiles.write(Files.localOutput(statsLocation)) + .build()) { + statsWriter.add(new Blob("some-blob-type", ImmutableList.of(1), + ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8)))); + statsWriter.finish(); + statisticsFile = new GenericStatisticsFile( + statsLocation.toString(), + statsWriter.fileSize(), + statsWriter.footerSize(), + 0, + statsWriter.statisticsFieldsSets()); + } + + Transaction transaction = table.newTransaction(); + transaction.newUpdateTableStatistics() + .addStatisticsFile(statisticsFile) + .commit(); + transaction.commitTransaction(); + + // wait to ensure files are old enough + waitUntilAfter(System.currentTimeMillis()); + Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())); + + List output = sql( + "CALL %s.system.remove_orphan_files(" + + "table => '%s'," + + "older_than => TIMESTAMP '%s')", + catalogName, tableIdent, currentTimestamp); + Assertions.assertThat(output).as("Should be no orphan files").isEmpty(); + + Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue(); + Assertions.assertThat(statsLocation.length()).as("stats file length") + .isEqualTo(statisticsFile.fileSizeInBytes()); + } + + private static File tableLocation(Table table) { + // Depending on test case, location is URI or a local path + String location = table.location(); + File file = new File(location); + try { + URI uri = new URI(location); + if (uri.getScheme() != null) { + // Location is a well-formed URI + file = new File(uri); + } + } catch (URISyntaxException ignored) { + } + + Preconditions.checkState(file.isDirectory(), "Table location '%s' does not point to a directory", location); + return file; + } } diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index 42c54679b669..9ac7ec1a9095 100644 --- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -137,6 +137,7 @@ protected Dataset buildOtherMetadataFileDF(Table 
table) { List otherMetadataFiles = Lists.newArrayList(); otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, false)); otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table)); + otherMetadataFiles.addAll(ReachableFileUtil.statisticsFilesLocations(table)); return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path"); } diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java index 17fb891a89c8..b6e6f3a62a27 100644 --- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java +++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java @@ -21,6 +21,9 @@ import java.io.File; import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -28,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -36,11 +40,15 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Files; +import org.apache.iceberg.GenericStatisticsFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.Transaction; import org.apache.iceberg.actions.DeleteOrphanFiles; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; @@ -48,16 +56,22 @@ import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.hadoop.HiddenPathFilter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkTestBase; import org.apache.iceberg.spark.source.ThreeColumnRecord; +import org.apache.iceberg.stats.Blob; +import org.apache.iceberg.stats.StatsFiles; +import org.apache.iceberg.stats.StatsWriter; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -703,4 +717,45 @@ public void testGarbageCollectionDisabled() { ValidationException.class, "Cannot remove orphan files: GC is disabled", () -> SparkActions.get().deleteOrphanFiles(table).execute()); } + + @Test + public void testRemoveOrphanFilesWithStatisticFiles() throws Exception { + Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), + ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"), tableLocation); + + List records 
=
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+    df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    File statsLocation = new File(new URI(tableLocation)).toPath().resolve("data").resolve("some-stats-file").toFile();
+    StatisticsFile statisticsFile;
+    try (StatsWriter statsWriter = StatsFiles.write(Files.localOutput(statsLocation))
+        .build()) {
+      statsWriter.add(new Blob("some-blob-type", ImmutableList.of(1),
+          ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      statsWriter.finish();
+      statisticsFile = new GenericStatisticsFile(
+          statsLocation.toString(),
+          statsWriter.fileSize(),
+          statsWriter.footerSize(),
+          0,
+          statsWriter.statisticsFieldsSets());
+    }
+
+    Transaction transaction = table.newTransaction();
+    transaction.newUpdateTableStatistics()
+        .addStatisticsFile(statisticsFile)
+        .commit();
+    transaction.commitTransaction();
+
+    SparkActions actions = SparkActions.get();
+    actions.deleteOrphanFiles(table)
+        .olderThan(System.currentTimeMillis() + TimeUnit.DAYS.toMillis(365))
+        .execute();
+
+    Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue();
+    Assertions.assertThat(statsLocation.length()).as("stats file length")
+        .isEqualTo(statisticsFile.fileSizeInBytes());
+  }
 }
diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index 724e17e50a2c..584859b90cfb 100644
--- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -19,17 +19,33 @@
 package org.apache.iceberg.spark.extensions;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.stats.Blob;
+import org.apache.iceberg.stats.StatsFiles;
+import org.apache.iceberg.stats.StatsWriter;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -303,4 +319,75 @@ public void testConcurrentRemoveOrphanFilesWithInvalidInput() {
         "CALL %s.system.remove_orphan_files(table => '%s', max_concurrent_deletes => %s)",
         catalogName, tableIdent, -1));
   }
+
+  @Test
+  public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
+    if (!catalogName.equals("spark_catalog")) {
+      sql("CREATE TABLE %s USING iceberg " +
+          "TBLPROPERTIES('format-version'='2') " +
+          "AS SELECT 10 int, 'abc' data", tableName);
+    } else {
+      // give a fresh location to Hive tables as Spark will not clean up the table location
+      // correctly while dropping tables through spark_catalog
+      sql("CREATE TABLE %s USING iceberg LOCATION '%s' " +
+          "TBLPROPERTIES('format-version'='2') " +
+          "AS SELECT 10 int, 'abc' data", tableName, temp.newFolder());
+    }
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    String statsFileName = "stats-file-" + UUID.randomUUID();
+    File statsLocation = tableLocation(table).toPath().resolve("data").resolve(statsFileName).toFile();
+    StatisticsFile statisticsFile;
+    try (StatsWriter statsWriter = StatsFiles.write(Files.localOutput(statsLocation))
+        .build()) {
+      statsWriter.add(new Blob("some-blob-type", ImmutableList.of(1),
+          ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      statsWriter.finish();
+      statisticsFile = new GenericStatisticsFile(
+          statsLocation.toString(),
+          statsWriter.fileSize(),
+          statsWriter.footerSize(),
+          0,
+          statsWriter.statisticsFieldsSets());
+    }
+
+    Transaction transaction = table.newTransaction();
+    transaction.newUpdateTableStatistics()
+        .addStatisticsFile(statisticsFile)
+        .commit();
+    transaction.commitTransaction();
+
+    // wait to ensure files are old enough
+    waitUntilAfter(System.currentTimeMillis());
+    Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+    List<Object[]> output = sql(
+        "CALL %s.system.remove_orphan_files(" +
+            "table => '%s'," +
+            "older_than => TIMESTAMP '%s')",
+        catalogName, tableIdent, currentTimestamp);
+    Assertions.assertThat(output).as("Should be no orphan files").isEmpty();
+
+    Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue();
+    Assertions.assertThat(statsLocation.length()).as("stats file length")
+        .isEqualTo(statisticsFile.fileSizeInBytes());
+  }
+
+  private static File tableLocation(Table table) {
+    // Depending on test case, location is URI or a local path
+    String location = table.location();
+    File file = new File(location);
+    try {
+      URI uri = new URI(location);
+      if (uri.getScheme() != null) {
+        // Location is a well-formed URI
+        file = new File(uri);
+      }
+    } catch (URISyntaxException ignored) {
+    }
+
+    Preconditions.checkState(file.isDirectory(), "Table location '%s' does not point to a directory", location);
+    return file;
+  }
 }
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 42c54679b669..9ac7ec1a9095 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -137,6 +137,7 @@ protected Dataset buildOtherMetadataFileDF(Table table) {
     List<String> otherMetadataFiles = Lists.newArrayList();
     otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, false));
     otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
+    otherMetadataFiles.addAll(ReachableFileUtil.statisticsFilesLocations(table));
     return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
   }
 
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 17fb891a89c8..b6e6f3a62a27 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -21,6 +21,9 @@
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -28,6 +31,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -36,11 +40,15 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -48,16 +56,22 @@
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hadoop.HiddenPathFilter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
+import org.apache.iceberg.stats.Blob;
+import org.apache.iceberg.stats.StatsFiles;
+import org.apache.iceberg.stats.StatsWriter;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -703,4 +717,45 @@ public void testGarbageCollectionDisabled() {
         ValidationException.class, "Cannot remove orphan files: GC is disabled",
         () -> SparkActions.get().deleteOrphanFiles(table).execute());
   }
+
+  @Test
+  public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
+    Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(),
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"), tableLocation);
+
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+    df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    File statsLocation = new File(new URI(tableLocation)).toPath().resolve("data").resolve("some-stats-file").toFile();
+    StatisticsFile statisticsFile;
+    try (StatsWriter statsWriter = StatsFiles.write(Files.localOutput(statsLocation))
+        .build()) {
+      statsWriter.add(new Blob("some-blob-type", ImmutableList.of(1),
+          ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      statsWriter.finish();
+      statisticsFile = new GenericStatisticsFile(
+          statsLocation.toString(),
+          statsWriter.fileSize(),
+          statsWriter.footerSize(),
+          0,
+          statsWriter.statisticsFieldsSets());
+    }
+
+    Transaction transaction = table.newTransaction();
+    transaction.newUpdateTableStatistics()
+        .addStatisticsFile(statisticsFile)
+        .commit();
+    transaction.commitTransaction();
+
+    SparkActions actions = SparkActions.get();
+    actions.deleteOrphanFiles(table)
+        .olderThan(System.currentTimeMillis() + TimeUnit.DAYS.toMillis(365))
+        .execute();
+
+    Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue();
+    Assertions.assertThat(statsLocation.length()).as("stats file length")
+        .isEqualTo(statisticsFile.fileSizeInBytes());
+  }
 }
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index e3a3bbf64b87..76c5659b5cad 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -19,27 +19,40 @@
 package org.apache.iceberg.spark.extensions;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.stats.Blob;
+import org.apache.iceberg.stats.StatsFiles;
+import org.apache.iceberg.stats.StatsWriter;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -400,4 +413,49 @@ public void testRemoveOrphanFilesWithDeleteFiles() throws Exception {
         .collectAsList();
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  @Test
+  public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
+    sql("CREATE TABLE %s USING iceberg " +
+        "TBLPROPERTIES('format-version'='2') " +
+        "AS SELECT 10 int, 'abc' data", tableName);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    String statsFileName = "stats-file-" + UUID.randomUUID();
+    File statsLocation = new File(new URI(table.location())).toPath().resolve("data").resolve(statsFileName).toFile();
+    StatisticsFile statisticsFile;
+    try (StatsWriter statsWriter = StatsFiles.write(Files.localOutput(statsLocation))
+        .build()) {
+      statsWriter.add(new Blob("some-blob-type", ImmutableList.of(1),
+          ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      statsWriter.finish();
+      statisticsFile = new GenericStatisticsFile(
+          statsLocation.toString(),
+          statsWriter.fileSize(),
+          statsWriter.footerSize(),
+          0,
+          statsWriter.statisticsFieldsSets());
+    }
+
+    Transaction transaction = table.newTransaction();
+    transaction.newUpdateTableStatistics()
+        .addStatisticsFile(statisticsFile)
+        .commit();
+    transaction.commitTransaction();
+
+    // wait to ensure files are old enough
+    waitUntilAfter(System.currentTimeMillis());
+    Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+    List<Object[]> output = sql(
+        "CALL %s.system.remove_orphan_files(" +
+            "table => '%s'," +
+            "older_than => TIMESTAMP '%s')",
+        catalogName, tableIdent, currentTimestamp);
+    Assertions.assertThat(output).as("Should be no orphan files").isEmpty();
+
+    Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue();
+    Assertions.assertThat(statsLocation.length()).as("stats file length")
+        .isEqualTo(statisticsFile.fileSizeInBytes());
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 99a026abf5f0..789f3d3ebbf1 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -174,6 +174,7 @@ private Dataset buildOtherMetadataFileDF(Table table, boolean includePrevio
     List<String> otherMetadataFiles = Lists.newArrayList();
     otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, includePreviousMetadataLocations));
     otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
+    otherMetadataFiles.addAll(ReachableFileUtil.statisticsFilesLocations(table));
     return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF(FILE_PATH);
   }
 
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index bcabe38e79f1..7484cad01422 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -21,6 +21,9 @@
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.List;
 
@@ -29,6 +32,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -37,11 +41,15 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -49,6 +57,8 @@
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hadoop.HiddenPathFilter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -56,6 +66,9 @@
 import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
+import org.apache.iceberg.stats.Blob;
+import org.apache.iceberg.stats.StatsFiles;
+import org.apache.iceberg.stats.StatsWriter;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -63,6 +76,7 @@
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -948,6 +962,47 @@ public void testCompareToFileList() throws IOException, InterruptedException {
         "Action should find nothing", Lists.newArrayList(), result4.orphanFileLocations());
   }
 
+  @Test
+  public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
+    Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(),
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"), tableLocation);
+
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+    df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    File statsLocation = new File(new URI(tableLocation)).toPath().resolve("data").resolve("some-stats-file").toFile();
+    StatisticsFile statisticsFile;
+    try (StatsWriter statsWriter = StatsFiles.write(Files.localOutput(statsLocation))
+        .build()) {
+      statsWriter.add(new Blob("some-blob-type", ImmutableList.of(1),
+          ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      statsWriter.finish();
+      statisticsFile = new GenericStatisticsFile(
+          statsLocation.toString(),
+          statsWriter.fileSize(),
+          statsWriter.footerSize(),
+          0,
+          statsWriter.statisticsFieldsSets());
+    }
+
+    Transaction transaction = table.newTransaction();
+    transaction.newUpdateTableStatistics()
+        .addStatisticsFile(statisticsFile)
+        .commit();
+    transaction.commitTransaction();
+
+    SparkActions actions = SparkActions.get();
+    actions.deleteOrphanFiles(table)
+        .olderThan(System.currentTimeMillis() + TimeUnit.DAYS.toMillis(365))
+        .execute();
+
+    Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue();
+    Assertions.assertThat(statsLocation.length()).as("stats file length")
+        .isEqualTo(statisticsFile.fileSizeInBytes());
+  }
+
   protected long waitUntilAfter(long timestampMillis) {
     long current = System.currentTimeMillis();
     while (current <= timestampMillis) {