diff --git a/api/src/main/java/org/apache/iceberg/BlobMetadata.java b/api/src/main/java/org/apache/iceberg/BlobMetadata.java new file mode 100644 index 000000000000..ee1b2ba0fbf2 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/BlobMetadata.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; + +/** A metadata about a statistics or indices blob. */ +public interface BlobMetadata { + /** Type of the blob. Never null */ + String type(); + + /** ID of the Iceberg table's snapshot the blob was computed from */ + long sourceSnapshotId(); + + /** Sequence number of the Iceberg table's snapshot the blob was computed from */ + long sourceSnapshotSequenceNumber(); + + /** Ordered list of fields the blob was calculated from. Never null */ + List fields(); + + /** Additional properties of the blob, specific to the blob type. 
Never null */ + Map properties(); +} diff --git a/api/src/main/java/org/apache/iceberg/StatisticsFile.java b/api/src/main/java/org/apache/iceberg/StatisticsFile.java new file mode 100644 index 000000000000..f3d74b72852a --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/StatisticsFile.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; + +/** + * Represents a statistics file in the Puffin format, that can be used to read table data more + * efficiently. + * + *

Statistics are informational. A reader can choose to ignore statistics information. Statistics + * support is not required to read the table correctly. + */ +public interface StatisticsFile { + /** ID of the Iceberg table's snapshot the statistics were computed from. */ + long snapshotId(); + + /** + * Returns fully qualified path to the file, suitable for constructing a Hadoop Path. Never null. + */ + String path(); + + /** Size of the file */ + long fileSizeInBytes(); + + /** Size of the Puffin footer. */ + long fileFooterSizeInBytes(); + + /** List of statistics contained in the file. Never null. */ + List blobMetadata(); +} diff --git a/core/src/main/java/org/apache/iceberg/GenericBlobMetadata.java b/core/src/main/java/org/apache/iceberg/GenericBlobMetadata.java new file mode 100644 index 000000000000..872a6ca806e8 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/GenericBlobMetadata.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.StringJoiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; + +public class GenericBlobMetadata implements BlobMetadata { + private final String type; + private final long sourceSnapshotId; + private final long sourceSnapshotSequenceNumber; + private final List fields; + private final Map properties; + + public GenericBlobMetadata( + String type, + long sourceSnapshotId, + long sourceSnapshotSequenceNumber, + List fields, + Map properties) { + Preconditions.checkNotNull(type, "type is null"); + Preconditions.checkNotNull(fields, "fields is null"); + Preconditions.checkNotNull(properties, "properties is null"); + this.type = type; + this.sourceSnapshotId = sourceSnapshotId; + this.sourceSnapshotSequenceNumber = sourceSnapshotSequenceNumber; + this.fields = ImmutableList.copyOf(fields); + this.properties = ImmutableMap.copyOf(properties); + } + + @Override + public String type() { + return type; + } + + @Override + public long sourceSnapshotId() { + return sourceSnapshotId; + } + + @Override + public long sourceSnapshotSequenceNumber() { + return sourceSnapshotSequenceNumber; + } + + @Override + public List fields() { + return fields; + } + + @Override + public Map properties() { + return properties; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GenericBlobMetadata that = (GenericBlobMetadata) o; + return sourceSnapshotId == that.sourceSnapshotId + && sourceSnapshotSequenceNumber == that.sourceSnapshotSequenceNumber + && Objects.equals(type, that.type) + && Objects.equals(fields, that.fields) + && Objects.equals(properties, that.properties); + } 
+ + @Override + public int hashCode() { + return Objects.hash(type, sourceSnapshotId, sourceSnapshotSequenceNumber, fields, properties); + } + + @Override + public String toString() { + return new StringJoiner(", ", GenericBlobMetadata.class.getSimpleName() + "[", "]") + .add("type='" + type + "'") + .add("sourceSnapshotId=" + sourceSnapshotId) + .add("sourceSnapshotSequenceNumber=" + sourceSnapshotSequenceNumber) + .add("fields=" + fields) + .add("properties=" + properties) + .toString(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/GenericStatisticsFile.java b/core/src/main/java/org/apache/iceberg/GenericStatisticsFile.java new file mode 100644 index 000000000000..2f00138235f3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/GenericStatisticsFile.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.util.List; +import java.util.Objects; +import java.util.StringJoiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; + +public class GenericStatisticsFile implements StatisticsFile { + private final long snapshotId; + private final String path; + private final long fileSizeInBytes; + private final long fileFooterSizeInBytes; + private final List blobMetadata; + + public GenericStatisticsFile( + long snapshotId, + String path, + long fileSizeInBytes, + long fileFooterSizeInBytes, + List blobMetadata) { + Preconditions.checkNotNull(path, "path is null"); + Preconditions.checkNotNull(blobMetadata, "blobMetadata is null"); + this.snapshotId = snapshotId; + this.path = path; + this.fileSizeInBytes = fileSizeInBytes; + this.fileFooterSizeInBytes = fileFooterSizeInBytes; + this.blobMetadata = ImmutableList.copyOf(blobMetadata); + } + + @Override + public long snapshotId() { + return snapshotId; + } + + @Override + public String path() { + return path; + } + + @Override + public long fileSizeInBytes() { + return fileSizeInBytes; + } + + @Override + public long fileFooterSizeInBytes() { + return fileFooterSizeInBytes; + } + + @Override + public List blobMetadata() { + return blobMetadata; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GenericStatisticsFile that = (GenericStatisticsFile) o; + return snapshotId == that.snapshotId + && fileSizeInBytes == that.fileSizeInBytes + && fileFooterSizeInBytes == that.fileFooterSizeInBytes + && Objects.equals(path, that.path) + && Objects.equals(blobMetadata, that.blobMetadata); + } + + @Override + public int hashCode() { + return Objects.hash(snapshotId, path, fileSizeInBytes, fileFooterSizeInBytes, blobMetadata); + } + + @Override + public String toString() { + return 
new StringJoiner(", ", GenericStatisticsFile.class.getSimpleName() + "[", "]") + .add("snapshotId=" + snapshotId) + .add("path='" + path + "'") + .add("fileSizeInBytes=" + fileSizeInBytes) + .add("fileFooterSizeInBytes=" + fileFooterSizeInBytes) + .add("blobMetadata=" + blobMetadata) + .toString(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index 1bb467840be0..9159e0f58c3c 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -178,6 +178,46 @@ public void applyTo(TableMetadata.Builder metadataBuilder) { } } + class SetStatistics implements MetadataUpdate { + private final long snapshotId; + private final StatisticsFile statisticsFile; + + public SetStatistics(long snapshotId, StatisticsFile statisticsFile) { + this.snapshotId = snapshotId; + this.statisticsFile = statisticsFile; + } + + public long snapshotId() { + return snapshotId; + } + + public StatisticsFile statisticsFile() { + return statisticsFile; + } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.setStatistics(snapshotId, statisticsFile); + } + } + + class RemoveStatistics implements MetadataUpdate { + private final long snapshotId; + + public RemoveStatistics(long snapshotId) { + this.snapshotId = snapshotId; + } + + public long snapshotId() { + return snapshotId; + } + + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.removeStatistics(snapshotId); + } + } + class AddSnapshot implements MetadataUpdate { private final Snapshot snapshot; diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java index 52b6ebfb5fd8..45c1e04fb49c 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java +++ 
b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java @@ -54,6 +54,8 @@ private MetadataUpdateParser() {} static final String SET_PROPERTIES = "set-properties"; static final String REMOVE_PROPERTIES = "remove-properties"; static final String SET_LOCATION = "set-location"; + static final String SET_STATISTICS = "set-statistics"; + static final String REMOVE_STATISTICS = "remove-statistics"; // AssignUUID private static final String UUID = "uuid"; @@ -80,6 +82,9 @@ private MetadataUpdateParser() {} // SetDefaultSortOrder private static final String SORT_ORDER_ID = "sort-order-id"; + // SetStatistics + private static final String STATISTICS = "statistics"; + // AddSnapshot private static final String SNAPSHOT = "snapshot"; @@ -113,6 +118,8 @@ private MetadataUpdateParser() {} .put(MetadataUpdate.SetDefaultPartitionSpec.class, SET_DEFAULT_PARTITION_SPEC) .put(MetadataUpdate.AddSortOrder.class, ADD_SORT_ORDER) .put(MetadataUpdate.SetDefaultSortOrder.class, SET_DEFAULT_SORT_ORDER) + .put(MetadataUpdate.SetStatistics.class, SET_STATISTICS) + .put(MetadataUpdate.RemoveStatistics.class, REMOVE_STATISTICS) .put(MetadataUpdate.AddSnapshot.class, ADD_SNAPSHOT) .put(MetadataUpdate.RemoveSnapshot.class, REMOVE_SNAPSHOTS) .put(MetadataUpdate.RemoveSnapshotRef.class, REMOVE_SNAPSHOT_REF) @@ -182,6 +189,12 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator case SET_DEFAULT_SORT_ORDER: writeSetDefaultSortOrder((MetadataUpdate.SetDefaultSortOrder) metadataUpdate, generator); break; + case SET_STATISTICS: + writeSetStatistics((MetadataUpdate.SetStatistics) metadataUpdate, generator); + break; + case REMOVE_STATISTICS: + writeRemoveStatistics((MetadataUpdate.RemoveStatistics) metadataUpdate, generator); + break; case ADD_SNAPSHOT: writeAddSnapshot((MetadataUpdate.AddSnapshot) metadataUpdate, generator); break; @@ -252,6 +265,10 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) { return readAddSortOrder(jsonNode); case 
SET_DEFAULT_SORT_ORDER: return readSetDefaultSortOrder(jsonNode); + case SET_STATISTICS: + return readSetStatistics(jsonNode); + case REMOVE_STATISTICS: + return readRemoveStatistics(jsonNode); case ADD_SNAPSHOT: return readAddSnapshot(jsonNode); case REMOVE_SNAPSHOTS: @@ -316,6 +333,18 @@ private static void writeSetDefaultSortOrder( gen.writeNumberField(SORT_ORDER_ID, update.sortOrderId()); } + private static void writeSetStatistics(MetadataUpdate.SetStatistics update, JsonGenerator gen) + throws IOException { + gen.writeNumberField(SNAPSHOT_ID, update.snapshotId()); + gen.writeFieldName(STATISTICS); + StatisticsFileParser.toJson(update.statisticsFile(), gen); + } + + private static void writeRemoveStatistics( + MetadataUpdate.RemoveStatistics update, JsonGenerator gen) throws IOException { + gen.writeNumberField(SNAPSHOT_ID, update.snapshotId()); + } + private static void writeAddSnapshot(MetadataUpdate.AddSnapshot update, JsonGenerator gen) throws IOException { gen.writeFieldName(SNAPSHOT); @@ -415,6 +444,18 @@ private static MetadataUpdate readSetDefaultSortOrder(JsonNode node) { return new MetadataUpdate.SetDefaultSortOrder(sortOrderId); } + private static MetadataUpdate readSetStatistics(JsonNode node) { + long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node); + JsonNode statisticsFileNode = JsonUtil.get(STATISTICS, node); + StatisticsFile statisticsFile = StatisticsFileParser.fromJson(statisticsFileNode); + return new MetadataUpdate.SetStatistics(snapshotId, statisticsFile); + } + + private static MetadataUpdate readRemoveStatistics(JsonNode node) { + long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node); // snapshot IDs are long, not int + return new MetadataUpdate.RemoveStatistics(snapshotId); + } + private static MetadataUpdate readAddSnapshot(JsonNode node) { Snapshot snapshot = SnapshotParser.fromJson(null, JsonUtil.get(SNAPSHOT, node)); return new MetadataUpdate.AddSnapshot(snapshot); diff --git a/core/src/main/java/org/apache/iceberg/StatisticsFileParser.java
b/core/src/main/java/org/apache/iceberg/StatisticsFileParser.java new file mode 100644 index 000000000000..3f86a6767e2e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/StatisticsFileParser.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.io.StringWriter; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.JsonUtil; + +public class StatisticsFileParser { + + private static final String SNAPSHOT_ID = "snapshot-id"; + private static final String STATISTICS_PATH = "statistics-path"; + private static final String FILE_SIZE_IN_BYTES = "file-size-in-bytes"; + private static final String FILE_FOOTER_SIZE_IN_BYTES = "file-footer-size-in-bytes"; + private static final String BLOB_METADATA = "blob-metadata"; + private static final String TYPE = "type"; + private static final String SEQUENCE_NUMBER = "sequence-number"; + private static final String FIELDS = "fields"; + private static final String PROPERTIES = "properties"; + + private StatisticsFileParser() {} + + public static String toJson(StatisticsFile statisticsFile) { + return toJson(statisticsFile, false); + } + + public static String toJson(StatisticsFile statisticsFile, boolean pretty) { + try { + StringWriter writer = new StringWriter(); + JsonGenerator generator = JsonUtil.factory().createGenerator(writer); + if (pretty) { + generator.useDefaultPrettyPrinter(); + } + + toJson(statisticsFile, generator); + generator.flush(); + return writer.toString(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static void toJson(StatisticsFile statisticsFile, JsonGenerator generator) + throws IOException { + generator.writeStartObject(); + generator.writeNumberField(SNAPSHOT_ID, statisticsFile.snapshotId()); + generator.writeStringField(STATISTICS_PATH, 
statisticsFile.path()); + generator.writeNumberField(FILE_SIZE_IN_BYTES, statisticsFile.fileSizeInBytes()); + generator.writeNumberField(FILE_FOOTER_SIZE_IN_BYTES, statisticsFile.fileFooterSizeInBytes()); + generator.writeArrayFieldStart(BLOB_METADATA); + for (BlobMetadata blobMetadata : statisticsFile.blobMetadata()) { + toJson(blobMetadata, generator); + } + generator.writeEndArray(); + generator.writeEndObject(); + } + + static StatisticsFile fromJson(JsonNode node) { + long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node); + String path = JsonUtil.getString(STATISTICS_PATH, node); + long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE_IN_BYTES, node); + long fileFooterSizeInBytes = JsonUtil.getLong(FILE_FOOTER_SIZE_IN_BYTES, node); + ImmutableList.Builder blobMetadata = ImmutableList.builder(); + JsonNode blobsJson = node.get(BLOB_METADATA); + Preconditions.checkArgument( + blobsJson != null && blobsJson.isArray(), + "Cannot parse blob metadata from non-array: %s", + blobsJson); + for (JsonNode blobJson : blobsJson) { + blobMetadata.add(blobMetadataFromJson(blobJson)); + } + return new GenericStatisticsFile( + snapshotId, path, fileSizeInBytes, fileFooterSizeInBytes, blobMetadata.build()); + } + + private static void toJson(BlobMetadata blobMetadata, JsonGenerator generator) + throws IOException { + generator.writeStartObject(); + generator.writeStringField(TYPE, blobMetadata.type()); + generator.writeNumberField(SNAPSHOT_ID, blobMetadata.sourceSnapshotId()); + generator.writeNumberField(SEQUENCE_NUMBER, blobMetadata.sourceSnapshotSequenceNumber()); + generator.writeArrayFieldStart(FIELDS); + for (int field : blobMetadata.fields()) { + generator.writeNumber(field); + } + generator.writeEndArray(); + + if (!blobMetadata.properties().isEmpty()) { + generator.writeObjectFieldStart(PROPERTIES); + for (Map.Entry entry : blobMetadata.properties().entrySet()) { + generator.writeStringField(entry.getKey(), entry.getValue()); + } + generator.writeEndObject(); + } + 
generator.writeEndObject(); + } + + private static BlobMetadata blobMetadataFromJson(JsonNode node) { + String type = JsonUtil.getString(TYPE, node); + long sourceSnapshotId = JsonUtil.getLong(SNAPSHOT_ID, node); + long sourceSnapshotSequenceNumber = JsonUtil.getLong(SEQUENCE_NUMBER, node); + List fields = JsonUtil.getIntegerList(FIELDS, node); + Map properties; + if (node.has(PROPERTIES)) { + properties = JsonUtil.getStringMap(PROPERTIES, node); + } else { + properties = ImmutableMap.of(); + } + return new GenericBlobMetadata( + type, sourceSnapshotId, sourceSnapshotSequenceNumber, fields, properties); + } +} diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 0266e83bd553..57480a2811de 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -243,6 +243,7 @@ public String toString() { private final List snapshotLog; private final List previousFiles; private final Map refs; + private final List statisticsFiles; private final List changes; @SuppressWarnings("checkstyle:CyclomaticComplexity") @@ -267,6 +268,7 @@ public String toString() { List snapshotLog, List previousFiles, Map refs, + List statisticsFiles, List changes) { Preconditions.checkArgument( specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty"); @@ -314,6 +316,7 @@ public String toString() { this.specsById = indexSpecs(specs); this.sortOrdersById = indexSortOrders(sortOrders); this.refs = validateRefs(currentSnapshotId, refs, snapshotsById); + this.statisticsFiles = ImmutableList.copyOf(statisticsFiles); HistoryEntry last = null; for (HistoryEntry logEntry : snapshotLog) { @@ -489,6 +492,10 @@ public Map refs() { return refs; } + public List statisticsFiles() { + return statisticsFiles; + } + public List snapshotLog() { return snapshotLog; } @@ -817,6 +824,7 @@ public static class Builder { private long currentSnapshotId; 
private List snapshots; private final Map refs; + private final Map<Long, List<StatisticsFile>> statisticsFiles; // change tracking private final List changes; @@ -853,6 +861,7 @@ private Builder() { this.snapshotLog = Lists.newArrayList(); this.previousFiles = Lists.newArrayList(); this.refs = Maps.newHashMap(); + this.statisticsFiles = Maps.newHashMap(); this.snapshotsById = Maps.newHashMap(); this.schemasById = Maps.newHashMap(); this.specsById = Maps.newHashMap(); @@ -884,6 +893,8 @@ private Builder(TableMetadata base) { this.previousFileLocation = base.metadataFileLocation; this.previousFiles = base.previousFiles; this.refs = Maps.newHashMap(base.refs); + this.statisticsFiles = + base.statisticsFiles.stream().collect(Collectors.groupingBy(StatisticsFile::snapshotId)); this.snapshotsById = Maps.newHashMap(base.snapshotsById); this.schemasById = Maps.newHashMap(base.schemasById); @@ -1176,6 +1187,27 @@ public Builder removeBranch(String branch) { return this; } + public Builder setStatistics(long snapshotId, StatisticsFile statisticsFile) { + Preconditions.checkNotNull(statisticsFile, "statisticsFile is null"); + Preconditions.checkArgument( + snapshotId == statisticsFile.snapshotId(), + "snapshotId does not match: %s vs %s", + snapshotId, + statisticsFile.snapshotId()); + statisticsFiles.put(statisticsFile.snapshotId(), ImmutableList.of(statisticsFile)); + changes.add(new MetadataUpdate.SetStatistics(snapshotId, statisticsFile)); + return this; + } + + public Builder removeStatistics(long snapshotId) { + // snapshotId is a primitive long and can never be null, so there is nothing to validate + if (statisticsFiles.remove(snapshotId) == null) { + return this; + } + changes.add(new MetadataUpdate.RemoveStatistics(snapshotId)); + return this; + } + public Builder removeSnapshots(List snapshotsToRemove) { Set idsToRemove = snapshotsToRemove.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); @@ -1313,6 +1345,7 @@ public TableMetadata build() { ImmutableList.copyOf(newSnapshotLog),
ImmutableList.copyOf(metadataHistory), ImmutableMap.copyOf(refs), + statisticsFiles.values().stream().flatMap(List::stream).collect(Collectors.toList()), discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes)); } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java index 6be1df358099..a2c5bcdf243c 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java @@ -105,6 +105,7 @@ private TableMetadataParser() {} static final String SNAPSHOT_LOG = "snapshot-log"; static final String METADATA_FILE = "metadata-file"; static final String METADATA_LOG = "metadata-log"; + static final String STATISTICS = "statistics"; public static void overwrite(TableMetadata metadata, OutputFile outputFile) { internalWrite(metadata, outputFile, true); @@ -453,6 +454,13 @@ static TableMetadata fromJson(FileIO io, String metadataLocation, JsonNode node) snapshots.add(SnapshotParser.fromJson(io, iterator.next())); } + List statisticsFiles; + if (node.has(STATISTICS)) { + statisticsFiles = statisticsFilesFromJson(node.get(STATISTICS)); + } else { + statisticsFiles = ImmutableList.of(); + } + ImmutableList.Builder entries = ImmutableList.builder(); if (node.has(SNAPSHOT_LOG)) { Iterator logIterator = node.get(SNAPSHOT_LOG).elements(); @@ -498,6 +506,7 @@ static TableMetadata fromJson(FileIO io, String metadataLocation, JsonNode node) entries.build(), metadataEntries.build(), refs, + statisticsFiles, ImmutableList.of() /* no changes from the file */); } @@ -517,4 +526,18 @@ private static Map refsFromJson(JsonNode refMap) { return refsBuilder.build(); } + + private static List statisticsFilesFromJson(JsonNode statisticsFilesList) { + Preconditions.checkArgument( + statisticsFilesList.isArray(), + "Cannot parse statistics files from non-array: %s", + statisticsFilesList); + + ImmutableList.Builder 
statisticsFilesBuilder = ImmutableList.builder(); + for (JsonNode statisticsFile : statisticsFilesList) { + statisticsFilesBuilder.add(StatisticsFileParser.fromJson(statisticsFile)); + } + + return statisticsFilesBuilder.build(); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java index f215b04a8af3..ac78b38f182a 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java @@ -29,7 +29,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Test; @@ -738,6 +740,49 @@ public void testSetLocationToJson() { "Remove properties should serialize to the correct JSON value", expected, actual); } + @Test + public void testSetStatistics() { + String json = + "{\"action\":\"set-statistics\",\"snapshot-id\":42,\"statistics\":{\"snapshot-id\":42," + + "\"statistics-path\":\"s3://bucket/warehouse/stats.puffin\",\"file-size-in-bytes\":124," + + "\"file-footer-size-in-bytes\":27,\"blob-metadata\":[{\"type\":\"boring-type\"," + + "\"snapshot-id\":42,\"sequence-number\":2,\"fields\":[1]," + + "\"properties\":{\"prop-key\":\"prop-value\"}}]}}"; + MetadataUpdate expected = + new MetadataUpdate.SetStatistics( + 42, + new GenericStatisticsFile( + 42, + "s3://bucket/warehouse/stats.puffin", + 124, + 27, + ImmutableList.of( + new GenericBlobMetadata( + "boring-type", + 42, + 2, + ImmutableList.of(1), + ImmutableMap.of("prop-key", "prop-value"))))); + assertEquals( + 
MetadataUpdateParser.SET_STATISTICS, expected, MetadataUpdateParser.fromJson(json)); + Assert.assertEquals( + "Set statistics should convert to the correct JSON value", + json, + MetadataUpdateParser.toJson(expected)); + } + + @Test + public void testRemoveStatistics() { + String json = "{\"action\":\"remove-statistics\",\"snapshot-id\":42}"; + MetadataUpdate expected = new MetadataUpdate.RemoveStatistics(42); + assertEquals( + MetadataUpdateParser.REMOVE_STATISTICS, expected, MetadataUpdateParser.fromJson(json)); + Assert.assertEquals( + "Remove statistics should convert to the correct JSON value", + json, + MetadataUpdateParser.toJson(expected)); + } + public void assertEquals( String action, MetadataUpdate expectedUpdate, MetadataUpdate actualUpdate) { switch (action) { @@ -779,6 +824,16 @@ public void assertEquals( (MetadataUpdate.SetDefaultSortOrder) expectedUpdate, (MetadataUpdate.SetDefaultSortOrder) actualUpdate); break; + case MetadataUpdateParser.SET_STATISTICS: + assertEqualsSetStatistics( + (MetadataUpdate.SetStatistics) expectedUpdate, + (MetadataUpdate.SetStatistics) actualUpdate); + break; + case MetadataUpdateParser.REMOVE_STATISTICS: + assertEqualsRemoveStatistics( + (MetadataUpdate.RemoveStatistics) expectedUpdate, + (MetadataUpdate.RemoveStatistics) actualUpdate); + break; case MetadataUpdateParser.ADD_SNAPSHOT: assertEqualsAddSnapshot( (MetadataUpdate.AddSnapshot) expectedUpdate, (MetadataUpdate.AddSnapshot) actualUpdate); @@ -908,6 +963,66 @@ private static void assertEqualsSetDefaultSortOrder( "Sort order id should be the same", expected.sortOrderId(), actual.sortOrderId()); } + private static void assertEqualsSetStatistics( + MetadataUpdate.SetStatistics expected, MetadataUpdate.SetStatistics actual) { + Assert.assertEquals("Snapshot IDs should be equal", expected.snapshotId(), actual.snapshotId()); + Assert.assertEquals( + "Statistics files snapshot IDs should be equal", + expected.statisticsFile().snapshotId(), + 
actual.statisticsFile().snapshotId()); + Assert.assertEquals( + "Statistics files paths should be equal", + expected.statisticsFile().path(), + actual.statisticsFile().path()); + Assert.assertEquals( + "Statistics files size should be equal", + expected.statisticsFile().fileSizeInBytes(), + actual.statisticsFile().fileSizeInBytes()); + Assert.assertEquals( + "Statistics files footer size should be equal", + expected.statisticsFile().fileFooterSizeInBytes(), + actual.statisticsFile().fileFooterSizeInBytes()); + Assert.assertEquals( + "Statistics blob list size should be equal", + expected.statisticsFile().blobMetadata().size(), + actual.statisticsFile().blobMetadata().size()); + + Streams.zip( + expected.statisticsFile().blobMetadata().stream(), + actual.statisticsFile().blobMetadata().stream(), + Pair::of) + .forEachOrdered( + pair -> { + BlobMetadata expectedBlob = pair.first(); + BlobMetadata actualBlob = pair.second(); + + Assert.assertEquals( + "Expected blob type should be equal", expectedBlob.type(), actualBlob.type()); + Assert.assertEquals( + "Expected blob fields should be equal", + expectedBlob.fields(), + actualBlob.fields()); + Assert.assertEquals( + "Expected blob source snapshot ID should be equal", + expectedBlob.sourceSnapshotId(), + actualBlob.sourceSnapshotId()); + Assert.assertEquals( + "Expected blob source snapshot sequence number should be equal", + expectedBlob.sourceSnapshotSequenceNumber(), + actualBlob.sourceSnapshotSequenceNumber()); + Assert.assertEquals( + "Expected blob properties should be equal", + expectedBlob.properties(), + actualBlob.properties()); + }); + } + + private static void assertEqualsRemoveStatistics( + MetadataUpdate.RemoveStatistics expected, MetadataUpdate.RemoveStatistics actual) { + Assert.assertEquals( + "Snapshots to remove should be the same", expected.snapshotId(), actual.snapshotId()); + } + private static void assertEqualsAddSnapshot( MetadataUpdate.AddSnapshot expected, MetadataUpdate.AddSnapshot actual) { 
Assert.assertEquals( diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index e3c69f82993a..e6c7109f6a69 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -50,10 +50,12 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.JsonUtil; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -152,6 +154,7 @@ public void testJsonConversion() throws Exception { snapshotLog, ImmutableList.of(), refs, + ImmutableList.of(), ImmutableList.of()); String asJson = TableMetadataParser.toJson(expected); @@ -272,6 +275,7 @@ public void testBackwardCompat() throws Exception { ImmutableList.of(), ImmutableList.of(), ImmutableMap.of(), + ImmutableList.of(), ImmutableList.of()); String asJson = toJsonWithoutSpecAndSchemaList(expected); @@ -415,6 +419,7 @@ public void testInvalidMainBranch() { snapshotLog, ImmutableList.of(), refs, + ImmutableList.of(), ImmutableList.of())); } @@ -464,6 +469,7 @@ public void testMainWithoutCurrent() { ImmutableList.of(), ImmutableList.of(), refs, + ImmutableList.of(), ImmutableList.of())); } @@ -502,6 +508,7 @@ public void testBranchSnapshotMissing() { ImmutableList.of(), ImmutableList.of(), refs, + ImmutableList.of(), ImmutableList.of())); } @@ -607,6 +614,7 @@ public void testJsonWithPreviousMetadataLog() throws Exception { reversedSnapshotLog, ImmutableList.copyOf(previousMetadataLog), 
ImmutableMap.of(), + ImmutableList.of(), ImmutableList.of()); String asJson = TableMetadataParser.toJson(base); @@ -686,6 +694,7 @@ public void testAddPreviousMetadataRemoveNone() { reversedSnapshotLog, ImmutableList.copyOf(previousMetadataLog), ImmutableMap.of(), + ImmutableList.of(), ImmutableList.of()); previousMetadataLog.add(latestPreviousMetadata); @@ -783,6 +792,7 @@ public void testAddPreviousMetadataRemoveOne() { reversedSnapshotLog, ImmutableList.copyOf(previousMetadataLog), ImmutableMap.of(), + ImmutableList.of(), ImmutableList.of()); previousMetadataLog.add(latestPreviousMetadata); @@ -886,6 +896,7 @@ public void testAddPreviousMetadataRemoveMultiple() { reversedSnapshotLog, ImmutableList.copyOf(previousMetadataLog), ImmutableMap.of(), + ImmutableList.of(), ImmutableList.of()); previousMetadataLog.add(latestPreviousMetadata); @@ -935,6 +946,7 @@ public void testV2UUIDValidation() { ImmutableList.of(), ImmutableList.of(), ImmutableMap.of(), + ImmutableList.of(), ImmutableList.of())); } @@ -967,6 +979,7 @@ public void testVersionValidation() { ImmutableList.of(), ImmutableList.of(), ImmutableMap.of(), + ImmutableList.of(), ImmutableList.of())); } @@ -1235,6 +1248,89 @@ public void testUpdateSortOrder() { sortedByX.sortOrder().fields().get(0).nullOrder()); } + /** Checks that freshly created table metadata reports an empty list of statistics files. */ @Test + public void testStatistics() { + Schema schema = new Schema(Types.NestedField.required(10, "x", Types.StringType.get())); + + TableMetadata meta = + TableMetadata.newTableMetadata( + schema, PartitionSpec.unpartitioned(), null, ImmutableMap.of()); + Assert.assertEquals( + "Should default to no statistics files", ImmutableList.of(), meta.statisticsFiles()); + } + + /** Registers a statistics file for snapshot 43, then registers a second file for the same snapshot, and checks after each step that exactly one file is present with the expected snapshot ID and path. */ @Test + public void testSetStatistics() { + Schema schema = new Schema(Types.NestedField.required(10, "x", Types.StringType.get())); + + TableMetadata meta = + TableMetadata.newTableMetadata( + schema, PartitionSpec.unpartitioned(), null, ImmutableMap.of()); + + TableMetadata withStatistics = + TableMetadata.buildFrom(meta) + 
.setStatistics( + 43, + new GenericStatisticsFile( + 43, "/some/path/to/stats/file", 128, 27, ImmutableList.of())) + .build(); + + Assertions.assertThat(withStatistics.statisticsFiles()) + .as("There should be one statistics file registered") + .hasSize(1); + StatisticsFile statisticsFile = Iterables.getOnlyElement(withStatistics.statisticsFiles()); + Assert.assertEquals("Statistics file snapshot", 43L, statisticsFile.snapshotId()); + Assert.assertEquals("Statistics file path", "/some/path/to/stats/file", statisticsFile.path()); + + /* Same snapshot ID as the first file but a different path; the assertions that follow expect this file to be the only one present. */ TableMetadata withStatisticsReplaced = + TableMetadata.buildFrom(withStatistics) + .setStatistics( + 43, + new GenericStatisticsFile( + 43, "/some/path/to/stats/file2", 128, 27, ImmutableList.of())) + .build(); + + Assertions.assertThat(withStatisticsReplaced.statisticsFiles()) + .as("There should be one statistics file registered") + .hasSize(1); + statisticsFile = Iterables.getOnlyElement(withStatisticsReplaced.statisticsFiles()); + Assert.assertEquals("Statistics file snapshot", 43L, statisticsFile.snapshotId()); + Assert.assertEquals("Statistics file path", "/some/path/to/stats/file2", statisticsFile.path()); + } + + /** Builds metadata with statistics files for snapshots 43 and 44, checks that removing statistics for snapshot 42 yields the same TableMetadata instance, and that removing snapshot 43 leaves only the file for snapshot 44. */ @Test + public void testRemoveStatistics() { + Schema schema = new Schema(Types.NestedField.required(10, "x", Types.StringType.get())); + + TableMetadata meta = + TableMetadata.buildFrom( + TableMetadata.newTableMetadata( + schema, PartitionSpec.unpartitioned(), null, ImmutableMap.of())) + .setStatistics( + 43, + new GenericStatisticsFile( + 43, "/some/path/to/stats/file", 128, 27, ImmutableList.of())) + .setStatistics( + 44, + new GenericStatisticsFile( + 44, "/some/path/to/stats/file2", 128, 27, ImmutableList.of())) + .build(); + + /* Snapshot 42 has no statistics file in this metadata, so the build is expected to return the identical instance. */ Assert.assertSame( + "Should detect no statistics to remove", + meta, + TableMetadata.buildFrom(meta).removeStatistics(42L).build()); + + TableMetadata withOneRemoved = TableMetadata.buildFrom(meta).removeStatistics(43).build(); + + Assertions.assertThat(withOneRemoved.statisticsFiles()) + 
.as("There should be one statistics file retained") + .hasSize(1); + StatisticsFile statisticsFile = Iterables.getOnlyElement(withOneRemoved.statisticsFiles()); + Assert.assertEquals("Statistics file snapshot", 44L, statisticsFile.snapshotId()); + Assert.assertEquals("Statistics file path", "/some/path/to/stats/file2", statisticsFile.path()); + } + @Test public void testParseSchemaIdentifierFields() throws Exception { String data = readTableMetadataInputFile("TableMetadataV2Valid.json"); @@ -1425,6 +1521,24 @@ public void testUpgradeV1MetadataToV2ThroughTableProperty() { meta.properties()); } + /** Parses the TableMetadataStatisticsFiles.json fixture and checks that it yields exactly one statistics file, equal to the expected GenericStatisticsFile holding a single ndv blob. */ @Test + public void testParseStatisticsFiles() throws Exception { + String data = readTableMetadataInputFile("TableMetadataStatisticsFiles.json"); + TableMetadata parsed = TableMetadataParser.fromJson(ops.io(), data); + Assertions.assertThat(parsed.statisticsFiles()).as("parsed statistics files").hasSize(1); + Assert.assertEquals( + "parsed statistics file", + new GenericStatisticsFile( + 3055729675574597004L, + "s3://a/b/stats.puffin", + 413, + 42, + ImmutableList.of( + new GenericBlobMetadata( + "ndv", 3055729675574597004L, 1, ImmutableList.of(1), ImmutableMap.of()))), + Iterables.getOnlyElement(parsed.statisticsFiles())); + } + @Test public void testNoReservedPropertyForTableMetadataCreation() { Schema schema = new Schema(Types.NestedField.required(10, "x", Types.StringType.get())); diff --git a/core/src/test/resources/TableMetadataStatisticsFiles.json b/core/src/test/resources/TableMetadataStatisticsFiles.json new file mode 100644 index 000000000000..150e2891efc0 --- /dev/null +++ b/core/src/test/resources/TableMetadataStatisticsFiles.json @@ -0,0 +1,70 @@ +{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 0, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + { + 
"id": 1, + "name": "x", + "required": true, + "type": "long" + } + ] + } + ], + "default-spec-id": 0, + "partition-specs": [ + { + "spec-id": 0, + "fields": [] + } + ], + "last-partition-id": 1000, + "default-sort-order-id": 0, + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], + "properties": {}, + "current-snapshot-id": 3055729675574597004, + "snapshots": [ + { + "snapshot-id": 3055729675574597004, + "timestamp-ms": 1555100955770, + "sequence-number": 1, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/2.avro", + "schema-id": 0 + } + ], + "statistics": [ + { + "snapshot-id": 3055729675574597004, + "statistics-path": "s3://a/b/stats.puffin", + "file-size-in-bytes": 413, + "file-footer-size-in-bytes": 42, + "blob-metadata": [ + { + "type": "ndv", + "snapshot-id": 3055729675574597004, + "sequence-number": 1, + "fields": [1] + } + ] + } + ], + "snapshot-log": [], + "metadata-log": [] +} \ No newline at end of file