Skip to content

Commit

Permalink
Add statistics information in table snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed May 18, 2022
1 parent b2ed589 commit 7942a68
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 27 deletions.
9 changes: 9 additions & 0 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,13 @@ public interface Snapshot extends Serializable {
default Integer schemaId() {
return null;
}

/**
* Return all statistics files associated with the table.
*
* @return all statistics files associated with the table.
*/
default Iterable<StatisticsFile> statisticsFiles() {
return null;
}
}
36 changes: 36 additions & 0 deletions api/src/main/java/org/apache/iceberg/StatisticsFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.util.List;
import java.util.Map;
import java.util.Set;

public interface StatisticsFile {
String location();

long fileSizeInBytes();

long fileFooterSizeInBytes();

long sequenceNumber();

Map<String, Set<List<Integer>>> statistics();
}
18 changes: 14 additions & 4 deletions core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class BaseSnapshot implements Snapshot {
private final String operation;
private final Map<String, String> summary;
private final Integer schemaId;
private final List<StatisticsFile> statisticsFiles;

// lazily initialized
private transient List<ManifestFile> allManifests = null;
Expand All @@ -61,7 +62,7 @@ class BaseSnapshot implements Snapshot {
String... manifestFiles) {
this(io, snapshotId, null, System.currentTimeMillis(), null, null,
schemaId, Lists.transform(Arrays.asList(manifestFiles),
path -> new GenericManifestFile(io.newInputFile(path), 0)));
path -> new GenericManifestFile(io.newInputFile(path), 0)), null);
}

