diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/StatisticsAggregationPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/StatisticsAggregationPlanner.java index f3b34488b627..22cf672d5081 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/StatisticsAggregationPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/StatisticsAggregationPlanner.java @@ -31,7 +31,6 @@ import io.trino.sql.planner.plan.StatisticAggregations; import io.trino.sql.planner.plan.StatisticAggregationsDescriptor; import io.trino.sql.tree.QualifiedName; -import io.trino.sql.tree.SymbolReference; import java.util.List; import java.util.Map; @@ -122,26 +121,26 @@ public TableStatisticAggregation createStatisticsAggregation(TableStatisticsMeta private ColumnStatisticsAggregation createColumnAggregation(ColumnStatisticType statisticType, Symbol input, Type inputType) { return switch (statisticType) { - case MIN_VALUE -> createAggregation(QualifiedName.of("min"), input.toSymbolReference(), inputType); - case MAX_VALUE -> createAggregation(QualifiedName.of("max"), input.toSymbolReference(), inputType); - case NUMBER_OF_DISTINCT_VALUES -> createAggregation(QualifiedName.of("approx_distinct"), input.toSymbolReference(), inputType); + case MIN_VALUE -> createAggregation(QualifiedName.of("min"), input, inputType); + case MAX_VALUE -> createAggregation(QualifiedName.of("max"), input, inputType); + case NUMBER_OF_DISTINCT_VALUES -> createAggregation(QualifiedName.of("approx_distinct"), input, inputType); case NUMBER_OF_DISTINCT_VALUES_SUMMARY -> // we use $approx_set here and not approx_set because latter is not defined for all types supported by Trino - createAggregation(QualifiedName.of("$approx_set"), input.toSymbolReference(), inputType); - case NUMBER_OF_NON_NULL_VALUES -> createAggregation(QualifiedName.of("count"), input.toSymbolReference(), inputType); - case NUMBER_OF_TRUE_VALUES -> createAggregation(QualifiedName.of("count_if"), input.toSymbolReference(), BOOLEAN); - case TOTAL_SIZE_IN_BYTES -> createAggregation(QualifiedName.of(SumDataSizeForStats.NAME), input.toSymbolReference(), inputType); - case MAX_VALUE_SIZE_IN_BYTES -> createAggregation(QualifiedName.of(MaxDataSizeForStats.NAME), input.toSymbolReference(), inputType); + createAggregation(QualifiedName.of("$approx_set"), input, inputType); + case NUMBER_OF_NON_NULL_VALUES -> createAggregation(QualifiedName.of("count"), input, inputType); + case NUMBER_OF_TRUE_VALUES -> createAggregation(QualifiedName.of("count_if"), input, BOOLEAN); + case TOTAL_SIZE_IN_BYTES -> createAggregation(QualifiedName.of(SumDataSizeForStats.NAME), input, inputType); + case MAX_VALUE_SIZE_IN_BYTES -> createAggregation(QualifiedName.of(MaxDataSizeForStats.NAME), input, inputType); }; } private ColumnStatisticsAggregation createColumnAggregation(FunctionName aggregation, Symbol input, Type inputType) { checkArgument(aggregation.getCatalogSchema().isEmpty(), "Catalog/schema name not supported"); - return createAggregation(QualifiedName.of(aggregation.getName()), input.toSymbolReference(), inputType); + return createAggregation(QualifiedName.of(aggregation.getName()), input, inputType); } - private ColumnStatisticsAggregation createAggregation(QualifiedName functionName, SymbolReference input, Type inputType) + private ColumnStatisticsAggregation createAggregation(QualifiedName functionName, Symbol input, Type inputType) { ResolvedFunction resolvedFunction = metadata.resolveFunction(session, functionName, fromTypes(inputType)); Type 
resolvedType = getOnlyElement(resolvedFunction.getSignature().getArgumentTypes()); @@ -149,7 +148,7 @@ private ColumnStatisticsAggregation createAggregation(QualifiedName functionName return new ColumnStatisticsAggregation( new AggregationNode.Aggregation( resolvedFunction, - ImmutableList.of(input), + ImmutableList.of(input.toSymbolReference()), false, Optional.empty(), Optional.empty(), diff --git a/core/trino-main/src/main/java/io/trino/testing/MaterializedResult.java b/core/trino-main/src/main/java/io/trino/testing/MaterializedResult.java index 4fd7474add90..06c420a23dfa 100644 --- a/core/trino-main/src/main/java/io/trino/testing/MaterializedResult.java +++ b/core/trino-main/src/main/java/io/trino/testing/MaterializedResult.java @@ -91,6 +91,7 @@ public class MaterializedResult private final List<MaterializedRow> rows; private final List<Type> types; + private final List<String> columnNames; private final Map<String, String> setSessionProperties; private final Set<String> resetSessionProperties; private final Optional<String> updateType; @@ -100,12 +101,13 @@ public class MaterializedResult public MaterializedResult(List<MaterializedRow> rows, List<Type> types) { - this(rows, types, ImmutableMap.of(), ImmutableSet.of(), Optional.empty(), OptionalLong.empty(), ImmutableList.of(), Optional.empty()); + this(rows, types, ImmutableList.of(), ImmutableMap.of(), ImmutableSet.of(), Optional.empty(), OptionalLong.empty(), ImmutableList.of(), Optional.empty()); } public MaterializedResult( List<MaterializedRow> rows, List<Type> types, + List<String> columnNames, Map<String, String> setSessionProperties, Set<String> resetSessionProperties, Optional<String> updateType, @@ -115,6 +117,7 @@ public MaterializedResult( { this.rows = ImmutableList.copyOf(requireNonNull(rows, "rows is null")); this.types = ImmutableList.copyOf(requireNonNull(types, "types is null")); + this.columnNames = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null")); this.setSessionProperties = ImmutableMap.copyOf(requireNonNull(setSessionProperties, "setSessionProperties is null")); this.resetSessionProperties = ImmutableSet.copyOf(requireNonNull(resetSessionProperties, "resetSessionProperties is null")); this.updateType = requireNonNull(updateType, "updateType is null"); @@ -144,6 +147,12 @@ public List<Type> getTypes() return types; } + public List<String> getColumnNames() + { + checkState(!columnNames.isEmpty(), "Column names are unknown"); + return columnNames; + } + public Map<String, String> getSetSessionProperties() { return setSessionProperties; @@ -362,6 +371,7 @@ public MaterializedResult toTestTypes() .map(MaterializedResult::convertToTestTypes) .collect(toImmutableList()), types, + columnNames, setSessionProperties, resetSessionProperties, updateType,
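For orientation, a minimal sketch of what the new MaterializedResult.getColumnNames() accessor enables in test code (illustrative only, not part of this diff; assumes a QueryRunner-based test):

// Name-based field lookup instead of hard-coded positional indexes.
MaterializedResult result = computeActual("SHOW STATS FOR nation");
int ndvIndex = result.getColumnNames().indexOf("distinct_values_count");
for (MaterializedRow row : result.getMaterializedRows()) {
    Object ndv = row.getField(ndvIndex);
    // ... assert on ndv ...
}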
diff --git a/core/trino-main/src/test/java/io/trino/sql/query/QueryAssertions.java b/core/trino-main/src/test/java/io/trino/sql/query/QueryAssertions.java index 8b518bcc4051..a2eeb10d9bc2 100644 --- a/core/trino-main/src/test/java/io/trino/sql/query/QueryAssertions.java +++ b/core/trino-main/src/test/java/io/trino/sql/query/QueryAssertions.java @@ -313,6 +313,8 @@ private QueryAssert( this.skipResultsCorrectnessCheckForPushdown = skipResultsCorrectnessCheckForPushdown; } + // TODO for better readability, replace this with `exceptColumns(String... columnNamesToExclude)` leveraging MaterializedResult.getColumnNames + @Deprecated public QueryAssert projected(int... columns) { return new QueryAssert( diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 2fa12450ef3c..26f94f628b28 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -165,6 +165,18 @@ <artifactId>failsafe</artifactId> </dependency> + + <dependency> + <groupId>org.apache.datasketches</groupId> + <artifactId>datasketches-java</artifactId> + <version>3.3.0</version> + </dependency> + + <dependency> + <groupId>org.apache.datasketches</groupId> + <artifactId>datasketches-memory</artifactId> + <version>2.1.0</version> + </dependency> + <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-api</artifactId> diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ExpressionConverter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ExpressionConverter.java index 205ce2183afa..ddb5e6555eed 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ExpressionConverter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ExpressionConverter.java @@ -14,31 +14,16 @@ package io.trino.plugin.iceberg; import com.google.common.base.VerifyException; -import io.airlift.slice.Slice; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.Range; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.ArrayType; -import io.trino.spi.type.BigintType; -import io.trino.spi.type.BooleanType; -import io.trino.spi.type.DateType; -import io.trino.spi.type.DecimalType; -import io.trino.spi.type.DoubleType; -import io.trino.spi.type.Int128; -import io.trino.spi.type.IntegerType; -import io.trino.spi.type.LongTimestampWithTimeZone; import io.trino.spi.type.MapType; -import io.trino.spi.type.RealType; import io.trino.spi.type.RowType; import io.trino.spi.type.Type; -import io.trino.spi.type.UuidType; -import io.trino.spi.type.VarbinaryType; -import io.trino.spi.type.VarcharType; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; -import java.math.BigDecimal; -import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; @@ -48,16 +33,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId; -import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros; -import static io.trino.spi.type.TimeType.TIME_MICROS; -import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS; -import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS; -import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND; -import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid; -import static java.lang.Float.intBitsToFloat; -import static java.lang.Math.toIntExact; +import static io.trino.plugin.iceberg.IcebergTypes.convertTrinoValueToIceberg; import static java.lang.String.format; -import static java.util.Objects.requireNonNull; import static org.apache.iceberg.expressions.Expressions.alwaysFalse; import static org.apache.iceberg.expressions.Expressions.alwaysTrue; import static org.apache.iceberg.expressions.Expressions.equal; @@ -117,7 +94,7 @@ private static Expression toIcebergExpression(String columnName, Type type, Doma List<Expression> rangeExpressions = new ArrayList<>(); for (Range range : orderedRanges) { if (range.isSingleValue()) { - icebergValues.add(getIcebergLiteralValue(type, range.getLowBoundedValue())); + icebergValues.add(convertTrinoValueToIceberg(type, range.getLowBoundedValue())); } else { rangeExpressions.add(toIcebergExpression(columnName, range)); @@ -137,13 +114,13 @@ private static Expression toIcebergExpression(String columnName, Range range) Type type = range.getType(); if (range.isSingleValue()) { - Object icebergValue 
= getIcebergLiteralValue(type, range.getSingleValue()); + Object icebergValue = convertTrinoValueToIceberg(type, range.getSingleValue()); return equal(columnName, icebergValue); } List<Expression> conjuncts = new ArrayList<>(2); if (!range.isLowUnbounded()) { - Object icebergLow = getIcebergLiteralValue(type, range.getLowBoundedValue()); + Object icebergLow = convertTrinoValueToIceberg(type, range.getLowBoundedValue()); Expression lowBound; if (range.isLowInclusive()) { lowBound = greaterThanOrEqual(columnName, icebergLow); @@ -155,7 +132,7 @@ private static Expression toIcebergExpression(String columnName, Range range) } if (!range.isHighUnbounded()) { - Object icebergHigh = getIcebergLiteralValue(type, range.getHighBoundedValue()); + Object icebergHigh = convertTrinoValueToIceberg(type, range.getHighBoundedValue()); Expression highBound; if (range.isHighInclusive()) { highBound = lessThanOrEqual(columnName, icebergHigh); @@ -169,68 +146,6 @@ private static Expression toIcebergExpression(String columnName, Range range) return and(conjuncts); } - private static Object getIcebergLiteralValue(Type type, Object trinoNativeValue) - { - requireNonNull(trinoNativeValue, "trinoNativeValue is null"); - - if (type instanceof BooleanType) { - return (boolean) trinoNativeValue; - } - - if (type instanceof IntegerType) { - return toIntExact((long) trinoNativeValue); - } - - if (type instanceof BigintType) { - return (long) trinoNativeValue; - } - - if (type instanceof RealType) { - return intBitsToFloat(toIntExact((long) trinoNativeValue)); - } - - if (type instanceof DoubleType) { - return (double) trinoNativeValue; - } - - if (type instanceof DateType) { - return toIntExact(((Long) trinoNativeValue)); - } - - if (type.equals(TIME_MICROS)) { - return ((long) trinoNativeValue) / PICOSECONDS_PER_MICROSECOND; - } - - if (type.equals(TIMESTAMP_MICROS)) { - return (long) trinoNativeValue; - } - - if (type.equals(TIMESTAMP_TZ_MICROS)) { - return timestampTzToMicros((LongTimestampWithTimeZone) trinoNativeValue); - } - - if (type instanceof VarcharType) { - return ((Slice) trinoNativeValue).toStringUtf8(); - } - - if (type instanceof VarbinaryType) { - return ByteBuffer.wrap(((Slice) trinoNativeValue).getBytes()); - } - - if (type instanceof UuidType) { - return trinoUuidToJavaUuid(((Slice) trinoNativeValue)); - } - - if (type instanceof DecimalType decimalType) { - if (decimalType.isShort()) { - return BigDecimal.valueOf((long) trinoNativeValue).movePointLeft(decimalType.getScale()); - } - return new BigDecimal(((Int128) trinoNativeValue).toBigInteger(), decimalType.getScale()); - } - - throw new UnsupportedOperationException("Unsupported type: " + type); - } - private static Expression and(List<Expression> expressions) { if (expressions.isEmpty()) {
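For intuition about convertTrinoValueToIceberg, which ExpressionConverter now delegates to (the method itself is added to IcebergTypes below), here is an illustrative sketch of the value representations involved (not part of the change; assumes static imports of the named constants and of the method):

// Trino engine-native values use a narrow set of Java types; the conversion
// maps them onto the classes Iceberg literals expect.
// REAL arrives as a long holding the float's bits:
float f = (float) convertTrinoValueToIceberg(REAL, (long) floatToIntBits(1.5f)); // -> 1.5f
// Short DECIMAL arrives as an unscaled long; the scale comes from the type:
BigDecimal d = (BigDecimal) convertTrinoValueToIceberg(createDecimalType(4, 2), 125L); // -> 1.25
// VARCHAR arrives as an io.airlift.slice.Slice:
String s = (String) convertTrinoValueToIceberg(VARCHAR, utf8Slice("abc")); // -> "abc"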
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index f461a00b2f22..6a0af1a0c255 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -34,6 +34,8 @@ import io.trino.plugin.hive.HiveApplyProjectionUtil; import io.trino.plugin.hive.HiveApplyProjectionUtil.ProjectedColumnRepresentation; import io.trino.plugin.hive.HiveWrittenPartitions; +import io.trino.plugin.iceberg.aggregation.DataSketchStateSerializer; +import io.trino.plugin.iceberg.aggregation.IcebergThetaSketchForStats; import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.procedure.IcebergDropExtendedStatsHandle; import io.trino.plugin.iceberg.procedure.IcebergExpireSnapshotsHandle; @@ -94,6 +96,7 @@ import io.trino.spi.type.TimestampWithTimeZoneType; import io.trino.spi.type.TypeManager; import io.trino.spi.type.TypeOperators; +import org.apache.datasketches.theta.CompactSketch; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataFile; @@ -229,7 +232,6 @@ import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.connector.RetryMode.NO_RETRIES; import static io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW; -import static io.trino.spi.predicate.Utils.blockToNativeValue; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc; import static io.trino.spi.type.UuidType.UUID; @@ -264,7 +266,7 @@ public class IcebergMetadata public static final String ORC_BLOOM_FILTER_FPP_KEY = "orc.bloom.filter.fpp"; private static final String NUMBER_OF_DISTINCT_VALUES_NAME = "NUMBER_OF_DISTINCT_VALUES"; - private static final FunctionName NUMBER_OF_DISTINCT_VALUES_FUNCTION = new FunctionName("approx_distinct"); + private static final FunctionName NUMBER_OF_DISTINCT_VALUES_FUNCTION = new FunctionName(IcebergThetaSketchForStats.NAME); private final TypeManager typeManager; private final TypeOperators typeOperators; @@ -1468,8 +1470,9 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession } ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle); - Set<String> allDataColumnNames = tableMetadata.getColumns().stream() + Set<String> allScalarColumnNames = tableMetadata.getColumns().stream() .filter(column -> !column.isHidden()) + .filter(column -> column.getType().getTypeParameters().isEmpty()) // is scalar type .map(ColumnMetadata::getName) .collect(toImmutableSet()); @@ -1479,18 +1482,17 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession if (columnNames.isEmpty()) { throw new TrinoException(INVALID_ANALYZE_PROPERTY, "Cannot specify empty list of columns for analysis"); } - if (!allDataColumnNames.containsAll(columnNames)) { + if (!allScalarColumnNames.containsAll(columnNames)) { throw new TrinoException( INVALID_ANALYZE_PROPERTY, - format("Invalid columns specified for analysis: %s", Sets.difference(columnNames, allDataColumnNames))); + format("Invalid columns specified for analysis: %s", Sets.difference(columnNames, allScalarColumnNames))); } return columnNames; }) - .orElse(allDataColumnNames); + .orElse(allScalarColumnNames); Set<ColumnStatisticMetadata> columnStatistics = tableMetadata.getColumns().stream() .filter(column -> analyzeColumnNames.contains(column.getName())) - // TODO: add support for NDV summary/sketch, but using Theta sketch, not HLL; see https://github.com/apache/iceberg-docs/pull/69 .map(column -> new ColumnStatisticMetadata(column.getName(), NUMBER_OF_DISTINCT_VALUES_NAME, NUMBER_OF_DISTINCT_VALUES_FUNCTION)) .collect(toImmutableSet()); @@ -1537,12 +1539,13 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH for (Map.Entry<ColumnStatisticMetadata, Block> entry : computedStatistic.getColumnStatistics().entrySet()) { ColumnStatisticMetadata statisticMetadata = entry.getKey(); if (statisticMetadata.getConnectorAggregationId().equals(NUMBER_OF_DISTINCT_VALUES_NAME)) { - long ndv = (long) blockToNativeValue(BIGINT, entry.getValue()); Integer columnId = verifyNotNull( columnNameToId.get(statisticMetadata.getColumnName()), "Column not 
found in table: [%s]", statisticMetadata.getColumnName()); - updateProperties.set(TRINO_STATS_NDV_FORMAT.formatted(columnId), Long.toString(ndv)); + CompactSketch sketch = DataSketchStateSerializer.deserialize(entry.getValue(), 0); + // TODO: store whole sketch to support updates, see also https://github.com/apache/iceberg-docs/pull/69 + updateProperties.set(TRINO_STATS_NDV_FORMAT.formatted(columnId), Long.toString((long) sketch.getEstimate())); } else { throw new UnsupportedOperationException("Unsupported statistic: " + statisticMetadata); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPlugin.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPlugin.java index 99e510ad73b1..bfdf4ce0844d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPlugin.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPlugin.java @@ -14,9 +14,13 @@ package io.trino.plugin.iceberg; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.trino.plugin.iceberg.aggregation.IcebergThetaSketchForStats; import io.trino.spi.Plugin; import io.trino.spi.connector.ConnectorFactory; +import java.util.Set; + public class IcebergPlugin implements Plugin { @@ -25,4 +29,10 @@ public Iterable<ConnectorFactory> getConnectorFactories() { return ImmutableList.of(new IcebergConnectorFactory()); } + + @Override + public Set<Class<?>> getFunctions() + { + return ImmutableSet.of(IcebergThetaSketchForStats.class); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java index 81bea4d0a70c..7b8ef16b6a86 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java @@ -13,10 +13,16 @@ */ package io.trino.plugin.iceberg; +import com.google.common.math.LongMath; +import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.trino.spi.type.DecimalType; import io.trino.spi.type.Decimals; +import io.trino.spi.type.Int128; +import io.trino.spi.type.LongTimestampWithTimeZone; import io.trino.spi.type.UuidType; +import io.trino.spi.type.VarbinaryType; +import io.trino.spi.type.VarcharType; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -27,12 +33,101 @@ import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.iceberg.util.Timestamps.timestampTzFromMicros; +import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.DateType.DATE; +import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.RealType.REAL; +import static io.trino.spi.type.TimeType.TIME_MICROS; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS; +import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS; import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND; +import static io.trino.spi.type.UuidType.javaUuidToTrinoUuid; +import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid; +import static java.lang.Float.floatToIntBits; +import static java.lang.Float.intBitsToFloat; +import static java.lang.Math.multiplyExact; +import static java.lang.Math.toIntExact; +import static 
java.math.RoundingMode.UNNECESSARY; +import static java.util.Objects.requireNonNull; public final class IcebergTypes { private IcebergTypes() {} + /** + * Convert value from Trino representation to Iceberg representation. + * + * @apiNote This accepts a Trino type because, currently, no two Iceberg types translate to one Trino type. + */ + public static Object convertTrinoValueToIceberg(io.trino.spi.type.Type type, Object trinoNativeValue) + { + requireNonNull(trinoNativeValue, "trinoNativeValue is null"); + + if (type == BOOLEAN) { + //noinspection RedundantCast + return (boolean) trinoNativeValue; + } + + if (type == INTEGER) { + return toIntExact((long) trinoNativeValue); + } + + if (type == BIGINT) { + //noinspection RedundantCast + return (long) trinoNativeValue; + } + + if (type == REAL) { + return intBitsToFloat(toIntExact((long) trinoNativeValue)); + } + + if (type == DOUBLE) { + //noinspection RedundantCast + return (double) trinoNativeValue; + } + + if (type instanceof DecimalType decimalType) { + if (decimalType.isShort()) { + return BigDecimal.valueOf((long) trinoNativeValue).movePointLeft(decimalType.getScale()); + } + return new BigDecimal(((Int128) trinoNativeValue).toBigInteger(), decimalType.getScale()); + } + + if (type == DATE) { + return toIntExact((long) trinoNativeValue); + } + + if (type.equals(TIME_MICROS)) { + return LongMath.divide((long) trinoNativeValue, PICOSECONDS_PER_MICROSECOND, UNNECESSARY); + } + + if (type.equals(TIMESTAMP_MICROS)) { + //noinspection RedundantCast + return (long) trinoNativeValue; + } + + if (type.equals(TIMESTAMP_TZ_MICROS)) { + return timestampTzToMicros((LongTimestampWithTimeZone) trinoNativeValue); + } + + if (type instanceof VarcharType) { + return ((Slice) trinoNativeValue).toStringUtf8(); + } + + if (type instanceof VarbinaryType) { + return ByteBuffer.wrap(((Slice) trinoNativeValue).getBytes()); + } + + if (type == UuidType.UUID) { + return trinoUuidToJavaUuid(((Slice) trinoNativeValue)); + } + + throw new UnsupportedOperationException("Unsupported type: " + type); + } + /** * Convert value from Iceberg representation to Trino representation. 
*/ @@ -43,21 +138,21 @@ public static Object convertIcebergValueToTrino(Type icebergType, Object value) } if (icebergType instanceof Types.BooleanType) { //noinspection RedundantCast - return (Boolean) value; + return (boolean) value; } if (icebergType instanceof Types.IntegerType) { - return ((Integer) value).longValue(); + return (long) (int) value; } if (icebergType instanceof Types.LongType) { //noinspection RedundantCast - return (Long) value; + return (long) value; } if (icebergType instanceof Types.FloatType) { - return (long) Float.floatToIntBits((Float) value); + return (long) floatToIntBits((float) value); } if (icebergType instanceof Types.DoubleType) { //noinspection RedundantCast - return (Double) value; + return (double) value; } if (icebergType instanceof Types.DecimalType icebergDecimalType) { DecimalType trinoDecimalType = DecimalType.createDecimalType(icebergDecimalType.precision(), icebergDecimalType.scale()); @@ -77,10 +172,10 @@ public static Object convertIcebergValueToTrino(Type icebergType, Object value) return Slices.wrappedBuffer(((ByteBuffer) value).array().clone()); } if (icebergType instanceof Types.DateType) { - return ((Integer) value).longValue(); + return (long) (int) value; } if (icebergType instanceof Types.TimeType) { - return Math.multiplyExact((Long) value, PICOSECONDS_PER_MICROSECOND); + return multiplyExact((long) value, PICOSECONDS_PER_MICROSECOND); } if (icebergType instanceof Types.TimestampType icebergTimestampType) { long epochMicros = (long) value; @@ -90,7 +185,7 @@ public static Object convertIcebergValueToTrino(Type icebergType, Object value) return epochMicros; } if (icebergType instanceof Types.UUIDType) { - return UuidType.javaUuidToTrinoUuid((UUID) value); + return javaUuidToTrinoUuid((UUID) value); } throw new UnsupportedOperationException("Unsupported iceberg type: " + icebergType); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchState.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchState.java new file mode 100644 index 000000000000..1e79168d6754 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchState.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.aggregation; + +import io.trino.spi.function.AccumulatorState; +import io.trino.spi.function.AccumulatorStateMetadata; +import org.apache.datasketches.theta.CompactSketch; +import org.apache.datasketches.theta.UpdateSketch; + +@AccumulatorStateMetadata(stateSerializerClass = DataSketchStateSerializer.class) +public interface DataSketchState + extends AccumulatorState +{ + UpdateSketch getUpdateSketch(); + + void setUpdateSketch(UpdateSketch value); + + CompactSketch getCompactSketch(); + + void setCompactSketch(CompactSketch value); +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchStateSerializer.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchStateSerializer.java new file mode 100644 index 000000000000..91a5068c6b55 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchStateSerializer.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.aggregation; + +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.spi.block.Block; +import io.trino.spi.block.BlockBuilder; +import io.trino.spi.function.AccumulatorStateSerializer; +import io.trino.spi.type.Type; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.datasketches.theta.CompactSketch; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.spi.type.VarbinaryType.VARBINARY; + +public class DataSketchStateSerializer + implements AccumulatorStateSerializer<DataSketchState> +{ + @Override + public Type getSerializedType() + { + return VARBINARY; + } + + @Override + public void serialize(DataSketchState state, BlockBuilder out) + { + serializeToVarbinary(state, out); + } + + public static void serializeToVarbinary(DataSketchState state, BlockBuilder out) + { + if (state.getUpdateSketch() == null && state.getCompactSketch() == null) { + out.appendNull(); + } + else { + checkArgument(state.getUpdateSketch() == null || state.getCompactSketch() == null, "A state must not have both transient accumulator and combined form set"); + CompactSketch compactSketch = Optional.ofNullable(state.getCompactSketch()) + .orElseGet(() -> state.getUpdateSketch().compact()); + Slice slice = Slices.wrappedBuffer(compactSketch.toByteArray()); + VARBINARY.writeSlice(out, slice); + } + } + + @Override + public void deserialize(Block block, int index, DataSketchState state) + { + if (!block.isNull(index)) { + state.setCompactSketch(deserialize(block, index)); + } + } + + public static CompactSketch deserialize(Block block, int index) + { + checkArgument(!block.isNull(index), "Value is null"); + Slice slice = VARBINARY.getSlice(block, index); + return CompactSketch.heapify(WritableMemory.writableWrap(slice.getBytes())); + } +}
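Background on the datasketches-java API used above: the serialized form is always a CompactSketch, which can be heapified back and unioned. An illustrative round trip under the same calls the serializer and combine function make (a sketch for intuition, not code from this change):

import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.CompactSketch;
import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Union;
import org.apache.datasketches.theta.UpdateSketch;

public class ThetaSketchRoundTrip
{
    public static void main(String[] args)
    {
        // Two partial sketches, as two workers would build during ANALYZE
        UpdateSketch left = UpdateSketch.builder().build();
        UpdateSketch right = UpdateSketch.builder().build();
        for (long i = 0; i < 1000; i++) {
            left.update(i);
            right.update(i + 500); // overlapping value range
        }

        // Serialize the compact form and heapify it back, as DataSketchStateSerializer does
        byte[] bytes = left.compact().toByteArray();
        CompactSketch restored = CompactSketch.heapify(Memory.wrap(bytes));

        // Union the partial results, as the aggregation's combine function does
        Union union = SetOperation.builder().buildUnion();
        union.union(restored);
        union.union(right.compact());

        // Prints roughly 1500; the rounded estimate is what finishStatisticsCollection stores
        System.out.println((long) union.getResult().getEstimate());
    }
}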
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/IcebergThetaSketchForStats.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/IcebergThetaSketchForStats.java new file mode 100644 index 000000000000..518015c2687d --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/IcebergThetaSketchForStats.java @@ -0,0 +1,124 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.aggregation; + +import io.trino.spi.block.Block; +import io.trino.spi.block.BlockBuilder; +import io.trino.spi.function.AggregationFunction; +import io.trino.spi.function.AggregationState; +import io.trino.spi.function.BlockIndex; +import io.trino.spi.function.BlockPosition; +import io.trino.spi.function.CombineFunction; +import io.trino.spi.function.InputFunction; +import io.trino.spi.function.OutputFunction; +import io.trino.spi.function.SqlType; +import io.trino.spi.function.TypeParameter; +import io.trino.spi.type.StandardTypes; +import io.trino.spi.type.Type; +import org.apache.datasketches.Family; +import org.apache.datasketches.theta.SetOperation; +import org.apache.datasketches.theta.Sketch; +import org.apache.datasketches.theta.Union; +import org.apache.datasketches.theta.UpdateSketch; +import org.apache.iceberg.types.Conversions; + +import javax.annotation.Nullable; + +import java.nio.ByteBuffer; + +import static com.google.common.base.Verify.verify; +import static io.trino.plugin.iceberg.IcebergTypes.convertTrinoValueToIceberg; +import static io.trino.plugin.iceberg.TypeConverter.toIcebergType; +import static io.trino.spi.type.TypeUtils.readNativeValue; +import static java.util.Objects.requireNonNull; + +@AggregationFunction(value = IcebergThetaSketchForStats.NAME, hidden = true) +public final class IcebergThetaSketchForStats +{ + private IcebergThetaSketchForStats() {} + + public static final String NAME = "$iceberg_theta_stat"; + + @InputFunction + @TypeParameter("T") + public static void input(@TypeParameter("T") Type type, @AggregationState DataSketchState state, @BlockPosition @SqlType("T") Block block, @BlockIndex int index) + { + verify(!block.isNull(index), "Input function is not expected to be called on a NULL input"); + + Object trinoValue = readNativeValue(type, block, index); + org.apache.iceberg.types.Type icebergType = toIcebergType(type); + Object icebergValue = convertTrinoValueToIceberg(type, trinoValue); + ByteBuffer byteBuffer = Conversions.toByteBuffer(icebergType, icebergValue); + requireNonNull(byteBuffer, "byteBuffer is null"); // trino value isn't null + byte[] bytes = getBytes(byteBuffer); + getOrCreateUpdateSketch(state).update(bytes); + } + + @CombineFunction + public static void combine(@AggregationState DataSketchState state, @AggregationState DataSketchState otherState) + { + Union union = SetOperation.builder().buildUnion(); + addIfPresent(union, state.getUpdateSketch()); + addIfPresent(union, state.getCompactSketch()); + addIfPresent(union, otherState.getUpdateSketch()); + addIfPresent(union, otherState.getCompactSketch()); + + state.setUpdateSketch(null); + 
state.setCompactSketch(union.getResult()); + } + + @OutputFunction(StandardTypes.VARBINARY) + public static void output(@AggregationState DataSketchState state, BlockBuilder out) + { + if (state.getUpdateSketch() == null && state.getCompactSketch() == null) { + getOrCreateUpdateSketch(state); + } + DataSketchStateSerializer.serializeToVarbinary(state, out); + } + + private static UpdateSketch getOrCreateUpdateSketch(@AggregationState DataSketchState state) + { + UpdateSketch sketch = state.getUpdateSketch(); + if (sketch == null) { + // Must match Iceberg table statistics specification + // https://iceberg.apache.org/puffin-spec/#apache-datasketches-theta-v1-blob-type + sketch = UpdateSketch.builder() + .setFamily(Family.ALPHA) + .build(); + state.setUpdateSketch(sketch); + } + return sketch; + } + + private static void addIfPresent(Union union, @Nullable Sketch input) + { + if (input != null) { + union.union(input); + } + } + + private static byte[] getBytes(ByteBuffer byteBuffer) + { + int length = byteBuffer.remaining(); + if (byteBuffer.hasArray() && byteBuffer.arrayOffset() == 0) { + byte[] bytes = byteBuffer.array(); + if (bytes.length == length) { + return bytes; + } + } + byte[] bytes = new byte[length]; + byteBuffer.get(bytes); + return bytes; + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 9427798ee048..25f9f7cefe87 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -548,14 +548,14 @@ private void testSelectOrPartitionedByTimestampWithTimeZone(boolean partitioned) instant4Utc)); } else { - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query(format("SELECT record_count, file_count, data._timestamptz FROM \"%s$partitions\"", tableName))) .matches(format( "VALUES (BIGINT '4', BIGINT '4', CAST(ROW(%s, %s, 0, NULL) AS row(min timestamp(6) with time zone, max timestamp(6) with time zone, null_count bigint, nan_count bigint)))", format == ORC ? "TIMESTAMP '1969-12-01 05:06:07.234000 UTC'" : instant4Utc, format == ORC ? 
"TIMESTAMP '2021-10-31 00:30:00.007999 UTC'" : instant3Utc)); } - else if (format == AVRO) { + else { assertThat(query(format("SELECT record_count, file_count, data._timestamptz FROM \"%s$partitions\"", tableName))) .skippingTypesCheck() .matches("VALUES (BIGINT '4', BIGINT '4', CAST(NULL AS row(min timestamp(6) with time zone, max timestamp(6) with time zone, null_count bigint, nan_count bigint)))"); @@ -572,14 +572,14 @@ else if (format == AVRO) { } else { // show stats - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR " + tableName)) .skippingTypesCheck() .matches("VALUES " + "('_timestamptz', NULL, NULL, 0e0, NULL, '1969-12-01 05:06:07.234 UTC', '2021-10-31 00:30:00.007 UTC'), " + "(NULL, NULL, NULL, NULL, 4e0, NULL, NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR " + tableName)) .skippingTypesCheck() .matches("VALUES " + @@ -799,76 +799,78 @@ public void testCreatePartitionedTable() .matches(nullValues); // SHOW STATS - if (format == ORC) { - assertQuery("SHOW STATS FOR test_partitioned_table", - "VALUES " + - " ('a_boolean', NULL, NULL, 0.5, NULL, 'true', 'true'), " + - " ('an_integer', NULL, NULL, 0.5, NULL, '1', '1'), " + - " ('a_bigint', NULL, NULL, 0.5, NULL, '1', '1'), " + - " ('a_real', NULL, NULL, 0.5, NULL, '1.0', '1.0'), " + - " ('a_double', NULL, NULL, 0.5, NULL, '1.0', '1.0'), " + - " ('a_short_decimal', NULL, NULL, 0.5, NULL, '1.0', '1.0'), " + - " ('a_long_decimal', NULL, NULL, 0.5, NULL, '11.0', '11.0'), " + - " ('a_varchar', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " ('a_varbinary', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " ('a_date', NULL, NULL, 0.5, NULL, '2021-07-24', '2021-07-24'), " + - " ('a_time', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " ('a_timestamp', NULL, NULL, 0.5, NULL, '2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'), " + - " ('a_timestamptz', NULL, NULL, 0.5, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + - " ('a_uuid', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " ('a_row', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " ('an_array', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " ('a_map', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " ('a quoted, field', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); - } - if (format == PARQUET) { - assertThat(query("SHOW STATS FOR test_partitioned_table")) - .skippingTypesCheck() - .matches("VALUES " + - " ('a_boolean', NULL, NULL, 0.5e0, NULL, 'true', 'true'), " + - " ('an_integer', NULL, NULL, 0.5e0, NULL, '1', '1'), " + - " ('a_bigint', NULL, NULL, 0.5e0, NULL, '1', '1'), " + - " ('a_real', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + - " ('a_double', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + - " ('a_short_decimal', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + - " ('a_long_decimal', NULL, NULL, 0.5e0, NULL, '11.0', '11.0'), " + - " ('a_varchar', 87e0, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_varbinary', 82e0, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_date', NULL, NULL, 0.5e0, NULL, '2021-07-24', '2021-07-24'), " + - " ('a_time', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_timestamp', NULL, NULL, 0.5e0, NULL, '2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'), " + - " ('a_timestamptz', NULL, NULL, 0.5e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + - " ('a_uuid', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_row', NULL, NULL, NULL, NULL, NULL, NULL), " + - " ('an_array', NULL, NULL, NULL, NULL, NULL, NULL), " 
+ - " ('a_map', NULL, NULL, NULL, NULL, NULL, NULL), " + - " ('a quoted, field', 83e0, NULL, 0.5e0, NULL, NULL, NULL), " + - " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); - } - else if (format == AVRO) { - assertThat(query("SHOW STATS FOR test_partitioned_table")) - .skippingTypesCheck() - .matches("VALUES " + - " ('a_boolean', NULL, NULL, 0.5e0, NULL, 'true', 'true'), " + - " ('an_integer', NULL, NULL, 0.5e0, NULL, '1', '1'), " + - " ('a_bigint', NULL, NULL, 0.5e0, NULL, '1', '1'), " + - " ('a_real', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + - " ('a_double', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + - " ('a_short_decimal', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + - " ('a_long_decimal', NULL, NULL, 0.5e0, NULL, '11.0', '11.0'), " + - " ('a_varchar', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_varbinary', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_date', NULL, NULL, 0.5e0, NULL, '2021-07-24', '2021-07-24'), " + - " ('a_time', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_timestamp', NULL, NULL, 0.5e0, NULL, '2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'), " + - " ('a_timestamptz', NULL, NULL, 0.5e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + - " ('a_uuid', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_row', NULL, NULL, NULL, NULL, NULL, NULL), " + - " ('an_array', NULL, NULL, NULL, NULL, NULL, NULL), " + - " ('a_map', NULL, NULL, NULL, NULL, NULL, NULL), " + - " ('a quoted, field', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + - " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); + switch (format) { + case ORC -> { + assertQuery("SHOW STATS FOR test_partitioned_table", + "VALUES " + + " ('a_boolean', NULL, NULL, 0.5, NULL, 'true', 'true'), " + + " ('an_integer', NULL, NULL, 0.5, NULL, '1', '1'), " + + " ('a_bigint', NULL, NULL, 0.5, NULL, '1', '1'), " + + " ('a_real', NULL, NULL, 0.5, NULL, '1.0', '1.0'), " + + " ('a_double', NULL, NULL, 0.5, NULL, '1.0', '1.0'), " + + " ('a_short_decimal', NULL, NULL, 0.5, NULL, '1.0', '1.0'), " + + " ('a_long_decimal', NULL, NULL, 0.5, NULL, '11.0', '11.0'), " + + " ('a_varchar', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a_varbinary', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a_date', NULL, NULL, 0.5, NULL, '2021-07-24', '2021-07-24'), " + + " ('a_time', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a_timestamp', NULL, NULL, 0.5, NULL, '2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'), " + + " ('a_timestamptz', NULL, NULL, 0.5, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + + " ('a_uuid', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a_row', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('an_array', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a_map', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a quoted, field', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); + } + case PARQUET -> { + assertThat(query("SHOW STATS FOR test_partitioned_table")) + .skippingTypesCheck() + .matches("VALUES " + + " ('a_boolean', NULL, NULL, 0.5e0, NULL, 'true', 'true'), " + + " ('an_integer', NULL, NULL, 0.5e0, NULL, '1', '1'), " + + " ('a_bigint', NULL, NULL, 0.5e0, NULL, '1', '1'), " + + " ('a_real', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_double', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_short_decimal', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_long_decimal', NULL, NULL, 0.5e0, NULL, '11.0', '11.0'), " + + " ('a_varchar', 87e0, NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_varbinary', 82e0, NULL, 
0.5e0, NULL, NULL, NULL), " + + " ('a_date', NULL, NULL, 0.5e0, NULL, '2021-07-24', '2021-07-24'), " + + " ('a_time', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_timestamp', NULL, NULL, 0.5e0, NULL, '2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'), " + + " ('a_timestamptz', NULL, NULL, 0.5e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + + " ('a_uuid', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_row', NULL, NULL, NULL, NULL, NULL, NULL), " + + " ('an_array', NULL, NULL, NULL, NULL, NULL, NULL), " + + " ('a_map', NULL, NULL, NULL, NULL, NULL, NULL), " + + " ('a quoted, field', 83e0, NULL, 0.5e0, NULL, NULL, NULL), " + + " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); + } + case AVRO -> { + assertThat(query("SHOW STATS FOR test_partitioned_table")) + .skippingTypesCheck() + .matches("VALUES " + + " ('a_boolean', NULL, NULL, 0.5e0, NULL, 'true', 'true'), " + + " ('an_integer', NULL, NULL, 0.5e0, NULL, '1', '1'), " + + " ('a_bigint', NULL, NULL, 0.5e0, NULL, '1', '1'), " + + " ('a_real', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_double', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_short_decimal', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_long_decimal', NULL, NULL, 0.5e0, NULL, '11.0', '11.0'), " + + " ('a_varchar', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_varbinary', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_date', NULL, NULL, 0.5e0, NULL, '2021-07-24', '2021-07-24'), " + + " ('a_time', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_timestamp', NULL, NULL, 0.5e0, NULL, '2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'), " + + " ('a_timestamptz', NULL, NULL, 0.5e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + + " ('a_uuid', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_row', NULL, NULL, NULL, NULL, NULL, NULL), " + + " ('an_array', NULL, NULL, NULL, NULL, NULL, NULL), " + + " ('a_map', NULL, NULL, NULL, NULL, NULL, NULL), " + + " ('a quoted, field', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + + " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); + } } // $partitions @@ -1160,7 +1162,7 @@ public void testShowStatsAfterAddColumn() assertUpdate("INSERT INTO test_show_stats_after_add_column VALUES (NULL, NULL, NULL)", 1); assertUpdate("INSERT INTO test_show_stats_after_add_column VALUES (7, 8, 9)", 1); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR test_show_stats_after_add_column")) .skippingTypesCheck() .matches("VALUES " + @@ -1169,7 +1171,7 @@ public void testShowStatsAfterAddColumn() " ('col2', NULL, NULL, 25e-2, NULL, '3', '9'), " + " (NULL, NULL, NULL, NULL, 4e0, NULL, NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR test_show_stats_after_add_column")) .skippingTypesCheck() .matches("VALUES " + @@ -1182,7 +1184,7 @@ else if (format == AVRO) { // Columns added after some data files exist will not have valid statistics because not all files have min/max/null count statistics for the new column assertUpdate("ALTER TABLE test_show_stats_after_add_column ADD COLUMN col3 INTEGER"); assertUpdate("INSERT INTO test_show_stats_after_add_column VALUES (10, 11, 12, 13)", 1); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR test_show_stats_after_add_column")) .skippingTypesCheck() .matches("VALUES " + @@ -1192,7 +1194,7 @@ else if (format == AVRO) { " ('col3', NULL, NULL, NULL, NULL, NULL, NULL), " + " (NULL, NULL, NULL, NULL, 5e0, NULL, 
NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR test_show_stats_after_add_column")) .skippingTypesCheck() .matches("VALUES " + @@ -1917,7 +1919,7 @@ public void testMonthTransformDate() String expectedDateStats = "NULL, NULL, 0.0666667e0, NULL, '1969-11-13', '2020-12-31'"; String expectedBigIntStats = "NULL, NULL, 0e0, NULL, '1', '101'"; - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertQuery( "SELECT partition.d_month, record_count, data.d.min, data.d.max, data.b.min, data.b.max FROM \"test_month_transform_date$partitions\"", "VALUES " + @@ -1932,7 +1934,7 @@ public void testMonthTransformDate() "(606, 2, DATE '2020-07-18', DATE '2020-07-28', 12, 13), " + "(611, 1, DATE '2020-12-31', DATE '2020-12-31', 14, 14)"); } - else if (format == AVRO) { + else { assertQuery( "SELECT partition.d_month, record_count, data.d.min, data.d.max, data.b.min, data.b.max FROM \"test_month_transform_date$partitions\"", "VALUES " + @@ -1992,7 +1994,7 @@ else if (format == AVRO) { assertThat(query("SELECT * FROM test_month_transform_date WHERE date_trunc('year', d) = DATE '2015-01-01'")) .isFullyPushedDown(); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR test_month_transform_date")) .skippingTypesCheck() .matches("VALUES " + @@ -2000,7 +2002,7 @@ else if (format == AVRO) { " ('b', NULL, NULL, 0e0, NULL, '1', '101'), " + " (NULL, NULL, NULL, NULL, 15e0, NULL, NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR test_month_transform_date")) .skippingTypesCheck() .matches("VALUES " + @@ -2250,7 +2252,7 @@ public void testYearTransformDate() assertUpdate("INSERT INTO test_year_transform_date " + values, 13); assertQuery("SELECT * FROM test_year_transform_date", values); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertQuery( "SELECT partition.d_year, record_count, data.d.min, data.d.max, data.b.min, data.b.max FROM \"test_year_transform_date$partitions\"", "VALUES " + @@ -2262,7 +2264,7 @@ public void testYearTransformDate() "(46, 2, DATE '2016-05-15', DATE '2016-06-06', 9, 10), " + "(50, 2, DATE '2020-02-21', DATE '2020-11-10', 11, 12)"); } - else if (format == AVRO) { + else { assertQuery( "SELECT partition.d_year, record_count, data.d.min, data.d.max, data.b.min, data.b.max FROM \"test_year_transform_date$partitions\"", "VALUES " + @@ -2280,7 +2282,7 @@ else if (format == AVRO) { "SELECT * FROM test_year_transform_date WHERE day_of_week(d) = 1 AND b % 7 = 3", "VALUES (DATE '2016-06-06', 10)"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR test_year_transform_date")) .skippingTypesCheck() .matches("VALUES " + @@ -2288,7 +2290,7 @@ else if (format == AVRO) { " ('b', NULL, NULL, 0e0, NULL, '1', '101'), " + " (NULL, NULL, NULL, NULL, 13e0, NULL, NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR test_year_transform_date")) .skippingTypesCheck() .matches("VALUES " + @@ -2325,7 +2327,7 @@ else if (format == AVRO) { assertThat(query("SELECT * FROM test_year_transform_date WHERE date_trunc('year', d) = DATE '2015-01-01'")) .isFullyPushedDown(); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR test_year_transform_date")) .skippingTypesCheck() .matches("VALUES " + @@ -2333,7 +2335,7 @@ else if (format == AVRO) { " ('b', NULL, NULL, 0e0, NULL, '1', '101'), " + " (NULL, NULL, NULL, NULL, 13e0, NULL, NULL)"); } - else if 
(format == AVRO) { + else { assertThat(query("SHOW STATS FOR test_year_transform_date")) .skippingTypesCheck() .matches("VALUES " + @@ -2678,7 +2680,7 @@ public void testTruncateIntegerTransform(String dataType) "SELECT * FROM " + table + " WHERE d % 10 = -1 AND b % 7 = 3", "VALUES (-1, 10)"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR " + table)) .skippingTypesCheck() .matches("VALUES " + @@ -2686,7 +2688,7 @@ public void testTruncateIntegerTransform(String dataType) " ('b', NULL, NULL, 0e0, NULL, '1', '101'), " + " (NULL, NULL, NULL, NULL, 16e0, NULL, NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR " + table)) .skippingTypesCheck() .matches("VALUES " + @@ -2917,12 +2919,12 @@ public void testVoidTransform() assertQuery("SELECT * FROM test_void_transform", values); assertQuery("SELECT COUNT(*) FROM \"test_void_transform$partitions\"", "SELECT 1"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertQuery( "SELECT partition.d_null, record_count, file_count, data.d.min, data.d.max, data.d.null_count, data.d.nan_count, data.b.min, data.b.max, data.b.null_count, data.b.nan_count FROM \"test_void_transform$partitions\"", "VALUES (NULL, 7, 1, 'Warsaw', 'mommy', 2, NULL, 1, 7, 0, NULL)"); } - else if (format == AVRO) { + else { assertQuery( "SELECT partition.d_null, record_count, file_count, data.d.min, data.d.max, data.d.null_count, data.d.nan_count, data.b.min, data.b.max, data.b.null_count, data.b.nan_count FROM \"test_void_transform$partitions\"", "VALUES (NULL, 7, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)"); @@ -2939,7 +2941,7 @@ else if (format == AVRO) { assertQuery("SELECT b FROM test_void_transform WHERE d IS NULL", "VALUES 6, 7"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR test_void_transform")) .skippingTypesCheck() .matches("VALUES " + @@ -2947,7 +2949,7 @@ else if (format == AVRO) { " ('b', NULL, NULL, 0e0, NULL, '1', '7'), " + " (NULL, NULL, NULL, NULL, 7e0, NULL, NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR test_void_transform")) .skippingTypesCheck() .matches("VALUES " + @@ -3239,12 +3241,12 @@ public void testPartitionedTableStatistics() MaterializedRow row0 = result.getMaterializedRows().get(0); assertEquals(row0.getField(0), "col1"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertEquals(row0.getField(3), 0.0); assertEquals(row0.getField(5), "-10.0"); assertEquals(row0.getField(6), "100.0"); } - else if (format == AVRO) { + else { assertNull(row0.getField(3)); assertNull(row0.getField(5)); assertNull(row0.getField(6)); @@ -3252,12 +3254,12 @@ else if (format == AVRO) { MaterializedRow row1 = result.getMaterializedRows().get(1); assertEquals(row1.getField(0), "col2"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertEquals(row1.getField(3), 0.0); assertEquals(row1.getField(5), "-1"); assertEquals(row1.getField(6), "10"); } - else if (format == AVRO) { + else { assertNull(row0.getField(3)); assertNull(row0.getField(5)); assertNull(row0.getField(6)); @@ -3278,12 +3280,12 @@ else if (format == AVRO) { assertEquals(result.getRowCount(), 3); row0 = result.getMaterializedRows().get(0); assertEquals(row0.getField(0), "col1"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertEquals(row0.getField(3), 5.0 / 12.0); assertEquals(row0.getField(5), "-10.0"); assertEquals(row0.getField(6), "105.0"); } - 
-        else if (format == AVRO) {
+        else {
             assertNull(row0.getField(3));
             assertNull(row0.getField(5));
             assertNull(row0.getField(6));
@@ -3291,12 +3293,12 @@ else if (format == AVRO) {
         row1 = result.getMaterializedRows().get(1);
         assertEquals(row1.getField(0), "col2");
-        if (format == ORC || format == PARQUET) {
+        if (format != AVRO) {
             assertEquals(row1.getField(3), 0.0);
             assertEquals(row1.getField(5), "-1");
             assertEquals(row1.getField(6), "10");
         }
-        else if (format == AVRO) {
+        else {
             assertNull(row0.getField(3));
             assertNull(row0.getField(5));
             assertNull(row0.getField(6));
@@ -3312,12 +3314,12 @@ else if (format == AVRO) {
         result = computeActual("SHOW STATS FOR iceberg.tpch.test_partitioned_table_statistics");
         row0 = result.getMaterializedRows().get(0);
         assertEquals(row0.getField(0), "col1");
-        if (format == ORC || format == PARQUET) {
+        if (format != AVRO) {
             assertEquals(row0.getField(3), 5.0 / 17.0);
             assertEquals(row0.getField(5), "-10.0");
             assertEquals(row0.getField(6), "105.0");
         }
-        else if (format == AVRO) {
+        else {
             assertNull(row0.getField(3));
             assertNull(row0.getField(5));
             assertNull(row0.getField(6));
@@ -3325,12 +3327,12 @@ else if (format == AVRO) {
         row1 = result.getMaterializedRows().get(1);
         assertEquals(row1.getField(0), "col2");
-        if (format == ORC || format == PARQUET) {
+        if (format != AVRO) {
             assertEquals(row1.getField(3), 5.0 / 17.0);
             assertEquals(row1.getField(5), "-1");
             assertEquals(row1.getField(6), "10");
         }
-        else if (format == AVRO) {
+        else {
             assertNull(row0.getField(3));
             assertNull(row0.getField(5));
             assertNull(row0.getField(6));
@@ -3432,7 +3434,7 @@ public void testPartitionsTableWithColumnNameConflict(boolean partitioned)
                 .matches("VALUES (11, 12, 13, 14, 15)");

         // test $partitions
-        if (format == ORC || format == PARQUET) {
+        if (format != AVRO) {
             assertThat(query("SELECT * FROM \"test_partitions_with_conflict$partitions\""))
                     .matches("SELECT " +
                             (partitioned ? "CAST(ROW(11) AS row(p integer)), " : "") +
@@ -3457,7 +3459,7 @@ public void testPartitionsTableWithColumnNameConflict(boolean partitioned)
                             " )" +
                             ")");
         }
-        else if (format == AVRO) {
+        else {
             assertThat(query("SELECT * FROM \"test_partitions_with_conflict$partitions\""))
                     .matches("SELECT " +
                             (partitioned ? "CAST(ROW(11) AS row(p integer)), " : "") +
@@ -3548,7 +3550,7 @@ public void testCreateNestedPartitionedTable()
                 1);
         assertEquals(computeActual("SELECT * from test_nested_table_1").getRowCount(), 1);

-        if (format == ORC || format == PARQUET) {
+        if (format != AVRO) {
             assertThat(query("SHOW STATS FOR test_nested_table_1"))
                     .skippingTypesCheck()
                     .matches("VALUES " +
@@ -3568,7 +3570,7 @@ public void testCreateNestedPartitionedTable()
                             " ('dt', NULL, NULL, 0e0, NULL, '2021-07-24', '2021-07-24'), " +
                             " (NULL, NULL, NULL, NULL, 1e0, NULL, NULL)");
         }
-        else if (format == AVRO) {
+        else {
             assertThat(query("SHOW STATS FOR test_nested_table_1"))
                     .skippingTypesCheck()
                     .matches("VALUES " +
@@ -3612,7 +3614,7 @@ else if (format == AVRO) {
                 1);
         assertEquals(computeActual("SELECT * from test_nested_table_2").getRowCount(), 1);

-        if (format == ORC || format == PARQUET) {
+        if (format != AVRO) {
             assertThat(query("SHOW STATS FOR test_nested_table_2"))
                     .skippingTypesCheck()
                     .matches("VALUES " +
@@ -3627,7 +3629,7 @@ else if (format == AVRO) {
                             " ('str', NULL, NULL, " + (format == ORC ? "0e0" : "NULL") + ", NULL, NULL, NULL), " +
                             " (NULL, NULL, NULL, NULL, 1e0, NULL, NULL)");
         }
-        else if (format == AVRO) {
+        else {
             assertThat(query("SHOW STATS FOR test_nested_table_2"))
                     .skippingTypesCheck()
                     .matches("VALUES " +
@@ -3924,7 +3926,7 @@ public void testAllAvailableTypes()
                 .matches(nullValues);

         // SHOW STATS
-        if (format == ORC || format == PARQUET) {
+        if (format != AVRO) {
             assertThat(query("SHOW STATS FOR test_all_types"))
                     .skippingTypesCheck()
                     .matches("VALUES " +
@@ -3947,7 +3949,7 @@ public void testAllAvailableTypes()
                             " ('a_map', NULL, NULL, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " +
                             " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)");
         }
-        else if (format == AVRO) {
+        else {
             assertThat(query("SHOW STATS FOR test_all_types"))
                     .skippingTypesCheck()
                     .matches("VALUES " +
@@ -3971,12 +3973,66 @@ else if (format == AVRO) {
                             " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)");
         }

+        // ANALYZE
+        Session defaultSession = getSession();
+        String catalog = defaultSession.getCatalog().orElseThrow();
+        Session extendedStatisticsEnabled = Session.builder(defaultSession)
+                .setCatalogSessionProperty(catalog, EXTENDED_STATISTICS_ENABLED, "true")
+                .build();
+        assertUpdate(extendedStatisticsEnabled, "ANALYZE test_all_types");
+        if (format != AVRO) {
+            assertThat(query(extendedStatisticsEnabled, "SHOW STATS FOR test_all_types"))
+                    .skippingTypesCheck()
+                    .matches("VALUES " +
+                            " ('a_boolean', NULL, 1e0, 0.5e0, NULL, 'true', 'true'), " +
+                            " ('an_integer', NULL, 1e0, 0.5e0, NULL, '1', '1'), " +
+                            " ('a_bigint', NULL, 1e0, 0.5e0, NULL, '1', '1'), " +
+                            " ('a_real', NULL, 1e0, 0.5e0, NULL, '1.0', '1.0'), " +
+                            " ('a_double', NULL, 1e0, 0.5e0, NULL, '1.0', '1.0'), " +
+                            " ('a_short_decimal', NULL, 1e0, 0.5e0, NULL, '1.0', '1.0'), " +
+                            " ('a_long_decimal', NULL, 1e0, 0.5e0, NULL, '11.0', '11.0'), " +
+                            " ('a_varchar', " + (format == PARQUET ? "87e0" : "NULL") + ", 1e0, 0.5e0, NULL, NULL, NULL), " +
+                            " ('a_varbinary', " + (format == PARQUET ? "82e0" : "NULL") + ", 1e0, 0.5e0, NULL, NULL, NULL), " +
+                            " ('a_date', NULL, 1e0, 0.5e0, NULL, '2021-07-24', '2021-07-24'), " +
+                            " ('a_time', NULL, 1e0, 0.5e0, NULL, NULL, NULL), " +
+                            " ('a_timestamp', NULL, 1e0, 0.5e0, NULL, " + (format == ORC ? "'2021-07-24 03:43:57.987000', '2021-07-24 03:43:57.987999'" : "'2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'") + "), " +
+                            " ('a_timestamptz', NULL, 1e0, 0.5e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " +
+                            " ('a_uuid', NULL, 1e0, 0.5e0, NULL, NULL, NULL), " +
+                            " ('a_row', NULL, NULL, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " +
+                            " ('an_array', NULL, NULL, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " +
+                            " ('a_map', NULL, NULL, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " +
+                            " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)");
+        }
+        else {
+            assertThat(query(extendedStatisticsEnabled, "SHOW STATS FOR test_all_types"))
+                    .skippingTypesCheck()
+                    .matches("VALUES " +
+                            " ('a_boolean', NULL, 1e0, NULL, NULL, NULL, NULL), " +
+                            " ('an_integer', NULL, 1e0, NULL, NULL, NULL, NULL), " +
+                            " ('a_bigint', NULL, 1e0, NULL, NULL, NULL, NULL), " +
+                            " ('a_real', NULL, 1e0, NULL, NULL, NULL, NULL), " +
+                            " ('a_double', NULL, 1e0, NULL, NULL, NULL, NULL), " +
+                            " ('a_short_decimal', NULL, 1e0, NULL, NULL, NULL, NULL), " +
+                            " ('a_long_decimal', NULL, 1e0, NULL, NULL, NULL, NULL), " +
+                            " ('a_varchar', NULL, 1e0, NULL, NULL, NULL, NULL), " +
+                            " ('a_varbinary', NULL, 1e0, NULL, NULL, NULL, NULL), " +
+                            " ('a_date', NULL, 1e0, NULL, NULL, NULL, NULL), " +
+                            " ('a_time', NULL, 1e0, NULL, NULL, NULL, NULL), " +
+                            " ('a_timestamp', NULL, 1e0, NULL, NULL, NULL, NULL), " +
+                            " ('a_timestamptz', NULL, 1e0, NULL, NULL, NULL, NULL), " +
+                            " ('a_uuid', NULL, 1e0, NULL, NULL, NULL, NULL), " +
+                            " ('a_row', NULL, NULL, NULL, NULL, NULL, NULL), " +
+                            " ('an_array', NULL, NULL, NULL, NULL, NULL, NULL), " +
+                            " ('a_map', NULL, NULL, NULL, NULL, NULL, NULL), " +
+                            " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)");
+        }
+
         // $partitions
         String schema = getSession().getSchema().orElseThrow();
         assertThat(query("SELECT column_name FROM information_schema.columns WHERE table_schema = '" + schema + "' AND table_name = 'test_all_types$partitions' "))
                 .skippingTypesCheck()
                 .matches("VALUES 'record_count', 'file_count', 'total_size', 'data'");
-        if (format == ORC || format == PARQUET) {
+        if (format != AVRO) {
             assertThat(query("SELECT " +
                     " record_count," +
                     " file_count, " +
@@ -4024,7 +4080,7 @@ else if (format == AVRO) {
                     ) +
                     ")");
         }
-        else if (format == AVRO) {
+        else {
             assertThat(query("SELECT " +
                     " record_count," +
                     " file_count, " +
diff --git a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java
index 059f558d148e..61873ce7286f 100644
--- a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java
+++ b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java
@@ -30,6 +30,7 @@
 import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.MaterializedResult;
 import io.trino.testing.MaterializedResultWithQueryId;
+import io.trino.testing.MaterializedRow;
 import io.trino.testing.QueryRunner;
 import io.trino.tpch.TpchTable;
 import org.assertj.core.api.AbstractThrowableAssert;
@@ -48,6 +49,7 @@
 import java.util.function.Predicate;
 import java.util.function.Supplier;

+import static com.google.common.base.Functions.identity;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.Iterables.getOnlyElement;
@@ -76,8 +78,10 @@
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toMap;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.data.Percentage.withPercentage;
 import static org.testng.Assert.assertEquals;

 public abstract class BaseFailureRecoveryTest
@@ -311,7 +315,7 @@ public void testUpdateWithSubquery()
     }

     @Test(invocationCount = INVOCATION_COUNT)
-    public void testAnalyzeStatistics()
+    public void testAnalyzeTable()
     {
         testNonSelect(
                 Optional.empty(),
@@ -644,9 +648,41 @@ public void finishesSuccessfully(Consumer queryAssertion, boolean expec
             assertThat(expected.getUpdatedTableStatistics()).isPresent();
             assertThat(actual.getUpdatedTableStatistics()).isPresent();

-            MaterializedResult expectedUpdatedTableStatistics = expected.getUpdatedTableStatistics().get();
-            MaterializedResult actualUpdatedTableStatistics = actual.getUpdatedTableStatistics().get();
-            assertEqualsIgnoreOrder(actualUpdatedTableStatistics, expectedUpdatedTableStatistics, "For query: \n " + query);
+            MaterializedResult expectedUpdatedTableStatisticsResult = expected.getUpdatedTableStatistics().get();
+            MaterializedResult actualUpdatedTableStatisticsResult = actual.getUpdatedTableStatistics().get();
+            assertEquals(actualUpdatedTableStatisticsResult.getTypes(), expectedUpdatedTableStatisticsResult.getTypes(), "Column types");
+            assertEquals(actualUpdatedTableStatisticsResult.getColumnNames(), expectedUpdatedTableStatisticsResult.getColumnNames(), "Column names");
+            Map<String, MaterializedRow> expectedUpdatedTableStatistics = expectedUpdatedTableStatisticsResult.getMaterializedRows().stream()
+                    .collect(toMap(row -> (String) row.getField(0), identity()));
+            Map<String, MaterializedRow> actualUpdatedTableStatistics = actualUpdatedTableStatisticsResult.getMaterializedRows().stream()
+                    .collect(toMap(row -> (String) row.getField(0), identity()));
+            assertEquals(actualUpdatedTableStatistics.keySet(), expectedUpdatedTableStatistics.keySet(), "Table columns");
+            expectedUpdatedTableStatistics.forEach((key, expectedRow) -> {
+                MaterializedRow actualRow = actualUpdatedTableStatistics.get(key);
+                assertEquals(actualRow.getFieldCount(), expectedRow.getFieldCount(), "Unexpected layout of stats");
+                for (int statsColumnIndex = 0; statsColumnIndex < expectedRow.getFieldCount(); statsColumnIndex++) {
+                    String statsColumnName = actualUpdatedTableStatisticsResult.getColumnNames().get(statsColumnIndex);
+                    String testedFieldDescription = "Field %d '%s' in %s".formatted(statsColumnIndex, statsColumnName, actualRow);
+                    Object expectedValue = expectedRow.getField(statsColumnIndex);
+                    Object actualValue = actualRow.getField(statsColumnIndex);
+                    if (expectedValue == null) {
+                        assertThat(actualValue).as(testedFieldDescription)
+                                .isNull();
+                    }
+                    else {
+                        switch (statsColumnName) {
+                            case "data_size", "distinct_values_count" -> {
+                                assertThat((double) actualValue).as(testedFieldDescription)
+                                        .isCloseTo((double) expectedValue, withPercentage(5));
+                            }
+                            default -> {
+                                assertThat(actualValue).as(testedFieldDescription)
+                                        .isEqualTo(expectedValue);
+                            }
+                        }
+                    }
+                }
+            });
         }
         else if (isUpdate) {
             assertEquals(actualQueryResult.getUpdateCount(), expectedQueryResult.getUpdateCount());
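The replacement above trades a strict assertEqualsIgnoreOrder for a keyed, field-by-field comparison: rows are matched by the column name in field 0, most fields must be exactly equal, and the estimated data_size and distinct_values_count fields only need to agree within 5%, presumably because re-running ANALYZE after a recovered failure can yield slightly different estimates (for example from approx_distinct). A self-contained sketch of the same pattern is below; the StatsRow record is a hypothetical stand-in for MaterializedRow, and only the two approximate fields are modeled:

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.data.Percentage.withPercentage;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class StatsComparison
{
    private StatsComparison() {}

    // Hypothetical stand-in for a MaterializedRow of SHOW STATS output:
    // the table column name plus its approximate statistics.
    record StatsRow(String columnName, Double dataSize, Double distinctValuesCount) {}

    // Match rows by column name, then compare approximate statistics with a
    // 5% tolerance instead of exact equality, mirroring the new test logic.
    static void assertStatsClose(List<StatsRow> expected, List<StatsRow> actual)
    {
        Map<String, StatsRow> expectedByColumn = expected.stream()
                .collect(Collectors.toMap(StatsRow::columnName, Function.identity()));
        Map<String, StatsRow> actualByColumn = actual.stream()
                .collect(Collectors.toMap(StatsRow::columnName, Function.identity()));
        assertThat(actualByColumn.keySet()).isEqualTo(expectedByColumn.keySet());
        expectedByColumn.forEach((column, expectedRow) -> {
            StatsRow actualRow = actualByColumn.get(column);
            compare(column + ".data_size", expectedRow.dataSize(), actualRow.dataSize());
            compare(column + ".distinct_values_count", expectedRow.distinctValuesCount(), actualRow.distinctValuesCount());
        });
    }

    // Null expected values must stay null; non-null values may drift within 5%.
    private static void compare(String description, Double expected, Double actual)
    {
        if (expected == null) {
            assertThat(actual).as(description).isNull();
        }
        else {
            assertThat(actual).as(description).isCloseTo(expected, withPercentage(5));
        }
    }
}

Keying the rows by column name keeps the assertion order-independent (as assertEqualsIgnoreOrder was), while the percentage tolerance keeps it robust against nondeterministic estimators without loosening the exact checks on the remaining fields.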
diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java
index 64203abe6c73..84db05e97050 100644
--- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java
+++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java
@@ -220,4 +220,11 @@ protected List<Type> getTypes(List<Column> columns)
                 .map(trinoServer.getTypeManager()::fromSqlType)
                 .collect(toImmutableList());
     }
+
+    protected List<String> getNames(List<Column> columns)
+    {
+        return columns.stream()
+                .map(Column::getName)
+                .collect(toImmutableList());
+    }
 }
diff --git a/testing/trino-testing/src/main/java/io/trino/testing/TestingTrinoClient.java b/testing/trino-testing/src/main/java/io/trino/testing/TestingTrinoClient.java
index 833b3380bc5b..5fec102e39dc 100644
--- a/testing/trino-testing/src/main/java/io/trino/testing/TestingTrinoClient.java
+++ b/testing/trino-testing/src/main/java/io/trino/testing/TestingTrinoClient.java
@@ -123,6 +123,7 @@ private class MaterializedResultSession
         private final ImmutableList.Builder<MaterializedRow> rows = ImmutableList.builder();

         private final AtomicReference<List<Type>> types = new AtomicReference<>();
+        private final AtomicReference<List<String>> columnNames = new AtomicReference<>();

         private final AtomicReference<Optional<String>> updateType = new AtomicReference<>(Optional.empty());
         private final AtomicReference<OptionalLong> updateCount = new AtomicReference<>(OptionalLong.empty());
@@ -158,6 +159,7 @@ public void addResults(QueryStatusInfo statusInfo, QueryData data)
         {
             if (types.get() == null && statusInfo.getColumns() != null) {
                 types.set(getTypes(statusInfo.getColumns()));
+                columnNames.set(getNames(statusInfo.getColumns()));
             }

             if (data.getData() != null) {
@@ -173,6 +175,7 @@ public MaterializedResult build(Map setSessionProperties, Set