Skip to content

Commit

Permalink
Add statistics information in table snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed May 25, 2022
1 parent e883caf commit ac219c0
Show file tree
Hide file tree
Showing 27 changed files with 917 additions and 30 deletions.
3 changes: 3 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,6 @@ acceptedBreaks:
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.ReplacePartitions org.apache.iceberg.ReplacePartitions::validateNoConflictingDeletes()"
justification: "Accept all changes prior to introducing API compatibility checks"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.UpdateTableStatistics org.apache.iceberg.Transaction::newUpdateTableStatistics()"
justification: "new API method"
9 changes: 9 additions & 0 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,13 @@ public interface Snapshot extends Serializable {
default Integer schemaId() {
return null;
}

/**
* Return all statistics files associated with the table.
* <p>
* This default implementation returns {@code null} rather than an empty iterable, so callers must
* null-check the result before iterating.
*
* @return all statistics files associated with the table, or {@code null} when the implementation does not
* override this default
*/
default Iterable<StatisticsFile> statisticsFiles() {
return null;
}
}
36 changes: 36 additions & 0 deletions api/src/main/java/org/apache/iceberg/StatisticsFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Describes a single statistics file that can be attached to a table via {@link UpdateTableStatistics}
* and listed from {@link Snapshot#statisticsFiles()}.
*/
public interface StatisticsFile {
/**
* Location of the statistics file.
* <p>
* NOTE(review): presumably a fully-qualified path, matching what
* {@link UpdateTableStatistics#deleteStatisticsFile(CharSequence)} expects -- confirm.
*/
String location();

/**
* Size of the statistics file, in bytes.
*/
long fileSizeInBytes();

/**
* Size of the statistics file footer, in bytes.
*/
long fileFooterSizeInBytes();

/**
* Sequence number associated with this statistics file.
* <p>
* NOTE(review): presumably the sequence number of the snapshot the statistics were computed for -- confirm.
*/
long sequenceNumber();

/**
* Field sets covered by the statistics in this file.
* <p>
* NOTE(review): the meaning of the map keys and of the integer lists is not defined here; presumably the keys
* are statistic type names and each list holds field ids -- confirm before relying on it.
*/
Map<String, Set<List<Integer>>> statisticsFieldsSets();
}
8 changes: 8 additions & 0 deletions api/src/main/java/org/apache/iceberg/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ default AppendFiles newFastAppend() {
*/
DeleteFiles newDelete();

/**
* Create a new {@link UpdateTableStatistics update table statistics API} to add or remove statistics
* files in this table.
* <p>
* Files are added with {@link UpdateTableStatistics#addStatisticsFile(StatisticsFile)} and removed with
* {@link UpdateTableStatistics#deleteStatisticsFile(CharSequence)}.
*
* @return a new {@link UpdateTableStatistics}
*/
UpdateTableStatistics newUpdateTableStatistics();

/**
* Create a new {@link ExpireSnapshots expire API} to manage snapshots in this table.
*
Expand Down
44 changes: 44 additions & 0 deletions api/src/main/java/org/apache/iceberg/UpdateTableStatistics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

/**
* API for adding or removing statistics files from a table.
* <p>
* This is a {@link PendingUpdate} whose result type is {@link Snapshot}.
*/
public interface UpdateTableStatistics extends PendingUpdate<Snapshot> {
/**
* Delete a statistics file path from the underlying table.
* <p>
* To remove a statistics file from the table, this path must equal a path in the table's metadata. Paths
* that are different but equivalent will not be removed. For example, file:/path/file.avro is
* equivalent to file:///path/file.avro, but would not remove the latter path from the table.
*
* @param path a fully-qualified file path to remove from the table
* @return this for method chaining
*/
UpdateTableStatistics deleteStatisticsFile(CharSequence path);

/**
* Add a statistics file to the underlying table.
*
* @param statisticsFile description of the statistics file to add to the table
* @return this for method chaining
*/
UpdateTableStatistics addStatisticsFile(StatisticsFile statisticsFile);
}
18 changes: 14 additions & 4 deletions core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class BaseSnapshot implements Snapshot {
private final String operation;
private final Map<String, String> summary;
private final Integer schemaId;
private final List<StatisticsFile> statisticsFiles;

// lazily initialized
private transient List<ManifestFile> allManifests = null;
Expand All @@ -61,7 +62,7 @@ class BaseSnapshot implements Snapshot {
String... manifestFiles) {
this(io, snapshotId, null, System.currentTimeMillis(), null, null,
schemaId, Lists.transform(Arrays.asList(manifestFiles),
path -> new GenericManifestFile(io.newInputFile(path), 0)));
path -> new GenericManifestFile(io.newInputFile(path), 0)), null);
}

BaseSnapshot(FileIO io,
Expand All @@ -72,7 +73,8 @@ class BaseSnapshot implements Snapshot {
String operation,
Map<String, String> summary,
Integer schemaId,
String manifestList) {
String manifestList,
List<StatisticsFile> statisticsFiles) {
this.io = io;
this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
Expand All @@ -82,6 +84,7 @@ class BaseSnapshot implements Snapshot {
this.summary = summary;
this.schemaId = schemaId;
this.manifestListLocation = manifestList;
this.statisticsFiles = statisticsFiles;
}

BaseSnapshot(FileIO io,
Expand All @@ -91,8 +94,10 @@ class BaseSnapshot implements Snapshot {
String operation,
Map<String, String> summary,
Integer schemaId,
List<ManifestFile> dataManifests) {
this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, schemaId, null);
List<ManifestFile> dataManifests,
List<StatisticsFile> statisticsFiles) {
this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, schemaId, null,
statisticsFiles);
this.allManifests = dataManifests;
}

Expand Down Expand Up @@ -131,6 +136,11 @@ public Integer schemaId() {
return schemaId;
}

// Returns the list handed to the constructor as-is (no copy is made). The value may be null: the
// constructor that takes manifest file paths passes null for the statistics files.
@Override
public Iterable<StatisticsFile> statisticsFiles() {
return statisticsFiles;
}

private void cacheManifests() {
if (io == null) {
throw new IllegalStateException("Cannot cache changes: FileIO is null");
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,14 @@ public DeleteFiles newDelete() {
return delete;
}

@Override
public UpdateTableStatistics newUpdateTableStatistics() {
  // Must run first: presumably rejects the call while the previous operation in this
  // transaction is still uncommitted -- confirm against checkLastOperationCommitted.
  checkLastOperationCommitted("UpdateTableStatistics");

  // Register the update with this transaction before handing it to the caller.
  BaseUpdateTableStatistics statisticsUpdate = new BaseUpdateTableStatistics(transactionOps);
  updates.add(statisticsUpdate);
  return statisticsUpdate;
}

@Override
public ExpireSnapshots expireSnapshots() {
checkLastOperationCommitted("ExpireSnapshots");
Expand Down
117 changes: 117 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseUpdateTableStatistics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

/**
 * {@link UpdateTableStatistics Update table statistics} implementation.
 * <p>
 * {@link #apply()} builds a snapshot whose statistics files are those of the current snapshot, minus the
 * locations passed to {@link #deleteStatisticsFile(CharSequence)}, plus the files passed to
 * {@link #addStatisticsFile(StatisticsFile)}. The manifest list of the current snapshot is carried over
 * unchanged.
 */
class BaseUpdateTableStatistics extends SnapshotProducer<BaseUpdateTableStatistics> implements UpdateTableStatistics {

  private final TableOperations ops;
  private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
  private final Set<String> removedStatisticsFiles = Sets.newHashSet();
  // Insertion-ordered so added files reach the snapshot in the order they were registered, instead of
  // an arbitrary hash order.
  private final Set<StatisticsFile> addedStatisticsFiles = Sets.newLinkedHashSet();

  protected BaseUpdateTableStatistics(TableOperations ops) {
    super(ops);
    this.ops = ops;
  }

  @Override
  protected BaseUpdateTableStatistics self() {
    return this;
  }

  /**
   * Registers a statistics file location for removal.
   *
   * @param path location to remove; compared by exact string equality with {@link StatisticsFile#location()}
   * @return this for method chaining
   * @throws NullPointerException if {@code path} is null
   */
  @Override
  public UpdateTableStatistics deleteStatisticsFile(CharSequence path) {
    Preconditions.checkNotNull(path, "path is null");
    removedStatisticsFiles.add(path.toString());
    return this;
  }

  /**
   * Registers a statistics file to be added.
   *
   * @param statisticsFile the statistics file to add
   * @return this for method chaining
   * @throws NullPointerException if {@code statisticsFile} is null
   */
  @Override
  public UpdateTableStatistics addStatisticsFile(StatisticsFile statisticsFile) {
    Preconditions.checkNotNull(statisticsFile, "statisticsFile is null");
    addedStatisticsFiles.add(statisticsFile);
    return this;
  }

  /**
   * Sets a property on the summary of the snapshot produced by this update.
   *
   * @param property summary property name
   * @param value summary property value
   * @return this for method chaining
   */
  @Override
  public BaseUpdateTableStatistics set(String property, String value) {
    summaryBuilder.set(property, value);
    return this;
  }

  @Override
  protected void cleanUncommitted(Set<ManifestFile> committed) {
    // TODO should i delete files pointed to by addedStatisticsFiles?
  }

  @Override
  protected String operation() {
    return "update statistics";
  }

  /**
   * Builds the snapshot that results from applying the pending additions and removals to the refreshed
   * table metadata.
   * <p>
   * Removals are applied before additions, so a file that is both removed (by location) and added in the
   * same update is present in the result.
   *
   * @return the new snapshot, with the current snapshot (if any) as its parent
   * @throws ValidationException if the table format version does not support statistics
   */
  @Override
  public Snapshot apply() {
    TableMetadata base = refresh();
    // A table without any snapshot has a null current snapshot; every access below must tolerate that.
    Snapshot current = base.currentSnapshot();
    Long parentSnapshotId = current != null ? current.snapshotId() : null;
    long sequenceNumber = base.nextSequenceNumber();

    validate(base);

    List<StatisticsFile> statisticsFiles = Lists.newArrayList();
    Iterable<StatisticsFile> existing = current != null ? current.statisticsFiles() : null;
    if (existing != null) {
      existing.forEach(statisticsFiles::add);
    }
    statisticsFiles.removeIf(statisticsFile -> removedStatisticsFiles.contains(statisticsFile.location()));
    statisticsFiles.addAll(addedStatisticsFiles);

    // Previously dereferenced unconditionally, which threw NullPointerException on a table with no
    // snapshot even though the code above handles that case.
    // NOTE(review): null is passed when there is no manifest list to carry over -- confirm that
    // downstream consumers of BaseSnapshot accept a null manifest list location.
    String manifestListLocation = current != null ? current.manifestListLocation() : null;

    return new BaseSnapshot(ops.io(),
        sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
        base.currentSchemaId(), manifestListLocation, statisticsFiles);
  }

  /**
   * Rejects tables whose format version is lower than 2.
   *
   * @param currentMetadata metadata of the table being updated
   * @throws ValidationException if the format version is lower than 2
   */
  @Override
  protected void validate(TableMetadata currentMetadata) {
    ValidationException.check(currentMetadata.formatVersion() >= 2,
        "Table statistics are not supported in format version %s", currentMetadata.formatVersion());
  }

  @Override
  protected List<ManifestFile> apply(TableMetadata metadataToUpdate) {
    // The method is not invoked, since apply() is overridden.
    throw new UnsupportedOperationException();
  }

  @Override
  protected Map<String, String> summary() {
    return summaryBuilder.build();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public DeleteFiles newDelete() {
return wrapped.newDelete();
}

// Pure delegation: forwards to the wrapped transaction and returns its result unchanged.
@Override
public UpdateTableStatistics newUpdateTableStatistics() {
return wrapped.newUpdateTableStatistics();
}

@Override
public ExpireSnapshots expireSnapshots() {
return wrapped.expireSnapshots();
Expand Down
Loading

0 comments on commit ac219c0

Please sign in to comment.