BaseSnapshot(FileIO io,
Expand All @@ -72,7 +73,8 @@ class BaseSnapshot implements Snapshot {
String operation,
Map<String, String> summary,
Integer schemaId,
String manifestList) {
String manifestList,
List<StatisticsFile> statisticsFiles) {
this.io = io;
this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
Expand All @@ -82,6 +84,7 @@ class BaseSnapshot implements Snapshot {
this.summary = summary;
this.schemaId = schemaId;
this.manifestListLocation = manifestList;
this.statisticsFiles = statisticsFiles;
}

BaseSnapshot(FileIO io,
Expand All @@ -91,8 +94,10 @@ class BaseSnapshot implements Snapshot {
String operation,
Map<String, String> summary,
Integer schemaId,
List<ManifestFile> dataManifests) {
this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, schemaId, null);
List<ManifestFile> dataManifests,
List<StatisticsFile> statisticsFiles) {
this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, schemaId, null,
statisticsFiles);
this.allManifests = dataManifests;
}

Expand Down Expand Up @@ -131,6 +136,11 @@ public Integer schemaId() {
return schemaId;
}

@Override
public Iterable<StatisticsFile> statisticsFiles() {
return statisticsFiles;
}

private void cacheManifests() {
if (io == null) {
throw new IllegalStateException("Cannot cache changes: FileIO is null");
Expand Down
78 changes: 78 additions & 0 deletions core/src/main/java/org/apache/iceberg/GenericStatisticsFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;

public class GenericStatisticsFile implements StatisticsFile {
private final String location;
private final long fileSizeInBytes;
private final long fileFooterSizeInBytes;
private final long sequenceNumber;
private final Map<String, Set<List<Integer>>> statistics;

public GenericStatisticsFile(
String location,
long fileSizeInBytes,
long fileFooterSizeInBytes,
long sequenceNumber,
Map<String, Set<List<Integer>>> statistics) {
this.location = Preconditions.checkNotNull(location, "location is null");
this.fileSizeInBytes = fileSizeInBytes;
this.fileFooterSizeInBytes = fileFooterSizeInBytes;
this.sequenceNumber = sequenceNumber;
Preconditions.checkNotNull(statistics, "statistics is null");
this.statistics = statistics.entrySet().stream()
.collect(ImmutableMap.toImmutableMap(
Map.Entry::getKey,
entry -> entry.getValue().stream().map(ImmutableList::copyOf).collect(ImmutableSet.toImmutableSet())));
}

@Override
public String location() {
return location;
}

@Override
public long fileSizeInBytes() {
return fileSizeInBytes;
}

@Override
public long fileFooterSizeInBytes() {
return fileFooterSizeInBytes;
}

@Override
public long sequenceNumber() {
return sequenceNumber;
}

@Override
public Map<String, Set<List<Integer>>> statistics() {
return statistics;
}
}
92 changes: 89 additions & 3 deletions core/src/main/java/org/apache/iceberg/SnapshotParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.JsonUtil;

public class SnapshotParser {
Expand All @@ -47,6 +51,10 @@ private SnapshotParser() {
private static final String MANIFESTS = "manifests";
private static final String MANIFEST_LIST = "manifest-list";
private static final String SCHEMA_ID = "schema-id";
private static final String STATISTICS = "statistics";
private static final String LOCATION = "location";
private static final String FILE_SIZE_IN_BYTES = "file_size_in_bytes";
private static final String FILE_FOOTER_SIZE_IN_BYTES = "file_footer_size_in_bytes";

static void toJson(Snapshot snapshot, JsonGenerator generator)
throws IOException {
Expand Down Expand Up @@ -94,6 +102,36 @@ static void toJson(Snapshot snapshot, JsonGenerator generator)
generator.writeNumberField(SCHEMA_ID, snapshot.schemaId());
}

if (snapshot.statisticsFiles() != null) {
generator.writeArrayFieldStart(STATISTICS);
for (StatisticsFile statisticsFile : snapshot.statisticsFiles()) {
toJson(statisticsFile, generator);
}
generator.writeEndArray();
}

generator.writeEndObject();
}

private static void toJson(StatisticsFile statisticsFile, JsonGenerator generator) throws IOException {
generator.writeStartObject();
generator.writeStringField(LOCATION, statisticsFile.location());
generator.writeNumberField(FILE_SIZE_IN_BYTES, statisticsFile.fileSizeInBytes());
generator.writeNumberField(FILE_FOOTER_SIZE_IN_BYTES, statisticsFile.fileFooterSizeInBytes());
generator.writeNumberField(SEQUENCE_NUMBER, statisticsFile.sequenceNumber());
generator.writeObjectFieldStart(STATISTICS);
for (Map.Entry<String, Set<List<Integer>>> entry : statisticsFile.statistics().entrySet()) {
generator.writeArrayFieldStart(entry.getKey());
for (List<Integer> columnIds : entry.getValue()) {
generator.writeStartArray();
for (Integer columnId : columnIds) {
generator.writeNumber(columnId);
}
generator.writeEndArray();
}
generator.writeEndArray();
}
generator.writeEndObject();
generator.writeEndObject();
}

Expand Down Expand Up @@ -147,21 +185,69 @@ static Snapshot fromJson(FileIO io, JsonNode node) {

Integer schemaId = JsonUtil.getIntOrNull(SCHEMA_ID, node);

List<StatisticsFile> statistics = null;
if (node.has(STATISTICS)) {
JsonNode statisticsNode = node.get(STATISTICS);
Preconditions.checkArgument(statisticsNode != null && !statisticsNode.isNull() && statisticsNode.isArray(),
"Cannot parse statistics from non-array value: %s", statisticsNode);
ImmutableList.Builder<StatisticsFile> builder = ImmutableList.builderWithExpectedSize(statisticsNode.size());
statisticsNode.elements().forEachRemaining(element -> builder.add(statisticsFileFromJson(element)));
statistics = builder.build();
}

if (node.has(MANIFEST_LIST)) {
// the manifest list is stored in a manifest list file
String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
return new BaseSnapshot(
io, sequenceNumber, snapshotId, parentId, timestamp, operation, summary, schemaId, manifestList);

io, sequenceNumber, snapshotId, parentId, timestamp, operation, summary, schemaId, manifestList, statistics);
} else {
// fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
// loaded lazily, if it is needed
List<ManifestFile> manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
location -> new GenericManifestFile(io.newInputFile(location), 0));
return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, summary, schemaId, manifests);
return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, summary, schemaId, manifests, statistics);
}
}

