From da1de5e4af3a59257fdeed6a2e14f3010e7c8e2f Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Fri, 23 Sep 2022 17:32:47 +0200 Subject: [PATCH] Calculate Iceberg NDV with a Theta sketch Iceberg specification defines the Apache DataSketches's Theta as the common data sketch for keeping track of distinct values in a table. This change replaces the use of HLL within Iceberg's ANALYZE with Theta sketch. The follow-up work is to store the serialized compact form of the sketch inside the Iceberg Puffin statistics file, but this requires Iceberg API changes, which are still in progress. A side effect of this change is that complex types (array, map, row) can no longer be analyzed: Trino can calculate a HyperLogLog for these types, while Iceberg does not specify binary representation for these types, which is required to feed data into a Theta sketch. However, NDV for complex types is not as useful as it is for scalar types, so this shouldn't matter in practice. --- plugin/trino-iceberg/pom.xml | 12 ++ .../trino/plugin/iceberg/IcebergMetadata.java | 21 +-- .../trino/plugin/iceberg/IcebergPlugin.java | 10 ++ .../iceberg/aggregation/DataSketchState.java | 32 +++++ .../DataSketchStateSerializer.java | 73 +++++++++++ .../IcebergThetaSketchForStats.java | 124 ++++++++++++++++++ .../iceberg/BaseIcebergConnectorTest.java | 12 +- 7 files changed, 269 insertions(+), 15 deletions(-) create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchState.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchStateSerializer.java create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/IcebergThetaSketchForStats.java diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index d9130907155d..32c1e408696d 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -165,6 +165,18 @@ failsafe + + org.apache.datasketches + datasketches-java + 3.3.0 + + + + org.apache.datasketches + datasketches-memory + 2.1.0 + + org.apache.iceberg iceberg-api diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index f461a00b2f22..6a0af1a0c255 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -34,6 +34,8 @@ import io.trino.plugin.hive.HiveApplyProjectionUtil; import io.trino.plugin.hive.HiveApplyProjectionUtil.ProjectedColumnRepresentation; import io.trino.plugin.hive.HiveWrittenPartitions; +import io.trino.plugin.iceberg.aggregation.DataSketchStateSerializer; +import io.trino.plugin.iceberg.aggregation.IcebergThetaSketchForStats; import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.procedure.IcebergDropExtendedStatsHandle; import io.trino.plugin.iceberg.procedure.IcebergExpireSnapshotsHandle; @@ -94,6 +96,7 @@ import io.trino.spi.type.TimestampWithTimeZoneType; import io.trino.spi.type.TypeManager; import io.trino.spi.type.TypeOperators; +import org.apache.datasketches.theta.CompactSketch; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataFile; @@ -229,7 +232,6 @@ import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.connector.RetryMode.NO_RETRIES; import static io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW; -import static io.trino.spi.predicate.Utils.blockToNativeValue; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc; import static io.trino.spi.type.UuidType.UUID; @@ -264,7 +266,7 @@ public class IcebergMetadata public static final String ORC_BLOOM_FILTER_FPP_KEY = "orc.bloom.filter.fpp"; private static final String NUMBER_OF_DISTINCT_VALUES_NAME = "NUMBER_OF_DISTINCT_VALUES"; - private static final FunctionName NUMBER_OF_DISTINCT_VALUES_FUNCTION = new FunctionName("approx_distinct"); + private static final FunctionName NUMBER_OF_DISTINCT_VALUES_FUNCTION = new FunctionName(IcebergThetaSketchForStats.NAME); private final TypeManager typeManager; private final TypeOperators typeOperators; @@ -1468,8 +1470,9 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession } ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle); - Set allDataColumnNames = tableMetadata.getColumns().stream() + Set allScalarColumnNames = tableMetadata.getColumns().stream() .filter(column -> !column.isHidden()) + .filter(column -> column.getType().getTypeParameters().isEmpty()) // is scalar type .map(ColumnMetadata::getName) .collect(toImmutableSet()); @@ -1479,18 +1482,17 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession if (columnNames.isEmpty()) { throw new TrinoException(INVALID_ANALYZE_PROPERTY, "Cannot specify empty list of columns for analysis"); } - if (!allDataColumnNames.containsAll(columnNames)) { + if (!allScalarColumnNames.containsAll(columnNames)) { throw new TrinoException( INVALID_ANALYZE_PROPERTY, - format("Invalid columns specified for analysis: %s", Sets.difference(columnNames, allDataColumnNames))); + format("Invalid columns specified for analysis: %s", Sets.difference(columnNames, allScalarColumnNames))); } return columnNames; }) - .orElse(allDataColumnNames); + .orElse(allScalarColumnNames); Set columnStatistics = tableMetadata.getColumns().stream() .filter(column -> analyzeColumnNames.contains(column.getName())) - // TODO: add support for NDV summary/sketch, but using Theta sketch, not HLL; see https://github.com/apache/iceberg-docs/pull/69 .map(column -> new ColumnStatisticMetadata(column.getName(), NUMBER_OF_DISTINCT_VALUES_NAME, NUMBER_OF_DISTINCT_VALUES_FUNCTION)) .collect(toImmutableSet()); @@ -1537,12 +1539,13 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH for (Map.Entry entry : computedStatistic.getColumnStatistics().entrySet()) { ColumnStatisticMetadata statisticMetadata = entry.getKey(); if (statisticMetadata.getConnectorAggregationId().equals(NUMBER_OF_DISTINCT_VALUES_NAME)) { - long ndv = (long) blockToNativeValue(BIGINT, entry.getValue()); Integer columnId = verifyNotNull( columnNameToId.get(statisticMetadata.getColumnName()), "Column not found in table: [%s]", statisticMetadata.getColumnName()); - updateProperties.set(TRINO_STATS_NDV_FORMAT.formatted(columnId), Long.toString(ndv)); + CompactSketch sketch = DataSketchStateSerializer.deserialize(entry.getValue(), 0); + // TODO: store whole sketch to support updates, see also https://github.com/apache/iceberg-docs/pull/69 + updateProperties.set(TRINO_STATS_NDV_FORMAT.formatted(columnId), Long.toString((long) sketch.getEstimate())); } else { throw new UnsupportedOperationException("Unsupported statistic: " + statisticMetadata); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPlugin.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPlugin.java index 99e510ad73b1..bfdf4ce0844d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPlugin.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPlugin.java @@ -14,9 +14,13 @@ package io.trino.plugin.iceberg; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.trino.plugin.iceberg.aggregation.IcebergThetaSketchForStats; import io.trino.spi.Plugin; import io.trino.spi.connector.ConnectorFactory; +import java.util.Set; + public class IcebergPlugin implements Plugin { @@ -25,4 +29,10 @@ public Iterable getConnectorFactories() { return ImmutableList.of(new IcebergConnectorFactory()); } + + @Override + public Set> getFunctions() + { + return ImmutableSet.of(IcebergThetaSketchForStats.class); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchState.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchState.java new file mode 100644 index 000000000000..1e79168d6754 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchState.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.aggregation; + +import io.trino.spi.function.AccumulatorState; +import io.trino.spi.function.AccumulatorStateMetadata; +import org.apache.datasketches.theta.CompactSketch; +import org.apache.datasketches.theta.UpdateSketch; + +@AccumulatorStateMetadata(stateSerializerClass = DataSketchStateSerializer.class) +public interface DataSketchState + extends AccumulatorState +{ + UpdateSketch getUpdateSketch(); + + void setUpdateSketch(UpdateSketch value); + + CompactSketch getCompactSketch(); + + void setCompactSketch(CompactSketch value); +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchStateSerializer.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchStateSerializer.java new file mode 100644 index 000000000000..91a5068c6b55 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchStateSerializer.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.aggregation; + +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.spi.block.Block; +import io.trino.spi.block.BlockBuilder; +import io.trino.spi.function.AccumulatorStateSerializer; +import io.trino.spi.type.Type; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.datasketches.theta.CompactSketch; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.spi.type.VarbinaryType.VARBINARY; + +public class DataSketchStateSerializer + implements AccumulatorStateSerializer +{ + @Override + public Type getSerializedType() + { + return VARBINARY; + } + + @Override + public void serialize(DataSketchState state, BlockBuilder out) + { + serializeToVarbinary(state, out); + } + + public static void serializeToVarbinary(DataSketchState state, BlockBuilder out) + { + if (state.getUpdateSketch() == null && state.getCompactSketch() == null) { + out.appendNull(); + } + else { + checkArgument(state.getUpdateSketch() == null || state.getCompactSketch() == null, "A state must not have both transient accumulator and combined form set"); + CompactSketch compactSketch = Optional.ofNullable(state.getCompactSketch()) + .orElseGet(() -> state.getUpdateSketch().compact()); + Slice slice = Slices.wrappedBuffer(compactSketch.toByteArray()); + VARBINARY.writeSlice(out, slice); + } + } + + @Override + public void deserialize(Block block, int index, DataSketchState state) + { + if (!block.isNull(index)) { + state.setCompactSketch(deserialize(block, index)); + } + } + + public static CompactSketch deserialize(Block block, int index) + { + checkArgument(!block.isNull(index), "Value is null"); + Slice slice = VARBINARY.getSlice(block, index); + return CompactSketch.heapify(WritableMemory.writableWrap(slice.getBytes())); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/IcebergThetaSketchForStats.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/IcebergThetaSketchForStats.java new file mode 100644 index 000000000000..518015c2687d --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/IcebergThetaSketchForStats.java @@ -0,0 +1,124 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.aggregation; + +import io.trino.spi.block.Block; +import io.trino.spi.block.BlockBuilder; +import io.trino.spi.function.AggregationFunction; +import io.trino.spi.function.AggregationState; +import io.trino.spi.function.BlockIndex; +import io.trino.spi.function.BlockPosition; +import io.trino.spi.function.CombineFunction; +import io.trino.spi.function.InputFunction; +import io.trino.spi.function.OutputFunction; +import io.trino.spi.function.SqlType; +import io.trino.spi.function.TypeParameter; +import io.trino.spi.type.StandardTypes; +import io.trino.spi.type.Type; +import org.apache.datasketches.Family; +import org.apache.datasketches.theta.SetOperation; +import org.apache.datasketches.theta.Sketch; +import org.apache.datasketches.theta.Union; +import org.apache.datasketches.theta.UpdateSketch; +import org.apache.iceberg.types.Conversions; + +import javax.annotation.Nullable; + +import java.nio.ByteBuffer; + +import static com.google.common.base.Verify.verify; +import static io.trino.plugin.iceberg.IcebergTypes.convertTrinoValueToIceberg; +import static io.trino.plugin.iceberg.TypeConverter.toIcebergType; +import static io.trino.spi.type.TypeUtils.readNativeValue; +import static java.util.Objects.requireNonNull; + +@AggregationFunction(value = IcebergThetaSketchForStats.NAME, hidden = true) +public final class IcebergThetaSketchForStats +{ + private IcebergThetaSketchForStats() {} + + public static final String NAME = "$iceberg_theta_stat"; + + @InputFunction + @TypeParameter("T") + public static void input(@TypeParameter("T") Type type, @AggregationState DataSketchState state, @BlockPosition @SqlType("T") Block block, @BlockIndex int index) + { + verify(!block.isNull(index), "Input function is not expected to be called on a NULL input"); + + Object trinoValue = readNativeValue(type, block, index); + org.apache.iceberg.types.Type icebergType = toIcebergType(type); + Object icebergValue = convertTrinoValueToIceberg(type, trinoValue); + ByteBuffer byteBuffer = Conversions.toByteBuffer(icebergType, icebergValue); + requireNonNull(byteBuffer, "byteBuffer is null"); // trino value isn't null + byte[] bytes = getBytes(byteBuffer); + getOrCreateUpdateSketch(state).update(bytes); + } + + @CombineFunction + public static void combine(@AggregationState DataSketchState state, @AggregationState DataSketchState otherState) + { + Union union = SetOperation.builder().buildUnion(); + addIfPresent(union, state.getUpdateSketch()); + addIfPresent(union, state.getCompactSketch()); + addIfPresent(union, otherState.getUpdateSketch()); + addIfPresent(union, otherState.getCompactSketch()); + + state.setUpdateSketch(null); + state.setCompactSketch(union.getResult()); + } + + @OutputFunction(StandardTypes.VARBINARY) + public static void output(@AggregationState DataSketchState state, BlockBuilder out) + { + if (state.getUpdateSketch() == null && state.getCompactSketch() == null) { + getOrCreateUpdateSketch(state); + } + DataSketchStateSerializer.serializeToVarbinary(state, out); + } + + private static UpdateSketch getOrCreateUpdateSketch(@AggregationState DataSketchState state) + { + UpdateSketch sketch = state.getUpdateSketch(); + if (sketch == null) { + // Must match Iceberg table statistics specification + // https://iceberg.apache.org/puffin-spec/#apache-datasketches-theta-v1-blob-type + sketch = UpdateSketch.builder() + .setFamily(Family.ALPHA) + .build(); + state.setUpdateSketch(sketch); + } + return sketch; + } + + private static void addIfPresent(Union union, @Nullable Sketch input) + { + if (input != null) { + union.union(input); + } + } + + private static byte[] getBytes(ByteBuffer byteBuffer) + { + int length = byteBuffer.remaining(); + if (byteBuffer.hasArray() && byteBuffer.arrayOffset() == 0) { + byte[] bytes = byteBuffer.array(); + if (bytes.length == length) { + return bytes; + } + } + byte[] bytes = new byte[length]; + byteBuffer.get(bytes); + return bytes; + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 7b5cca976aba..25f9f7cefe87 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -3998,9 +3998,9 @@ public void testAllAvailableTypes() " ('a_timestamp', NULL, 1e0, 0.5e0, NULL, " + (format == ORC ? "'2021-07-24 03:43:57.987000', '2021-07-24 03:43:57.987999'" : "'2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'") + "), " + " ('a_timestamptz', NULL, 1e0, 0.5e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + " ('a_uuid', NULL, 1e0, 0.5e0, NULL, NULL, NULL), " + - " ('a_row', NULL, 1e0, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " + - " ('an_array', NULL, 1e0, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " + - " ('a_map', NULL, 1e0, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " + + " ('a_row', NULL, NULL, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " + + " ('an_array', NULL, NULL, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " + + " ('a_map', NULL, NULL, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " + " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); } else { @@ -4021,9 +4021,9 @@ public void testAllAvailableTypes() " ('a_timestamp', NULL, 1e0, NULL, NULL, NULL, NULL), " + " ('a_timestamptz', NULL, 1e0, NULL, NULL, NULL, NULL), " + " ('a_uuid', NULL, 1e0, NULL, NULL, NULL, NULL), " + - " ('a_row', NULL, 1e0, NULL, NULL, NULL, NULL), " + - " ('an_array', NULL, 1e0, NULL, NULL, NULL, NULL), " + - " ('a_map', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('a_row', NULL, NULL, NULL, NULL, NULL, NULL), " + + " ('an_array', NULL, NULL, NULL, NULL, NULL, NULL), " + + " ('a_map', NULL, NULL, NULL, NULL, NULL, NULL), " + " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); }