Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support setting table statistics #5794

Merged
merged 1 commit into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion api/src/main/java/org/apache/iceberg/UpdateStatistics.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.iceberg;

import java.util.List;

/** API for updating statistics files in a table. */
public interface UpdateStatistics extends PendingUpdate<Snapshot> {
public interface UpdateStatistics extends PendingUpdate<List<StatisticsFile>> {
/**
* Set the table's statistics file for given snapshot, replacing the previous statistics file for
* the snapshot if any exists.
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ public DeleteFiles newDelete() {
return new StreamingDelete(name, ops);
}

@Override
public UpdateStatistics updateStatistics() {
return new SetStatistics(ops);
}

@Override
public ExpireSnapshots expireSnapshots() {
return new RemoveSnapshots(ops);
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ public DeleteFiles newDelete() {
return delete;
}

@Override
public UpdateStatistics updateStatistics() {
checkLastOperationCommitted("UpdateStatistics");
UpdateStatistics updateStatistics = new SetStatistics(transactionOps);
updates.add(updateStatistics);
return updateStatistics;
}

@Override
public ExpireSnapshots expireSnapshots() {
checkLastOperationCommitted("ExpireSnapshots");
Expand Down
72 changes: 72 additions & 0 deletions core/src/main/java/org/apache/iceberg/SetStatistics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class SetStatistics implements UpdateStatistics {
private final TableOperations ops;
private final Map<Long, Optional<StatisticsFile>> statisticsToSet = Maps.newHashMap();

public SetStatistics(TableOperations ops) {
this.ops = ops;
}

@Override
public UpdateStatistics setStatistics(long snapshotId, StatisticsFile statisticsFile) {
Preconditions.checkArgument(snapshotId == statisticsFile.snapshotId());
statisticsToSet.put(snapshotId, Optional.of(statisticsFile));
return this;
}

@Override
public UpdateStatistics removeStatistics(long snapshotId) {
statisticsToSet.put(snapshotId, Optional.empty());
return this;
}

@Override
public List<StatisticsFile> apply() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent for this method is to give a preview of what will be committed for validation, like a "dry run" option. We use it for that purpose in tests. That's why apply is typically called from commit and the result is used to update the table metadata.

I think it's a good idea to have the output of apply be the final list of statistics files. But that leaves a strange case where you wouldn't want to call apply from commit because the TableMetadata.Builder methods are responsible for applying changes. The logic in apply should be the same as the logic in commit, though.

I'd solve by adding a common method, internalApply that returns the TableMetadata and is called here and by commit to ensure consistency.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do!

return internalApply(ops.current()).statisticsFiles();
}

@Override
public void commit() {
TableMetadata base = ops.current();
TableMetadata newMetadata = internalApply(base);
ops.commit(base, newMetadata);
}

private TableMetadata internalApply(TableMetadata base) {
TableMetadata.Builder builder = TableMetadata.buildFrom(base);
statisticsToSet.forEach(
(snapshotId, statistics) -> {
if (statistics.isPresent()) {
builder.setStatistics(snapshotId, statistics.get());
} else {
builder.removeStatistics(snapshotId);
}
});
return builder.build();
}
}
131 changes: 131 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestSetStatistics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestSetStatistics extends TableTestBase {
@Parameterized.Parameters(name = "formatVersion = {0}")
public static Object[] parameters() {
return new Object[] {1, 2};
}

public TestSetStatistics(int formatVersion) {
super(formatVersion);
}

@Test
public void testEmptyUpdateStatistics() {
Assert.assertEquals("Table should be on version 0", 0, (int) version());
TableMetadata base = readMetadata();

table.updateStatistics().commit();

Assert.assertSame(
"Base metadata should not change when commit is created", base, table.ops().current());
Assert.assertEquals("Table should be on version 1", 1, (int) version());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did the table version change? It looks correct as long as the assertSame passes, but it seems strange that the table version was incremented here, but not in the transaction case below. Maybe it's a side effect of the test table operations?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure why. I copied the version-related assertions from TestTransaction, but the other tests don't do that. I can remove it from here.

BTW if i add the following to TestSortOrder, it passes for me locally, indicating that no-op change results in a table version bump there too.

@Test
public void testNoopUpdateBumpsVersion() {
  PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).withSpecId(5).identity("data").build();
  SortOrder order =
      SortOrder.builderFor(SCHEMA)
          .withOrderId(10)
          .asc("s.id", NULLS_LAST)
          .desc(truncate("data", 10), NULLS_FIRST)
          .build();
  TestTables.TestTable table =
      TestTables.create(tableDir, "test", SCHEMA, spec, order, formatVersion);
  Assert.assertEquals("Table should be on version 0", 0, (int) TestTables.metadataVersion("test"));

  table.replaceSortOrder().commit();

  Assert.assertEquals("Table should be on version 1", 1, (int) TestTables.metadataVersion("test"));
}

}

@Test
public void testEmptyTransactionalUpdateStatistics() {
Assert.assertEquals("Table should be on version 0", 0, (int) version());
TableMetadata base = readMetadata();

Transaction transaction = table.newTransaction();
transaction.updateStatistics().commit();
transaction.commitTransaction();

Assert.assertSame(
"Base metadata should not change when commit is created", base, table.ops().current());
Assert.assertEquals("Table should be on version 0", 0, (int) version());
}

@Test
public void testUpdateStatistics() {
// Create a snapshot
table.newFastAppend().commit();
Assert.assertEquals("Table should be on version 1", 1, (int) version());

TableMetadata base = readMetadata();
long snapshotId = base.currentSnapshot().snapshotId();
GenericStatisticsFile statisticsFile =
new GenericStatisticsFile(
snapshotId,
"/some/statistics/file.puffin",
100,
42,
ImmutableList.of(
new GenericBlobMetadata(
"stats-type",
snapshotId,
base.lastSequenceNumber(),
ImmutableList.of(1, 2),
ImmutableMap.of("a-property", "some-property-value"))));

table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();

TableMetadata metadata = readMetadata();
Assert.assertEquals("Table should be on version 2", 2, (int) version());
Assert.assertEquals(
"Table snapshot should be the same after setting statistics file",
snapshotId,
metadata.currentSnapshot().snapshotId());
Assert.assertEquals(
"Table metadata should have statistics files",
ImmutableList.of(statisticsFile),
metadata.statisticsFiles());
}

@Test
public void testRemoveStatistics() {
// Create a snapshot
table.newFastAppend().commit();
Assert.assertEquals("Table should be on version 1", 1, (int) version());

TableMetadata base = readMetadata();
long snapshotId = base.currentSnapshot().snapshotId();
GenericStatisticsFile statisticsFile =
new GenericStatisticsFile(
snapshotId, "/some/statistics/file.puffin", 100, 42, ImmutableList.of());

table.updateStatistics().setStatistics(snapshotId, statisticsFile).commit();

TableMetadata metadata = readMetadata();
Assert.assertEquals("Table should be on version 2", 2, (int) version());
Assert.assertEquals(
"Table metadata should have statistics files",
ImmutableList.of(statisticsFile),
metadata.statisticsFiles());

table.updateStatistics().removeStatistics(snapshotId).commit();

metadata = readMetadata();
Assert.assertEquals("Table should be on version 3", 3, (int) version());
Assert.assertEquals(
"Table metadata should have no statistics files",
ImmutableList.of(),
metadata.statisticsFiles());
}
}