private static StatisticsFile statisticsFileFromJson(JsonNode json) {
Preconditions.checkArgument(json != null && !json.isNull() && json.isObject(),
"Cannot parse statistics from non-object value: %s", json);

String location = JsonUtil.getString(LOCATION, json);
long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE_IN_BYTES, json);
long fileFooterSizeInBytes = JsonUtil.getLong(FILE_FOOTER_SIZE_IN_BYTES, json);
long sequenceNumber = JsonUtil.getLong(SEQUENCE_NUMBER, json);
JsonNode statisticsNode = json.get(STATISTICS);
Preconditions.checkArgument(statisticsNode != null && !statisticsNode.isNull() && statisticsNode.isObject(),
"Cannot parse statistics from non-object value: %s", statisticsNode);
ImmutableMap.Builder<String, Set<List<Integer>>> statistics = ImmutableMap.builder();
statisticsNode.fieldNames().forEachRemaining(fieldName -> {
JsonNode columnSetsNode = statisticsNode.get(fieldName);
Preconditions.checkArgument(columnSetsNode != null && !columnSetsNode.isNull() && columnSetsNode.isArray(),
"Cannot parse column sets from non-array value: %s", columnSetsNode);
Set<List<Integer>> columnSets = Streams.stream(columnSetsNode.elements())
.map(columnIdsNode -> {
Preconditions.checkArgument(columnIdsNode != null && !columnIdsNode.isNull() && columnIdsNode.isArray(),
"Cannot parse column IDs from non-array value: %s", columnIdsNode);
return Streams.stream(columnIdsNode.elements())
.map(columnId -> {
Preconditions.checkArgument(columnId.isInt(),
"Cannot parse integer from non-int value: %s", columnId);
return columnId.asInt();
})
.collect(ImmutableList.toImmutableList());
})
.collect(ImmutableSet.toImmutableSet());
statistics.put(fieldName, columnSets);
});
return new GenericStatisticsFile(
location,
fileSizeInBytes,
fileFooterSizeInBytes,
sequenceNumber,
statistics.buildOrThrow());
}

public static Snapshot fromJson(FileIO io, String json) {
try {
return fromJson(io, JsonUtil.mapper().readValue(json, JsonNode.class));
Expand Down
8 changes: 6 additions & 2 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -170,6 +171,9 @@ public Snapshot apply() {
Long parentSnapshotId = base.currentSnapshot() != null ?
base.currentSnapshot().snapshotId() : null;
long sequenceNumber = base.nextSequenceNumber();
List<StatisticsFile> statisticsFiles =
base.currentSnapshot() != null && base.currentSnapshot().statisticsFiles() != null ?
ImmutableList.copyOf(base.currentSnapshot().statisticsFiles()) : null;

// run validations from the child operation
validate(base);
Expand Down Expand Up @@ -201,12 +205,12 @@ public Snapshot apply() {

return new BaseSnapshot(ops.io(),
sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
base.currentSchemaId(), manifestList.location());
base.currentSchemaId(), manifestList.location(), statisticsFiles);

} else {
return new BaseSnapshot(ops.io(),
snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
base.currentSchemaId(), manifests);
base.currentSchemaId(), manifests, statisticsFiles);
}
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void testJsonConversionWithOperation() {

Snapshot expected = new BaseSnapshot(ops.io(), id, parentId, System.currentTimeMillis(),
DataOperations.REPLACE, ImmutableMap.of("files-added", "4", "files-deleted", "100"),
3, manifests);
3, manifests, null);

String json = SnapshotParser.toJson(expected);
Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json);
Expand Down Expand Up @@ -122,9 +122,9 @@ public void testJsonConversionWithManifestList() throws IOException {

Snapshot expected = new BaseSnapshot(
ops.io(), id, 34, parentId, System.currentTimeMillis(),
null, null, 4, localInput(manifestList).location());
null, null, 4, localInput(manifestList).location(), null);
Snapshot inMemory = new BaseSnapshot(
ops.io(), id, parentId, expected.timestampMillis(), null, null, 4, manifests);
ops.io(), id, parentId, expected.timestampMillis(), null, null, 4, manifests, null);

Assert.assertEquals("Files should match in memory list",
inMemory.allManifests(), expected.allManifests());
Expand Down
Loading

0 comments on commit 7942a68

Please sign in to comment.