diff --git a/api/src/main/java/org/apache/iceberg/UpdateStatistics.java b/api/src/main/java/org/apache/iceberg/UpdateStatistics.java index 28e8a6a37d79..6e367122dcc9 100644 --- a/api/src/main/java/org/apache/iceberg/UpdateStatistics.java +++ b/api/src/main/java/org/apache/iceberg/UpdateStatistics.java @@ -18,8 +18,10 @@ */ package org.apache.iceberg; +import java.util.List; + /** API for updating statistics files in a table. */ -public interface UpdateStatistics extends PendingUpdate<Snapshot> { +public interface UpdateStatistics extends PendingUpdate<List<StatisticsFile>> { /** * Set the table's statistics file for given snapshot, replacing the previous statistics file for * the snapshot if any exists. diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index 550dc39a18b7..9b985b7ce77d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -209,6 +209,11 @@ public DeleteFiles newDelete() { return new StreamingDelete(name, ops); } + @Override + public UpdateStatistics updateStatistics() { + return new SetStatistics(ops); + } + @Override public ExpireSnapshots expireSnapshots() { return new RemoveSnapshots(ops); diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index d5e82cff73cd..6b3ec1c94ed5 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -216,6 +216,14 @@ public DeleteFiles newDelete() { return delete; } + @Override + public UpdateStatistics updateStatistics() { + checkLastOperationCommitted("UpdateStatistics"); + UpdateStatistics updateStatistics = new SetStatistics(transactionOps); + updates.add(updateStatistics); + return updateStatistics; + } + @Override public ExpireSnapshots expireSnapshots() { checkLastOperationCommitted("ExpireSnapshots"); diff --git a/core/src/main/java/org/apache/iceberg/SetStatistics.java b/core/src/main/java/org/apache/iceberg/SetStatistics.java new file mode 100644 index 000000000000..640ab1d8a186 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/SetStatistics.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public class SetStatistics implements UpdateStatistics { + private final TableOperations ops; + private final Map<Long, Optional<StatisticsFile>> statisticsToSet = Maps.newHashMap(); + + public SetStatistics(TableOperations ops) { + this.ops = ops; + } + + @Override + public UpdateStatistics setStatistics(long snapshotId, StatisticsFile statisticsFile) { + Preconditions.checkArgument(snapshotId == statisticsFile.snapshotId()); + statisticsToSet.put(snapshotId, Optional.of(statisticsFile)); + return this; + } + + @Override + public UpdateStatistics removeStatistics(long snapshotId) { + statisticsToSet.put(snapshotId, Optional.empty()); + return this; + } + + @Override + public List<StatisticsFile> apply() { + return Stream.concat( + // Retained statistics + ops.current().statisticsFiles().stream() + .filter( + statisticsFile -> !statisticsToSet.containsKey(statisticsFile.snapshotId())), + // New statistics + statisticsToSet.values().stream().filter(Optional::isPresent).map(Optional::get)) + .collect(ImmutableList.toImmutableList()); + } + + @Override + public void commit() { + TableMetadata base = ops.current(); // or ops.refresh() ? + TableMetadata.Builder builder = TableMetadata.buildFrom(base); + statisticsToSet.forEach( + (snapshotId, statistics) -> { + if (statistics.isPresent()) { + builder.setStatistics(snapshotId, statistics.get()); + } else { + builder.removeStatistics(snapshotId); + } + }); + TableMetadata newMetadata = builder.build(); + ops.commit(base, newMetadata); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestSetStatistics.java b/core/src/test/java/org/apache/iceberg/TestSetStatistics.java new file mode 100644 index 000000000000..f594e08d1e91 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestSetStatistics.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestSetStatistics extends TableTestBase { + @Parameterized.Parameters(name = "formatVersion = {0}") + public static Object[] parameters() { + return new Object[] {1, 2}; + } + + public TestSetStatistics(int formatVersion) { + super(formatVersion); + } + + @Test + public void testEmptyUpdateStatistics() { + Assert.assertEquals("Table should be on version 0", 0, (int) version()); + TableMetadata base = readMetadata(); + + table.updateStatistics().commit(); + + Assert.assertSame( + "Base metadata should not change when commit is created", base, table.ops().current()); + Assert.assertEquals("Table should be on version 1", 1, (int) version()); + } + + @Test + public void testEmptyTransactionalUpdateStatistics() { + Assert.assertEquals("Table should be on version 0", 0, (int) version()); + TableMetadata base = readMetadata(); + + Transaction transaction = table.newTransaction(); + transaction.updateStatistics().commit(); + transaction.commitTransaction(); + + Assert.assertSame( + "Base metadata should not change when commit is created", base, table.ops().current()); + Assert.assertEquals("Table should be on version 0", 0, (int) version()); + } + + @Test + public void testUpdateStatistics() { + // Create a snapshot + table.newFastAppend().commit(); + Assert.assertEquals("Table should be on version 1", 1, (int) version()); + + TableMetadata base = readMetadata(); + long snapshotId = base.currentSnapshot().snapshotId(); + GenericStatisticsFile statisticsFile = + new GenericStatisticsFile( + snapshotId, + "/some/statistics/file.puffin", + 100, + 42, + ImmutableList.of( + new GenericBlobMetadata( + "stats-type", + snapshotId, + base.lastSequenceNumber(), + ImmutableList.of(1, 2), + ImmutableMap.of("a-property", "some-property-value")))); + + table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit(); + + TableMetadata metadata = readMetadata(); + Assert.assertEquals("Table should be on version 2", 2, (int) version()); + Assert.assertEquals( + "Table snapshot should be the same after setting statistics file", + snapshotId, + metadata.currentSnapshot().snapshotId()); + Assert.assertEquals( + "Table metadata should have statistics files", + ImmutableList.of(statisticsFile), + metadata.statisticsFiles()); + } + + @Test + public void testRemoveStatistics() { + // Create a snapshot + table.newFastAppend().commit(); + Assert.assertEquals("Table should be on version 1", 1, (int) version()); + + TableMetadata base = readMetadata(); + long snapshotId = base.currentSnapshot().snapshotId(); + GenericStatisticsFile statisticsFile = + new GenericStatisticsFile( + snapshotId, "/some/statistics/file.puffin", 100, 42, ImmutableList.of()); + + table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit(); + + TableMetadata metadata = readMetadata(); + Assert.assertEquals("Table should be on version 2", 2, (int) version()); + Assert.assertEquals( + "Table metadata should have statistics files", + ImmutableList.of(statisticsFile), + metadata.statisticsFiles()); + + table.updateStatistics().removeStatistics(snapshotId).commit(); + + metadata = readMetadata(); + Assert.assertEquals("Table should be on version 3", 3, (int) version()); + Assert.assertEquals( + "Table metadata should have no statistics files", + ImmutableList.of(), + metadata.statisticsFiles()); + } +}