Fix Iceberg statistics accumulation after schema evolution
BlueStalker authored and findepi committed Feb 25, 2022
1 parent 852f8c7 commit 0c346b5
Showing 4 changed files with 33 additions and 33 deletions.
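Before this change, IcebergStatistics.Builder pre-built an idToTypeMapping from the table's current schema and used it in convertBounds to decode every lower/upper bound recorded in a data file. After a column is dropped, older data files still carry bounds keyed by the dropped column's field id; the lookup then found no type and the verify call failed, breaking statistics accumulation. The fix removes the shared mapping and decodes each bound with the type of the column currently being processed, so stale field ids in file metadata are never consulted.

The following is a minimal, self-contained sketch of the failure mode. Plain Java maps and strings stand in for the Iceberg schema and type objects; all names here are illustrative, not the connector's actual API:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class DroppedColumnSketch
{
    public static void main(String[] args)
    {
        // Current schema after ALTER TABLE ... DROP COLUMN _varchar:
        // only field ids 2 (_bigint) and 3 (_date) remain.
        Map<Integer, String> idToTypeMapping = new HashMap<>();
        idToTypeMapping.put(2, "long");
        idToTypeMapping.put(3, "date");

        // A data file written before the drop still records a lower bound
        // for field id 1, the dropped varchar column.
        Map<Integer, ByteBuffer> lowerBounds = new HashMap<>();
        lowerBounds.put(1, ByteBuffer.wrap(new byte[] {'a'}));
        lowerBounds.put(2, ByteBuffer.wrap(new byte[8]));

        // The removed convertBounds walked the file's bound map and required
        // a type for every id it encountered, mirroring Guava's verify(...):
        lowerBounds.forEach((id, value) -> {
            String type = idToTypeMapping.get(id);
            if (type == null) {
                // This is where the real code threw after a column drop.
                throw new IllegalStateException(
                        "No type for column id " + id + ", known types: " + idToTypeMapping);
            }
            // ... Conversions.fromByteBuffer(type, value) in the real code
        });
    }
}
```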
IcebergStatistics.java

@@ -21,14 +21,12 @@
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.types.Conversions;
-import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.Immutable;
 
 import java.lang.invoke.MethodHandle;
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -114,7 +112,6 @@ public Map<Integer, Long> getColumnSizes()
 
     public static class Builder
     {
-        private final Map<Integer, Type.PrimitiveType> idToTypeMapping;
         private final List<Types.NestedField> columns;
         private final TypeManager typeManager;
         private final Map<Integer, Optional<Long>> nullCounts = new HashMap<>();
@@ -127,11 +124,9 @@ public static class Builder
         private long size;
 
         public Builder(
-                Map<Integer, Type.PrimitiveType> idToTypeMapping,
                 List<Types.NestedField> columns,
                 TypeManager typeManager)
         {
-            this.idToTypeMapping = ImmutableMap.copyOf(requireNonNull(idToTypeMapping, "idToTypeMapping is null"));
             this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
             this.typeManager = requireNonNull(typeManager, "typeManager is null");
 
@@ -161,8 +156,6 @@ public void acceptDataFile(DataFile dataFile, PartitionSpec partitionSpec)
                     .map(PartitionField::sourceId)
                     .collect(toImmutableSet());
             Map<Integer, Optional<String>> partitionValues = getPartitionKeys(dataFile.partition(), partitionSpec);
-            Map<Integer, Object> lowerBounds = convertBounds(dataFile.lowerBounds());
-            Map<Integer, Object> upperBounds = convertBounds(dataFile.upperBounds());
             for (Types.NestedField column : partitionSpec.schema().columns()) {
                 int id = column.fieldId();
                 io.trino.spi.type.Type trinoType = fieldIdToTrinoType.get(id);
@@ -187,8 +180,10 @@ public void acceptDataFile(DataFile dataFile, PartitionSpec partitionSpec)
                     }
                 }
                 else {
-                    Object lowerBound = convertIcebergValueToTrino(column.type(), lowerBounds.getOrDefault(id, null));
-                    Object upperBound = convertIcebergValueToTrino(column.type(), upperBounds.getOrDefault(id, null));
+                    Object lowerBound = convertIcebergValueToTrino(column.type(),
+                            Conversions.fromByteBuffer(column.type(), Optional.ofNullable(dataFile.lowerBounds()).map(a -> a.get(id)).orElse(null)));
+                    Object upperBound = convertIcebergValueToTrino(column.type(),
+                            Conversions.fromByteBuffer(column.type(), Optional.ofNullable(dataFile.upperBounds()).map(a -> a.get(id)).orElse(null)));
                     Optional<Long> nullCount = Optional.ofNullable(dataFile.nullValueCounts().get(id));
                     updateMinMaxStats(
                             id,
@@ -252,28 +247,6 @@ private void updateMinMaxStats(
                 }).updateMinMax(lowerBound, upperBound);
             }
         }
-
-        /**
-         * Converts a file's column bounds to a Map from field id to Iceberg Object representation
-         * @param idToMetricMap A Map from field id to Iceberg ByteBuffer representation
-         * @return A Map from field id to Iceberg Object representation
-         */
-        private Map<Integer, Object> convertBounds(@Nullable Map<Integer, ByteBuffer> idToMetricMap)
-        {
-            if (idToMetricMap == null) {
-                return ImmutableMap.of();
-            }
-            ImmutableMap.Builder<Integer, Object> map = ImmutableMap.builder();
-            idToMetricMap.forEach((id, value) -> {
-                Type.PrimitiveType type = idToTypeMapping.get(id);
-                verify(type != null, "No type for column id %s, known types: %s", id, idToTypeMapping);
-                Object icebergRepresentation = Conversions.fromByteBuffer(type, value);
-                if (icebergRepresentation != null) {
-                    map.put(id, icebergRepresentation);
-                }
-            });
-            return map.buildOrThrow();
-        }
     }
 
     private static class ColumnStatistics
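The fixed acceptDataFile drives bound conversion from the current schema's columns rather than from whatever ids appear in the file's bound maps. A sketch of that pattern, again with plain Java stand-ins for the Iceberg types (names here are hypothetical):

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class PerColumnBoundsSketch
{
    public static void main(String[] args)
    {
        // Current schema: the dropped column (field id 1) is gone.
        Map<Integer, String> currentColumns = Map.of(2, "long", 3, "date");

        // Stale file metadata still mentions field id 1; a file may also
        // report no bounds at all, hence the null-safe Optional below.
        Map<Integer, ByteBuffer> fileLowerBounds = new HashMap<>();
        fileLowerBounds.put(1, ByteBuffer.wrap(new byte[] {'a'}));
        fileLowerBounds.put(2, ByteBuffer.wrap(new byte[8]));

        for (Map.Entry<Integer, String> column : currentColumns.entrySet()) {
            int id = column.getKey();
            // Same shape as the new code: the lookup is keyed by a column of
            // the current schema, so the stale id 1 is never consulted.
            ByteBuffer bound = Optional.ofNullable(fileLowerBounds)
                    .map(bounds -> bounds.get(id))
                    .orElse(null);
            System.out.println("field " + id + " (" + column.getValue() + "): "
                    + (bound == null ? "no bound" : "bound of " + bound.remaining() + " bytes"));
        }
    }
}
```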
PartitionTable.java

@@ -194,7 +194,7 @@ private Map<StructLikeWrapper, IcebergStatistics> getStatisticsByPartition(Table

             partitions.computeIfAbsent(
                     partitionWrapper,
-                    ignored -> new IcebergStatistics.Builder(idToTypeMapping, icebergTable.schema().columns(), typeManager))
+                    ignored -> new IcebergStatistics.Builder(icebergTable.schema().columns(), typeManager))
                     .acceptDataFile(dataFile, fileScanTask.spec());
         }
TableStatisticsMaker.java

@@ -108,7 +108,7 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons
                 .useSnapshot(tableHandle.getSnapshotId().get())
                 .includeColumnStats();
 
-        IcebergStatistics.Builder icebergStatisticsBuilder = new IcebergStatistics.Builder(idToTypeMapping, columns, typeManager);
+        IcebergStatistics.Builder icebergStatisticsBuilder = new IcebergStatistics.Builder(columns, typeManager);
         try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
             for (FileScanTask fileScanTask : fileScanTasks) {
                 DataFile dataFile = fileScanTask.file();
TestIcebergSystemTables.java

@@ -91,6 +91,33 @@ public void testPartitionTable()
         assertEquals(rowsByPartition.get(LocalDate.parse("2019-09-10")).getField(4), new MaterializedRow(DEFAULT_PRECISION, new MaterializedRow(DEFAULT_PRECISION, 4L, 5L, 0L)));
     }
 
+    @Test
+    public void testPartitionTableOnDropColumn()
+    {
+        assertUpdate("CREATE TABLE test_schema.test_table_multi_column (_varchar VARCHAR, _bigint BIGINT, _date DATE) WITH (partitioning = ARRAY['_date'])");
+        assertUpdate("INSERT INTO test_schema.test_table_multi_column VALUES ('a', 0, CAST('2019-09-08' AS DATE)), ('a', 1, CAST('2019-09-09' AS DATE)), ('b', 2, CAST('2019-09-09' AS DATE))", 3);
+        assertUpdate("INSERT INTO test_schema.test_table_multi_column VALUES ('c', 3, CAST('2019-09-09' AS DATE)), ('a', 4, CAST('2019-09-10' AS DATE)), ('b', 5, CAST('2019-09-10' AS DATE))", 3);
+        assertQuery("SELECT count(*) FROM test_schema.test_table_multi_column", "VALUES 6");
+        MaterializedResult result = computeActual("SELECT * from test_schema.\"test_table_multi_column$partitions\"");
+
+        assertEquals(result.getRowCount(), 3);
+        Map<LocalDate, MaterializedRow> rowsByPartition = result.getMaterializedRows().stream()
+                .collect(toImmutableMap(row -> ((LocalDate) ((MaterializedRow) row.getField(0)).getField(0)), Function.identity()));
+
+        assertEquals(rowsByPartition.get(LocalDate.parse("2019-09-08")).getField(4), new MaterializedRow(DEFAULT_PRECISION,
+                new MaterializedRow(DEFAULT_PRECISION, "a", "a", 0L),
+                new MaterializedRow(DEFAULT_PRECISION, 0L, 0L, 0L)));
+
+        assertUpdate("ALTER TABLE test_schema.test_table_multi_column drop column _varchar");
+        MaterializedResult resultAfterDrop = computeActual("SELECT * from test_schema.\"test_table_multi_column$partitions\"");
+        assertEquals(resultAfterDrop.getRowCount(), 3);
+        Map<LocalDate, MaterializedRow> rowsByPartitionAfterDrop = resultAfterDrop.getMaterializedRows().stream()
+                .collect(toImmutableMap(row -> ((LocalDate) ((MaterializedRow) row.getField(0)).getField(0)), Function.identity()));
+        assertEquals(rowsByPartitionAfterDrop.get(LocalDate.parse("2019-09-08")).getField(4), new MaterializedRow(DEFAULT_PRECISION,
+                new MaterializedRow(DEFAULT_PRECISION, 0L, 0L, 0L)));
+        assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_multi_column");
+    }
+
     @Test
     public void testHistoryTable()
     {
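The new test exercises exactly this sequence: insert rows, drop the leading _varchar column, and query the "$partitions" table again. After the drop, the row for the 2019-09-08 partition exposes min/max/null-count entries only for the surviving _bigint column, where the same query previously hit the verify failure while decoding the dropped column's bounds.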
