From 2c099486764860ccd0d239b8c1ba52a8078dab98 Mon Sep 17 00:00:00 2001
From: Piotr Findeisen
Date: Mon, 19 Sep 2022 14:04:51 +0200
Subject: [PATCH] Retain table statistics during orphan files removal

Do not delete table statistics files when running remove_orphan_files.
---
 .../apache/iceberg/GenericBlobMetadata.java   |  10 ++
 .../org/apache/iceberg/ReachableFileUtil.java |  15 ++
 .../BaseDeleteReachableFilesSparkAction.java  |   1 +
 .../spark/actions/BaseSparkAction.java        |   1 +
 .../actions/TestRemoveOrphanFilesAction.java  |  90 ++++++++++++
 .../TestRemoveOrphanFilesProcedure.java       | 130 ++++++++++++++++++
 .../BaseDeleteReachableFilesSparkAction.java  |   1 +
 .../spark/actions/BaseSparkAction.java        |   1 +
 .../actions/TestRemoveOrphanFilesAction.java  |  90 ++++++++++++
 .../TestRemoveOrphanFilesProcedure.java       | 130 ++++++++++++++++++
 .../BaseDeleteReachableFilesSparkAction.java  |   1 +
 .../spark/actions/BaseSparkAction.java        |   1 +
 .../actions/TestRemoveOrphanFilesAction.java  |  90 ++++++++++++
 .../TestRemoveOrphanFilesProcedure.java       |  94 +++++++++++++
 .../spark/actions/BaseSparkAction.java        |   1 +
 .../actions/TestRemoveOrphanFilesAction.java  |  89 ++++++++++++
 .../TestRemoveOrphanFilesProcedure.java       |  94 +++++++++++++
 .../spark/actions/BaseSparkAction.java        |   1 +
 .../actions/TestRemoveOrphanFilesAction.java  |  89 ++++++++++++
 19 files changed, 929 insertions(+)
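For context, the failure mode being fixed: a Puffin statistics file is referenced by table metadata but not by any snapshot's manifests, so the orphan-files scan previously classified it as unreachable and deleted it. A minimal sketch of the affected call path, assuming an existing table that already has statistics attached (the table name and SparkSession setup are illustrative, not part of this patch):

    // Sketch only: before this patch, running the action below could delete
    // the table's Puffin statistics file, because statistics locations were
    // missing from the reachable-files set used by the orphan scan.
    Table table = Spark3Util.loadIcebergTable(spark, "db.sample");
    DeleteOrphanFiles.Result result =
        SparkActions.get()
            .deleteOrphanFiles(table)
            .olderThan(System.currentTimeMillis())
            .execute();
    result.orphanFileLocations().forEach(path -> System.out.println("deleted: " + path));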
diff --git a/core/src/main/java/org/apache/iceberg/GenericBlobMetadata.java b/core/src/main/java/org/apache/iceberg/GenericBlobMetadata.java
index 872a6ca806e8..46bedfa01753 100644
--- a/core/src/main/java/org/apache/iceberg/GenericBlobMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/GenericBlobMetadata.java
@@ -27,6 +27,16 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 
 public class GenericBlobMetadata implements BlobMetadata {
+
+  public static BlobMetadata from(org.apache.iceberg.puffin.BlobMetadata puffinMetadata) {
+    return new GenericBlobMetadata(
+        puffinMetadata.type(),
+        puffinMetadata.snapshotId(),
+        puffinMetadata.sequenceNumber(),
+        puffinMetadata.inputFields(),
+        puffinMetadata.properties());
+  }
+
   private final String type;
   private final long sourceSnapshotId;
   private final long sourceSnapshotSequenceNumber;
diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
index 45c885a745e4..fdba8e295752 100644
--- a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
@@ -113,4 +113,19 @@ public static List<String> manifestListLocations(Table table) {
     }
     return manifestListLocations;
   }
+
+  /**
+   * Returns locations of statistics files in a table.
+   *
+   * @param table table for which statistics files need to be listed
+   * @return the locations of statistics files
+   */
+  public static List<String> statisticsFilesLocations(Table table) {
+    List<String> statisticsFilesLocations = Lists.newArrayList();
+    for (StatisticsFile statisticsFile : table.statisticsFiles()) {
+      statisticsFilesLocations.add(statisticsFile.path());
+    }
+
+    return statisticsFilesLocations;
+  }
 }
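The new utility mirrors the existing manifestListLocations helper: it simply surfaces every StatisticsFile path recorded in table metadata. A quick usage sketch (loading via HadoopTables is illustrative; any Table works):

    // Illustrative usage: collect statistics file paths so callers such as
    // the Spark actions changed below can treat them as reachable metadata.
    Table table = new HadoopTables(new Configuration()).load("file:/tmp/warehouse/db/sample");
    List<String> statsPaths = ReachableFileUtil.statisticsFilesLocations(table);
    statsPaths.forEach(path -> System.out.println("statistics file: " + path));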
diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
index 1431ae5d78ec..cba67d57ad14 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
@@ -151,6 +151,7 @@ protected Dataset<Row> buildOtherMetadataFileDF(Table table) {
     List<String> otherMetadataFiles = Lists.newArrayList();
     otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, true));
     otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
+    otherMetadataFiles.addAll(ReachableFileUtil.statisticsFilesLocations(table));
     return spark().createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
   }
 
diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index c9d93ce9de5f..c9e61b8c907f 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -145,6 +145,7 @@ protected Dataset<Row> buildOtherMetadataFileDF(Table table) {
     List<String> otherMetadataFiles = Lists.newArrayList();
     otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, false));
     otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
+    otherMetadataFiles.addAll(ReachableFileUtil.statisticsFilesLocations(table));
     return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
   }
 
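The same one-line change is applied in every Spark module below: statistics file locations are appended to the "other metadata files" dataset, which feeds the reachable set that orphan candidates are checked against. Conceptually (a sketch of the set logic, not the actions' actual Spark code):

    // Conceptual model of the orphan scan: files found under the table
    // location that are absent from the reachable set (and older than the
    // cutoff) are deleted. Statistics files now land in the reachable set.
    Set<String> reachable = Sets.newHashSet();
    reachable.addAll(ReachableFileUtil.metadataFileLocations(table, true));
    reachable.add(ReachableFileUtil.versionHintLocation(table));
    reachable.addAll(ReachableFileUtil.statisticsFilesLocations(table));
    // orphans = filesUnderTableLocation - reachable (filtered by the olderThan cutoff)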
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index e42923c0bf23..84ccb86fac4c 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -22,6 +22,9 @@
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -37,11 +40,16 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -49,6 +57,11 @@
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hadoop.HiddenPathFilter;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -59,6 +72,7 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -644,4 +658,80 @@ public void testGarbageCollectionDisabled() {
         "Cannot remove orphan files: GC is disabled",
         () -> SparkActions.get().deleteOrphanFiles(table).execute());
   }
+
+  @Test
+  public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
+    Table table =
+        TABLES.create(
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"),
+            tableLocation);
+
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+    df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.refresh();
+    long snapshotId = table.currentSnapshot().snapshotId();
+    long snapshotSequenceNumber = table.currentSnapshot().sequenceNumber();
+
+    File statsLocation =
+        new File(new URI(tableLocation))
+            .toPath()
+            .resolve("data")
+            .resolve("some-stats-file")
+            .toFile();
+    StatisticsFile statisticsFile;
+    try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) {
+      puffinWriter.add(
+          new Blob(
+              "some-blob-type",
+              ImmutableList.of(1),
+              snapshotId,
+              snapshotSequenceNumber,
+              ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      puffinWriter.finish();
+      statisticsFile =
+          new GenericStatisticsFile(
+              snapshotId,
+              statsLocation.toString(),
+              puffinWriter.fileSize(),
+              puffinWriter.footerSize(),
+              puffinWriter.writtenBlobsMetadata().stream()
+                  .map(GenericBlobMetadata::from)
+                  .collect(ImmutableList.toImmutableList()));
+    }
+
+    Transaction transaction = table.newTransaction();
+    transaction.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
+    transaction.commitTransaction();
+
+    SparkActions.get()
+        .deleteOrphanFiles(table)
+        .olderThan(System.currentTimeMillis() + 1000)
+        .execute();
+
+    Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue();
+    Assertions.assertThat(statsLocation.length())
+        .as("stats file length")
+        .isEqualTo(statisticsFile.fileSizeInBytes());
+
+    transaction = table.newTransaction();
+    transaction.updateStatistics().removeStatistics(statisticsFile.snapshotId()).commit();
+    transaction.commitTransaction();
+
+    DeleteOrphanFiles.Result result =
+        SparkActions.get()
+            .deleteOrphanFiles(table)
+            .olderThan(System.currentTimeMillis() + 1000)
+            .execute();
+    Iterable<String> orphanFileLocations = result.orphanFileLocations();
+    Assertions.assertThat(orphanFileLocations).as("Should be orphan files").hasSize(1);
+    Assertions.assertThat(Iterables.getOnlyElement(orphanFileLocations))
+        .as("Deleted file")
+        .isEqualTo(statsLocation.toURI().toString());
+    Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse();
+  }
 }
diff --git a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index fa43cf0e276c..7d5b5f88a18c 100644
--- a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -21,17 +21,35 @@
 import static org.apache.iceberg.TableProperties.GC_ENABLED;
 import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.Spark3Util;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -332,4 +350,116 @@ public void testConcurrentRemoveOrphanFilesWithInvalidInput() {
             "CALL %s.system.remove_orphan_files(table => '%s', max_concurrent_deletes => %s)",
             catalogName, tableIdent, -1));
   }
+
+  @Test
+  public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
+    if (!catalogName.equals("spark_catalog")) {
+      sql(
+          "CREATE TABLE %s USING iceberg "
+              + "TBLPROPERTIES('format-version'='2') "
+              + "AS SELECT 10 int, 'abc' data",
+          tableName);
+    } else {
+      // give a fresh location to Hive tables as Spark will not clean up the table location
+      // correctly while dropping tables through spark_catalog
+      sql(
+          "CREATE TABLE %s USING iceberg LOCATION '%s' "
+              + "TBLPROPERTIES('format-version'='2') "
+              + "AS SELECT 10 int, 'abc' data",
+          tableName, temp.newFolder());
+    }
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    String statsFileName = "stats-file-" + UUID.randomUUID();
+    File statsLocation =
+        (new URI(table.location()).isAbsolute()
+                ? new File(new URI(table.location()))
+                : new File(table.location()))
+            .toPath()
+            .resolve("data")
+            .resolve(statsFileName)
+            .toFile();
+    StatisticsFile statisticsFile;
+    try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) {
+      long snapshotId = table.currentSnapshot().snapshotId();
+      long snapshotSequenceNumber = table.currentSnapshot().sequenceNumber();
+      puffinWriter.add(
+          new Blob(
+              "some-blob-type",
+              ImmutableList.of(1),
+              snapshotId,
+              snapshotSequenceNumber,
+              ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      puffinWriter.finish();
+      statisticsFile =
+          new GenericStatisticsFile(
+              snapshotId,
+              statsLocation.toString(),
+              puffinWriter.fileSize(),
+              puffinWriter.footerSize(),
+              puffinWriter.writtenBlobsMetadata().stream()
+                  .map(GenericBlobMetadata::from)
+                  .collect(ImmutableList.toImmutableList()));
+    }
+
+    Transaction transaction = table.newTransaction();
+    transaction
+        .updateStatistics()
+        .setStatistics(statisticsFile.snapshotId(), statisticsFile)
+        .commit();
+    transaction.commitTransaction();
+
+    // wait to ensure files are old enough
+    waitUntilAfter(System.currentTimeMillis());
+    Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.remove_orphan_files("
+                + "table => '%s',"
+                + "older_than => TIMESTAMP '%s')",
+            catalogName, tableIdent, currentTimestamp);
+    Assertions.assertThat(output).as("Should be no orphan files").isEmpty();
+
+    Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue();
+    Assertions.assertThat(statsLocation.length())
+        .as("stats file length")
+        .isEqualTo(statisticsFile.fileSizeInBytes());
+
+    transaction = table.newTransaction();
+    transaction.updateStatistics().removeStatistics(statisticsFile.snapshotId()).commit();
+    transaction.commitTransaction();
+
+    output =
+        sql(
+            "CALL %s.system.remove_orphan_files("
+                + "table => '%s',"
+                + "older_than => TIMESTAMP '%s')",
+            catalogName, tableIdent, currentTimestamp);
+    Assertions.assertThat(output).as("Should be orphan files").hasSize(1);
+    Assertions.assertThat(Iterables.getOnlyElement(output))
+        .as("Deleted files")
+        .containsExactly(statsLocation.toURI().toString());
+    Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse();
+  }
+
+  private static File tableLocation(Table table) {
+    // Depending on test case, location is URI or a local path
+    String location = table.location();
+    File file = new File(location);
+    try {
+      URI uri = new URI(location);
+      if (uri.getScheme() != null) {
+        // Location is a well-formed URI
+        file = new File(uri);
+      }
+    } catch (URISyntaxException ignored) {
+      // Ignore
+    }
+
+    Preconditions.checkState(
+        file.isDirectory(), "Table location '%s' does not point to a directory", location);
+    return file;
+  }
 }
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
index 1431ae5d78ec..cba67d57ad14 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
@@ -151,6 +151,7 @@ protected Dataset<Row> buildOtherMetadataFileDF(Table table) {
     List<String> otherMetadataFiles = Lists.newArrayList();
     otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, true));
     otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
+    otherMetadataFiles.addAll(ReachableFileUtil.statisticsFilesLocations(table));
     return spark().createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
   }
 
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index c9d93ce9de5f..c9e61b8c907f 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -145,6 +145,7 @@ protected Dataset<Row> buildOtherMetadataFileDF(Table table) {
     List<String> otherMetadataFiles = Lists.newArrayList();
     otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, false));
     otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
+    otherMetadataFiles.addAll(ReachableFileUtil.statisticsFilesLocations(table));
     return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
   }
 
disabled", () -> SparkActions.get().deleteOrphanFiles(table).execute()); } + + @Test + public void testRemoveOrphanFilesWithStatisticFiles() throws Exception { + Table table = + TABLES.create( + SCHEMA, + PartitionSpec.unpartitioned(), + ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"), + tableLocation); + + List records = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation); + + table.refresh(); + long snapshotId = table.currentSnapshot().snapshotId(); + long snapshotSequenceNumber = table.currentSnapshot().sequenceNumber(); + + File statsLocation = + new File(new URI(tableLocation)) + .toPath() + .resolve("data") + .resolve("some-stats-file") + .toFile(); + StatisticsFile statisticsFile; + try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) { + puffinWriter.add( + new Blob( + "some-blob-type", + ImmutableList.of(1), + snapshotId, + snapshotSequenceNumber, + ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8)))); + puffinWriter.finish(); + statisticsFile = + new GenericStatisticsFile( + snapshotId, + statsLocation.toString(), + puffinWriter.fileSize(), + puffinWriter.footerSize(), + puffinWriter.writtenBlobsMetadata().stream() + .map(GenericBlobMetadata::from) + .collect(ImmutableList.toImmutableList())); + } + + Transaction transaction = table.newTransaction(); + transaction.updateStatistics().setStatistics(snapshotId, statisticsFile).commit(); + transaction.commitTransaction(); + + SparkActions.get() + .deleteOrphanFiles(table) + .olderThan(System.currentTimeMillis() + 1000) + .execute(); + + Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue(); + Assertions.assertThat(statsLocation.length()) + .as("stats file length") + .isEqualTo(statisticsFile.fileSizeInBytes()); + + transaction = table.newTransaction(); + transaction.updateStatistics().removeStatistics(statisticsFile.snapshotId()).commit(); + transaction.commitTransaction(); + + DeleteOrphanFiles.Result result = + SparkActions.get() + .deleteOrphanFiles(table) + .olderThan(System.currentTimeMillis() + 1000) + .execute(); + Iterable orphanFileLocations = result.orphanFileLocations(); + Assertions.assertThat(orphanFileLocations).as("Should be orphan files").hasSize(1); + Assertions.assertThat(Iterables.getOnlyElement(orphanFileLocations)) + .as("Deleted file") + .isEqualTo(statsLocation.toURI().toString()); + Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse(); + } } diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java index fa43cf0e276c..7d5b5f88a18c 100644 --- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java +++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java @@ -21,17 +21,35 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED; import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED; +import java.io.File; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import 
diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index fa43cf0e276c..7d5b5f88a18c 100644
--- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -21,17 +21,35 @@
 import static org.apache.iceberg.TableProperties.GC_ENABLED;
 import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.Spark3Util;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -332,4 +350,116 @@ public void testConcurrentRemoveOrphanFilesWithInvalidInput() {
             "CALL %s.system.remove_orphan_files(table => '%s', max_concurrent_deletes => %s)",
             catalogName, tableIdent, -1));
   }
+
+  @Test
+  public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
+    if (!catalogName.equals("spark_catalog")) {
+      sql(
+          "CREATE TABLE %s USING iceberg "
+              + "TBLPROPERTIES('format-version'='2') "
+              + "AS SELECT 10 int, 'abc' data",
+          tableName);
+    } else {
+      // give a fresh location to Hive tables as Spark will not clean up the table location
+      // correctly while dropping tables through spark_catalog
+      sql(
+          "CREATE TABLE %s USING iceberg LOCATION '%s' "
+              + "TBLPROPERTIES('format-version'='2') "
+              + "AS SELECT 10 int, 'abc' data",
+          tableName, temp.newFolder());
+    }
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    String statsFileName = "stats-file-" + UUID.randomUUID();
+    File statsLocation =
+        (new URI(table.location()).isAbsolute()
+                ? new File(new URI(table.location()))
+                : new File(table.location()))
+            .toPath()
+            .resolve("data")
+            .resolve(statsFileName)
+            .toFile();
+    StatisticsFile statisticsFile;
+    try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) {
+      long snapshotId = table.currentSnapshot().snapshotId();
+      long snapshotSequenceNumber = table.currentSnapshot().sequenceNumber();
+      puffinWriter.add(
+          new Blob(
+              "some-blob-type",
+              ImmutableList.of(1),
+              snapshotId,
+              snapshotSequenceNumber,
+              ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      puffinWriter.finish();
+      statisticsFile =
+          new GenericStatisticsFile(
+              snapshotId,
+              statsLocation.toString(),
+              puffinWriter.fileSize(),
+              puffinWriter.footerSize(),
+              puffinWriter.writtenBlobsMetadata().stream()
+                  .map(GenericBlobMetadata::from)
+                  .collect(ImmutableList.toImmutableList()));
+    }
+
+    Transaction transaction = table.newTransaction();
+    transaction
+        .updateStatistics()
+        .setStatistics(statisticsFile.snapshotId(), statisticsFile)
+        .commit();
+    transaction.commitTransaction();
+
+    // wait to ensure files are old enough
+    waitUntilAfter(System.currentTimeMillis());
+    Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.remove_orphan_files("
+                + "table => '%s',"
+                + "older_than => TIMESTAMP '%s')",
+            catalogName, tableIdent, currentTimestamp);
+    Assertions.assertThat(output).as("Should be no orphan files").isEmpty();
+
+    Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue();
+    Assertions.assertThat(statsLocation.length())
+        .as("stats file length")
+        .isEqualTo(statisticsFile.fileSizeInBytes());
+
+    transaction = table.newTransaction();
+    transaction.updateStatistics().removeStatistics(statisticsFile.snapshotId()).commit();
+    transaction.commitTransaction();
+
+    output =
+        sql(
+            "CALL %s.system.remove_orphan_files("
+                + "table => '%s',"
+                + "older_than => TIMESTAMP '%s')",
+            catalogName, tableIdent, currentTimestamp);
+    Assertions.assertThat(output).as("Should be orphan files").hasSize(1);
+    Assertions.assertThat(Iterables.getOnlyElement(output))
+        .as("Deleted files")
+        .containsExactly(statsLocation.toURI().toString());
+    Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse();
+  }
+
+  private static File tableLocation(Table table) {
+    // Depending on test case, location is URI or a local path
+    String location = table.location();
+    File file = new File(location);
+    try {
+      URI uri = new URI(location);
+      if (uri.getScheme() != null) {
+        // Location is a well-formed URI
+        file = new File(uri);
+      }
+    } catch (URISyntaxException ignored) {
+      // Ignore
+    }
+
+    Preconditions.checkState(
+        file.isDirectory(), "Table location '%s' does not point to a directory", location);
+    return file;
+  }
 }
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
index a1bc19d7dcc0..bfce42bb2580 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
@@ -151,6 +151,7 @@ protected Dataset<Row> buildOtherMetadataFileDF(Table table) {
     List<String> otherMetadataFiles = Lists.newArrayList();
     otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, true));
     otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
+    otherMetadataFiles.addAll(ReachableFileUtil.statisticsFilesLocations(table));
     return spark().createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
   }
 
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 5abfcc4482a4..f47d113e1364 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -146,6 +146,7 @@ protected Dataset<Row> buildOtherMetadataFileDF(Table table) {
     List<String> otherMetadataFiles = Lists.newArrayList();
     otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, false));
     otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
+    otherMetadataFiles.addAll(ReachableFileUtil.statisticsFilesLocations(table));
     return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
   }
 
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 70f119c45aaa..da988fddbbf4 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -22,6 +22,9 @@
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -37,11 +40,16 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -49,6 +57,11 @@
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hadoop.HiddenPathFilter;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -59,6 +72,7 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -644,4 +658,80 @@ public void testGarbageCollectionDisabled() {
         "Cannot remove orphan files: GC is disabled",
         () -> SparkActions.get().deleteOrphanFiles(table).execute());
   }
+
+  @Test
+  public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
+    Table table =
+        TABLES.create(
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"),
+            tableLocation);
+
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+    df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.refresh();
+    long snapshotId = table.currentSnapshot().snapshotId();
+    long snapshotSequenceNumber = table.currentSnapshot().sequenceNumber();
+
+    File statsLocation =
+        new File(new URI(tableLocation))
+            .toPath()
+            .resolve("data")
+            .resolve("some-stats-file")
+            .toFile();
+    StatisticsFile statisticsFile;
+    try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) {
+      puffinWriter.add(
+          new Blob(
+              "some-blob-type",
+              ImmutableList.of(1),
+              snapshotId,
+              snapshotSequenceNumber,
+              ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      puffinWriter.finish();
+      statisticsFile =
+          new GenericStatisticsFile(
+              snapshotId,
+              statsLocation.toString(),
+              puffinWriter.fileSize(),
+              puffinWriter.footerSize(),
+              puffinWriter.writtenBlobsMetadata().stream()
+                  .map(GenericBlobMetadata::from)
+                  .collect(ImmutableList.toImmutableList()));
+    }
+
+    Transaction transaction = table.newTransaction();
+    transaction.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
+    transaction.commitTransaction();
+
+    SparkActions.get()
+        .deleteOrphanFiles(table)
+        .olderThan(System.currentTimeMillis() + 1000)
+        .execute();
+
+    Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue();
+    Assertions.assertThat(statsLocation.length())
+        .as("stats file length")
+        .isEqualTo(statisticsFile.fileSizeInBytes());
+
+    transaction = table.newTransaction();
+    transaction.updateStatistics().removeStatistics(statisticsFile.snapshotId()).commit();
+    transaction.commitTransaction();
+
+    DeleteOrphanFiles.Result result =
+        SparkActions.get()
+            .deleteOrphanFiles(table)
+            .olderThan(System.currentTimeMillis() + 1000)
+            .execute();
+    Iterable<String> orphanFileLocations = result.orphanFileLocations();
+    Assertions.assertThat(orphanFileLocations).as("Should be orphan files").hasSize(1);
+    Assertions.assertThat(Iterables.getOnlyElement(orphanFileLocations))
+        .as("Deleted file")
+        .isEqualTo(statsLocation.toURI().toString());
+    Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse();
+  }
 }
disabled", () -> SparkActions.get().deleteOrphanFiles(table).execute()); } + + @Test + public void testRemoveOrphanFilesWithStatisticFiles() throws Exception { + Table table = + TABLES.create( + SCHEMA, + PartitionSpec.unpartitioned(), + ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"), + tableLocation); + + List records = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation); + + table.refresh(); + long snapshotId = table.currentSnapshot().snapshotId(); + long snapshotSequenceNumber = table.currentSnapshot().sequenceNumber(); + + File statsLocation = + new File(new URI(tableLocation)) + .toPath() + .resolve("data") + .resolve("some-stats-file") + .toFile(); + StatisticsFile statisticsFile; + try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) { + puffinWriter.add( + new Blob( + "some-blob-type", + ImmutableList.of(1), + snapshotId, + snapshotSequenceNumber, + ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8)))); + puffinWriter.finish(); + statisticsFile = + new GenericStatisticsFile( + snapshotId, + statsLocation.toString(), + puffinWriter.fileSize(), + puffinWriter.footerSize(), + puffinWriter.writtenBlobsMetadata().stream() + .map(GenericBlobMetadata::from) + .collect(ImmutableList.toImmutableList())); + } + + Transaction transaction = table.newTransaction(); + transaction.updateStatistics().setStatistics(snapshotId, statisticsFile).commit(); + transaction.commitTransaction(); + + SparkActions.get() + .deleteOrphanFiles(table) + .olderThan(System.currentTimeMillis() + 1000) + .execute(); + + Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue(); + Assertions.assertThat(statsLocation.length()) + .as("stats file length") + .isEqualTo(statisticsFile.fileSizeInBytes()); + + transaction = table.newTransaction(); + transaction.updateStatistics().removeStatistics(statisticsFile.snapshotId()).commit(); + transaction.commitTransaction(); + + DeleteOrphanFiles.Result result = + SparkActions.get() + .deleteOrphanFiles(table) + .olderThan(System.currentTimeMillis() + 1000) + .execute(); + Iterable orphanFileLocations = result.orphanFileLocations(); + Assertions.assertThat(orphanFileLocations).as("Should be orphan files").hasSize(1); + Assertions.assertThat(Iterables.getOnlyElement(orphanFileLocations)) + .as("Deleted file") + .isEqualTo(statsLocation.toURI().toString()); + Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse(); + } } diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java index 7a2d5c56ec7c..be82880cb743 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java @@ -21,24 +21,37 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED; import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED; +import java.io.File; import java.io.IOException; import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.sql.Timestamp; import 
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index a1dfa6009e9d..74a0e70738a7 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -181,6 +181,7 @@ private Dataset<FileInfo> otherMetadataFileDS(Table table, boolean recursive) {
     List<String> otherMetadataFiles = Lists.newArrayList();
     otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, recursive));
     otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
+    otherMetadataFiles.addAll(ReachableFileUtil.statisticsFilesLocations(table));
     return toFileInfoDS(otherMetadataFiles, OTHERS);
   }
 
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 10a33e836dd7..c3a6aa9e946d 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -22,6 +22,9 @@
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.List;
@@ -38,11 +41,16 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -50,6 +58,10 @@
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hadoop.HiddenPathFilter;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -66,6 +78,7 @@
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -876,6 +889,82 @@ protected long waitUntilAfter(long timestampMillis) {
     return current;
   }
 
+  @Test
+  public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
+    Table table =
+        TABLES.create(
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"),
+            tableLocation);
+
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+    df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.refresh();
+    long snapshotId = table.currentSnapshot().snapshotId();
+    long snapshotSequenceNumber = table.currentSnapshot().sequenceNumber();
+
+    File statsLocation =
+        new File(new URI(tableLocation))
+            .toPath()
+            .resolve("data")
+            .resolve("some-stats-file")
+            .toFile();
+    StatisticsFile statisticsFile;
+    try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) {
+      puffinWriter.add(
+          new Blob(
+              "some-blob-type",
+              ImmutableList.of(1),
+              snapshotId,
+              snapshotSequenceNumber,
+              ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      puffinWriter.finish();
+      statisticsFile =
+          new GenericStatisticsFile(
+              snapshotId,
+              statsLocation.toString(),
+              puffinWriter.fileSize(),
+              puffinWriter.footerSize(),
+              puffinWriter.writtenBlobsMetadata().stream()
+                  .map(GenericBlobMetadata::from)
+                  .collect(ImmutableList.toImmutableList()));
+    }
+
+    Transaction transaction = table.newTransaction();
+    transaction.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
+    transaction.commitTransaction();
+
+    SparkActions.get()
+        .deleteOrphanFiles(table)
+        .olderThan(System.currentTimeMillis() + 1000)
+        .execute();
+
+    Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue();
+    Assertions.assertThat(statsLocation.length())
+        .as("stats file length")
+        .isEqualTo(statisticsFile.fileSizeInBytes());
+
+    transaction = table.newTransaction();
+    transaction.updateStatistics().removeStatistics(statisticsFile.snapshotId()).commit();
+    transaction.commitTransaction();
+
+    DeleteOrphanFiles.Result result =
+        SparkActions.get()
+            .deleteOrphanFiles(table)
+            .olderThan(System.currentTimeMillis() + 1000)
+            .execute();
+    Iterable<String> orphanFileLocations = result.orphanFileLocations();
+    Assertions.assertThat(orphanFileLocations).as("Should be orphan files").hasSize(1);
+    Assertions.assertThat(Iterables.getOnlyElement(orphanFileLocations))
+        .as("Deleted file")
+        .isEqualTo(statsLocation.toURI().toString());
+    Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse();
+  }
+
   @Test
   public void testPathsWithExtraSlashes() {
     List<String> validFiles = Lists.newArrayList("file:///dir1/dir2/file1");
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index 7a2d5c56ec7c..be82880cb743 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -21,24 +21,37 @@
 import static org.apache.iceberg.TableProperties.GC_ENABLED;
 import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReachableFileUtil;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.data.TestHelpers;
@@ -51,6 +64,7 @@
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -447,6 +461,86 @@ public void testRemoveOrphanFilesWithDeleteFiles() throws Exception {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
+    sql(
+        "CREATE TABLE %s USING iceberg "
+            + "TBLPROPERTIES('format-version'='2') "
+            + "AS SELECT 10 int, 'abc' data",
+        tableName);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    String statsFileName = "stats-file-" + UUID.randomUUID();
+    File statsLocation =
+        new File(new URI(table.location()))
+            .toPath()
+            .resolve("data")
+            .resolve(statsFileName)
+            .toFile();
+    StatisticsFile statisticsFile;
+    try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) {
+      long snapshotId = table.currentSnapshot().snapshotId();
+      long snapshotSequenceNumber = table.currentSnapshot().sequenceNumber();
+      puffinWriter.add(
+          new Blob(
+              "some-blob-type",
+              ImmutableList.of(1),
+              snapshotId,
+              snapshotSequenceNumber,
+              ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      puffinWriter.finish();
+      statisticsFile =
+          new GenericStatisticsFile(
+              snapshotId,
+              statsLocation.toString(),
+              puffinWriter.fileSize(),
+              puffinWriter.footerSize(),
+              puffinWriter.writtenBlobsMetadata().stream()
+                  .map(GenericBlobMetadata::from)
+                  .collect(ImmutableList.toImmutableList()));
+    }
+
+    Transaction transaction = table.newTransaction();
+    transaction
+        .updateStatistics()
+        .setStatistics(statisticsFile.snapshotId(), statisticsFile)
+        .commit();
+    transaction.commitTransaction();
+
+    // wait to ensure files are old enough
+    waitUntilAfter(System.currentTimeMillis());
+    Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.remove_orphan_files("
+                + "table => '%s',"
+                + "older_than => TIMESTAMP '%s')",
+            catalogName, tableIdent, currentTimestamp);
+    Assertions.assertThat(output).as("Should be no orphan files").isEmpty();
+
+    Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue();
+    Assertions.assertThat(statsLocation.length())
+        .as("stats file length")
+        .isEqualTo(statisticsFile.fileSizeInBytes());
+
+    transaction = table.newTransaction();
+    transaction.updateStatistics().removeStatistics(statisticsFile.snapshotId()).commit();
+    transaction.commitTransaction();
+
+    output =
+        sql(
+            "CALL %s.system.remove_orphan_files("
+                + "table => '%s',"
+                + "older_than => TIMESTAMP '%s')",
+            catalogName, tableIdent, currentTimestamp);
+    Assertions.assertThat(output).as("Should be orphan files").hasSize(1);
+    Assertions.assertThat(Iterables.getOnlyElement(output))
+        .as("Deleted files")
+        .containsExactly(statsLocation.toURI().toString());
+    Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse();
+  }
+
   @Test
   public void testRemoveOrphanFilesProcedureWithPrefixMode()
       throws NoSuchTableException, ParseException, IOException {
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index a1dfa6009e9d..74a0e70738a7 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -181,6 +181,7 @@ private Dataset<FileInfo> otherMetadataFileDS(Table table, boolean recursive) {
     List<String> otherMetadataFiles = Lists.newArrayList();
     otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, recursive));
     otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
+    otherMetadataFiles.addAll(ReachableFileUtil.statisticsFilesLocations(table));
     return toFileInfoDS(otherMetadataFiles, OTHERS);
   }
 
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 10a33e836dd7..c3a6aa9e946d 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -22,6 +22,9 @@
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.List;
@@ -38,11 +41,16 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.Transaction;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -50,6 +58,10 @@
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hadoop.HiddenPathFilter;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -66,6 +78,7 @@
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -876,6 +889,82 @@ protected long waitUntilAfter(long timestampMillis) {
     return current;
   }
 
+  @Test
+  public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
+    Table table =
+        TABLES.create(
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"),
+            tableLocation);
+
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+    df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.refresh();
+    long snapshotId = table.currentSnapshot().snapshotId();
+    long snapshotSequenceNumber = table.currentSnapshot().sequenceNumber();
+
+    File statsLocation =
+        new File(new URI(tableLocation))
+            .toPath()
+            .resolve("data")
+            .resolve("some-stats-file")
+            .toFile();
+    StatisticsFile statisticsFile;
+    try (PuffinWriter puffinWriter = Puffin.write(Files.localOutput(statsLocation)).build()) {
+      puffinWriter.add(
+          new Blob(
+              "some-blob-type",
+              ImmutableList.of(1),
+              snapshotId,
+              snapshotSequenceNumber,
+              ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      puffinWriter.finish();
+      statisticsFile =
+          new GenericStatisticsFile(
+              snapshotId,
+              statsLocation.toString(),
+              puffinWriter.fileSize(),
+              puffinWriter.footerSize(),
+              puffinWriter.writtenBlobsMetadata().stream()
+                  .map(GenericBlobMetadata::from)
+                  .collect(ImmutableList.toImmutableList()));
+    }
+
+    Transaction transaction = table.newTransaction();
+    transaction.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();
+    transaction.commitTransaction();
+
+    SparkActions.get()
+        .deleteOrphanFiles(table)
+        .olderThan(System.currentTimeMillis() + 1000)
+        .execute();
+
+    Assertions.assertThat(statsLocation.exists()).as("stats file should exist").isTrue();
+    Assertions.assertThat(statsLocation.length())
+        .as("stats file length")
+        .isEqualTo(statisticsFile.fileSizeInBytes());
+
+    transaction = table.newTransaction();
+    transaction.updateStatistics().removeStatistics(statisticsFile.snapshotId()).commit();
+    transaction.commitTransaction();
+
+    DeleteOrphanFiles.Result result =
+        SparkActions.get()
+            .deleteOrphanFiles(table)
+            .olderThan(System.currentTimeMillis() + 1000)
+            .execute();
+    Iterable<String> orphanFileLocations = result.orphanFileLocations();
+    Assertions.assertThat(orphanFileLocations).as("Should be orphan files").hasSize(1);
+    Assertions.assertThat(Iterables.getOnlyElement(orphanFileLocations))
+        .as("Deleted file")
+        .isEqualTo(statsLocation.toURI().toString());
+    Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse();
+  }
+
   @Test
   public void testPathsWithExtraSlashes() {
     List<String> validFiles = Lists.newArrayList("file:///dir1/dir2/file1");