From df3e8680b8a766d5e431b86ca899c3a75226c303 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 21 Sep 2022 14:52:19 +0200 Subject: [PATCH 01/11] Simplify stats aggregation construction Reduce code noise by moving Symbol -> SymbolReference conversion to the construction method. --- .../planner/StatisticsAggregationPlanner.java | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/StatisticsAggregationPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/StatisticsAggregationPlanner.java index f3b34488b627..22cf672d5081 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/StatisticsAggregationPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/StatisticsAggregationPlanner.java @@ -31,7 +31,6 @@ import io.trino.sql.planner.plan.StatisticAggregations; import io.trino.sql.planner.plan.StatisticAggregationsDescriptor; import io.trino.sql.tree.QualifiedName; -import io.trino.sql.tree.SymbolReference; import java.util.List; import java.util.Map; @@ -122,26 +121,26 @@ public TableStatisticAggregation createStatisticsAggregation(TableStatisticsMeta private ColumnStatisticsAggregation createColumnAggregation(ColumnStatisticType statisticType, Symbol input, Type inputType) { return switch (statisticType) { - case MIN_VALUE -> createAggregation(QualifiedName.of("min"), input.toSymbolReference(), inputType); - case MAX_VALUE -> createAggregation(QualifiedName.of("max"), input.toSymbolReference(), inputType); - case NUMBER_OF_DISTINCT_VALUES -> createAggregation(QualifiedName.of("approx_distinct"), input.toSymbolReference(), inputType); + case MIN_VALUE -> createAggregation(QualifiedName.of("min"), input, inputType); + case MAX_VALUE -> createAggregation(QualifiedName.of("max"), input, inputType); + case NUMBER_OF_DISTINCT_VALUES -> createAggregation(QualifiedName.of("approx_distinct"), input, inputType); case NUMBER_OF_DISTINCT_VALUES_SUMMARY -> // we use $approx_set here and not approx_set because latter is not defined for all types supported by Trino - createAggregation(QualifiedName.of("$approx_set"), input.toSymbolReference(), inputType); - case NUMBER_OF_NON_NULL_VALUES -> createAggregation(QualifiedName.of("count"), input.toSymbolReference(), inputType); - case NUMBER_OF_TRUE_VALUES -> createAggregation(QualifiedName.of("count_if"), input.toSymbolReference(), BOOLEAN); - case TOTAL_SIZE_IN_BYTES -> createAggregation(QualifiedName.of(SumDataSizeForStats.NAME), input.toSymbolReference(), inputType); - case MAX_VALUE_SIZE_IN_BYTES -> createAggregation(QualifiedName.of(MaxDataSizeForStats.NAME), input.toSymbolReference(), inputType); + createAggregation(QualifiedName.of("$approx_set"), input, inputType); + case NUMBER_OF_NON_NULL_VALUES -> createAggregation(QualifiedName.of("count"), input, inputType); + case NUMBER_OF_TRUE_VALUES -> createAggregation(QualifiedName.of("count_if"), input, BOOLEAN); + case TOTAL_SIZE_IN_BYTES -> createAggregation(QualifiedName.of(SumDataSizeForStats.NAME), input, inputType); + case MAX_VALUE_SIZE_IN_BYTES -> createAggregation(QualifiedName.of(MaxDataSizeForStats.NAME), input, inputType); }; } private ColumnStatisticsAggregation createColumnAggregation(FunctionName aggregation, Symbol input, Type inputType) { checkArgument(aggregation.getCatalogSchema().isEmpty(), "Catalog/schema name not supported"); - return createAggregation(QualifiedName.of(aggregation.getName()), input.toSymbolReference(), inputType); + return
createAggregation(QualifiedName.of(aggregation.getName()), input, inputType); } - private ColumnStatisticsAggregation createAggregation(QualifiedName functionName, SymbolReference input, Type inputType) + private ColumnStatisticsAggregation createAggregation(QualifiedName functionName, Symbol input, Type inputType) { ResolvedFunction resolvedFunction = metadata.resolveFunction(session, functionName, fromTypes(inputType)); Type resolvedType = getOnlyElement(resolvedFunction.getSignature().getArgumentTypes()); @@ -149,7 +148,7 @@ private ColumnStatisticsAggregation createAggregation(QualifiedName functionName return new ColumnStatisticsAggregation( new AggregationNode.Aggregation( resolvedFunction, - ImmutableList.of(input), + ImmutableList.of(input.toSymbolReference()), false, Optional.empty(), Optional.empty(), From 63557a6b46eca32ee5813aebd5f67cba14252981 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 27 Sep 2022 09:04:34 +0200 Subject: [PATCH 02/11] Improve robustness of conditional assertions in BaseIcebergConnectorTest A hypothetical new format value should automatically get the same coverage (same number of assertions) as all the existing ones. Before the change, some assertions were run conditionally only if the format was one of the known ones, which wasn't future-proof. --- .../iceberg/BaseIcebergConnectorTest.java | 234 +++++++++--------- 1 file changed, 118 insertions(+), 116 deletions(-) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 9427798ee048..642354bf33bd 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -548,14 +548,14 @@ private void testSelectOrPartitionedByTimestampWithTimeZone(boolean partitioned) instant4Utc)); } else { - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query(format("SELECT record_count, file_count, data._timestamptz FROM \"%s$partitions\"", tableName))) .matches(format( "VALUES (BIGINT '4', BIGINT '4', CAST(ROW(%s, %s, 0, NULL) AS row(min timestamp(6) with time zone, max timestamp(6) with time zone, null_count bigint, nan_count bigint)))", format == ORC ? "TIMESTAMP '1969-12-01 05:06:07.234000 UTC'" : instant4Utc, format == ORC ?
"TIMESTAMP '2021-10-31 00:30:00.007999 UTC'" : instant3Utc)); } - else if (format == AVRO) { + else { assertThat(query(format("SELECT record_count, file_count, data._timestamptz FROM \"%s$partitions\"", tableName))) .skippingTypesCheck() .matches("VALUES (BIGINT '4', BIGINT '4', CAST(NULL AS row(min timestamp(6) with time zone, max timestamp(6) with time zone, null_count bigint, nan_count bigint)))"); @@ -572,14 +572,14 @@ else if (format == AVRO) { } else { // show stats - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR " + tableName)) .skippingTypesCheck() .matches("VALUES " + "('_timestamptz', NULL, NULL, 0e0, NULL, '1969-12-01 05:06:07.234 UTC', '2021-10-31 00:30:00.007 UTC'), " + "(NULL, NULL, NULL, NULL, 4e0, NULL, NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR " + tableName)) .skippingTypesCheck() .matches("VALUES " + @@ -799,76 +799,78 @@ public void testCreatePartitionedTable() .matches(nullValues); // SHOW STATS - if (format == ORC) { - assertQuery("SHOW STATS FOR test_partitioned_table", - "VALUES " + - " ('a_boolean', NULL, NULL, 0.5, NULL, 'true', 'true'), " + - " ('an_integer', NULL, NULL, 0.5, NULL, '1', '1'), " + - " ('a_bigint', NULL, NULL, 0.5, NULL, '1', '1'), " + - " ('a_real', NULL, NULL, 0.5, NULL, '1.0', '1.0'), " + - " ('a_double', NULL, NULL, 0.5, NULL, '1.0', '1.0'), " + - " ('a_short_decimal', NULL, NULL, 0.5, NULL, '1.0', '1.0'), " + - " ('a_long_decimal', NULL, NULL, 0.5, NULL, '11.0', '11.0'), " + - " ('a_varchar', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " ('a_varbinary', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " ('a_date', NULL, NULL, 0.5, NULL, '2021-07-24', '2021-07-24'), " + - " ('a_time', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " ('a_timestamp', NULL, NULL, 0.5, NULL, '2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'), " + - " ('a_timestamptz', NULL, NULL, 0.5, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + - " ('a_uuid', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " ('a_row', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " ('an_array', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " ('a_map', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " ('a quoted, field', NULL, NULL, 0.5, NULL, NULL, NULL), " + - " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); - } - if (format == PARQUET) { - assertThat(query("SHOW STATS FOR test_partitioned_table")) - .skippingTypesCheck() - .matches("VALUES " + - " ('a_boolean', NULL, NULL, 0.5e0, NULL, 'true', 'true'), " + - " ('an_integer', NULL, NULL, 0.5e0, NULL, '1', '1'), " + - " ('a_bigint', NULL, NULL, 0.5e0, NULL, '1', '1'), " + - " ('a_real', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + - " ('a_double', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + - " ('a_short_decimal', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + - " ('a_long_decimal', NULL, NULL, 0.5e0, NULL, '11.0', '11.0'), " + - " ('a_varchar', 87e0, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_varbinary', 82e0, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_date', NULL, NULL, 0.5e0, NULL, '2021-07-24', '2021-07-24'), " + - " ('a_time', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_timestamp', NULL, NULL, 0.5e0, NULL, '2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'), " + - " ('a_timestamptz', NULL, NULL, 0.5e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + - " ('a_uuid', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_row', NULL, NULL, NULL, NULL, NULL, NULL), " + - " ('an_array', NULL, NULL, NULL, NULL, NULL, NULL), " 
+ - " ('a_map', NULL, NULL, NULL, NULL, NULL, NULL), " + - " ('a quoted, field', 83e0, NULL, 0.5e0, NULL, NULL, NULL), " + - " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); - } - else if (format == AVRO) { - assertThat(query("SHOW STATS FOR test_partitioned_table")) - .skippingTypesCheck() - .matches("VALUES " + - " ('a_boolean', NULL, NULL, 0.5e0, NULL, 'true', 'true'), " + - " ('an_integer', NULL, NULL, 0.5e0, NULL, '1', '1'), " + - " ('a_bigint', NULL, NULL, 0.5e0, NULL, '1', '1'), " + - " ('a_real', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + - " ('a_double', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + - " ('a_short_decimal', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + - " ('a_long_decimal', NULL, NULL, 0.5e0, NULL, '11.0', '11.0'), " + - " ('a_varchar', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_varbinary', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_date', NULL, NULL, 0.5e0, NULL, '2021-07-24', '2021-07-24'), " + - " ('a_time', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_timestamp', NULL, NULL, 0.5e0, NULL, '2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'), " + - " ('a_timestamptz', NULL, NULL, 0.5e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + - " ('a_uuid', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_row', NULL, NULL, NULL, NULL, NULL, NULL), " + - " ('an_array', NULL, NULL, NULL, NULL, NULL, NULL), " + - " ('a_map', NULL, NULL, NULL, NULL, NULL, NULL), " + - " ('a quoted, field', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + - " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); + switch (format) { + case ORC -> { + assertQuery("SHOW STATS FOR test_partitioned_table", + "VALUES " + + " ('a_boolean', NULL, NULL, 0.5, NULL, 'true', 'true'), " + + " ('an_integer', NULL, NULL, 0.5, NULL, '1', '1'), " + + " ('a_bigint', NULL, NULL, 0.5, NULL, '1', '1'), " + + " ('a_real', NULL, NULL, 0.5, NULL, '1.0', '1.0'), " + + " ('a_double', NULL, NULL, 0.5, NULL, '1.0', '1.0'), " + + " ('a_short_decimal', NULL, NULL, 0.5, NULL, '1.0', '1.0'), " + + " ('a_long_decimal', NULL, NULL, 0.5, NULL, '11.0', '11.0'), " + + " ('a_varchar', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a_varbinary', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a_date', NULL, NULL, 0.5, NULL, '2021-07-24', '2021-07-24'), " + + " ('a_time', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a_timestamp', NULL, NULL, 0.5, NULL, '2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'), " + + " ('a_timestamptz', NULL, NULL, 0.5, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + + " ('a_uuid', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a_row', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('an_array', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a_map', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a quoted, field', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); + } + case PARQUET -> { + assertThat(query("SHOW STATS FOR test_partitioned_table")) + .skippingTypesCheck() + .matches("VALUES " + + " ('a_boolean', NULL, NULL, 0.5e0, NULL, 'true', 'true'), " + + " ('an_integer', NULL, NULL, 0.5e0, NULL, '1', '1'), " + + " ('a_bigint', NULL, NULL, 0.5e0, NULL, '1', '1'), " + + " ('a_real', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_double', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_short_decimal', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_long_decimal', NULL, NULL, 0.5e0, NULL, '11.0', '11.0'), " + + " ('a_varchar', 87e0, NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_varbinary', 82e0, NULL, 
0.5e0, NULL, NULL, NULL), " + + " ('a_date', NULL, NULL, 0.5e0, NULL, '2021-07-24', '2021-07-24'), " + + " ('a_time', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_timestamp', NULL, NULL, 0.5e0, NULL, '2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'), " + + " ('a_timestamptz', NULL, NULL, 0.5e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + + " ('a_uuid', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_row', NULL, NULL, NULL, NULL, NULL, NULL), " + + " ('an_array', NULL, NULL, NULL, NULL, NULL, NULL), " + + " ('a_map', NULL, NULL, NULL, NULL, NULL, NULL), " + + " ('a quoted, field', 83e0, NULL, 0.5e0, NULL, NULL, NULL), " + + " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); + } + case AVRO -> { + assertThat(query("SHOW STATS FOR test_partitioned_table")) + .skippingTypesCheck() + .matches("VALUES " + + " ('a_boolean', NULL, NULL, 0.5e0, NULL, 'true', 'true'), " + + " ('an_integer', NULL, NULL, 0.5e0, NULL, '1', '1'), " + + " ('a_bigint', NULL, NULL, 0.5e0, NULL, '1', '1'), " + + " ('a_real', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_double', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_short_decimal', NULL, NULL, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_long_decimal', NULL, NULL, 0.5e0, NULL, '11.0', '11.0'), " + + " ('a_varchar', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_varbinary', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_date', NULL, NULL, 0.5e0, NULL, '2021-07-24', '2021-07-24'), " + + " ('a_time', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_timestamp', NULL, NULL, 0.5e0, NULL, '2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'), " + + " ('a_timestamptz', NULL, NULL, 0.5e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + + " ('a_uuid', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_row', NULL, NULL, NULL, NULL, NULL, NULL), " + + " ('an_array', NULL, NULL, NULL, NULL, NULL, NULL), " + + " ('a_map', NULL, NULL, NULL, NULL, NULL, NULL), " + + " ('a quoted, field', NULL, NULL, 0.5e0, NULL, NULL, NULL), " + + " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); + } } // $partitions @@ -1160,7 +1162,7 @@ public void testShowStatsAfterAddColumn() assertUpdate("INSERT INTO test_show_stats_after_add_column VALUES (NULL, NULL, NULL)", 1); assertUpdate("INSERT INTO test_show_stats_after_add_column VALUES (7, 8, 9)", 1); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR test_show_stats_after_add_column")) .skippingTypesCheck() .matches("VALUES " + @@ -1169,7 +1171,7 @@ public void testShowStatsAfterAddColumn() " ('col2', NULL, NULL, 25e-2, NULL, '3', '9'), " + " (NULL, NULL, NULL, NULL, 4e0, NULL, NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR test_show_stats_after_add_column")) .skippingTypesCheck() .matches("VALUES " + @@ -1182,7 +1184,7 @@ else if (format == AVRO) { // Columns added after some data files exist will not have valid statistics because not all files have min/max/null count statistics for the new column assertUpdate("ALTER TABLE test_show_stats_after_add_column ADD COLUMN col3 INTEGER"); assertUpdate("INSERT INTO test_show_stats_after_add_column VALUES (10, 11, 12, 13)", 1); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR test_show_stats_after_add_column")) .skippingTypesCheck() .matches("VALUES " + @@ -1192,7 +1194,7 @@ else if (format == AVRO) { " ('col3', NULL, NULL, NULL, NULL, NULL, NULL), " + " (NULL, NULL, NULL, NULL, 5e0, NULL, 
NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR test_show_stats_after_add_column")) .skippingTypesCheck() .matches("VALUES " + @@ -1917,7 +1919,7 @@ public void testMonthTransformDate() String expectedDateStats = "NULL, NULL, 0.0666667e0, NULL, '1969-11-13', '2020-12-31'"; String expectedBigIntStats = "NULL, NULL, 0e0, NULL, '1', '101'"; - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertQuery( "SELECT partition.d_month, record_count, data.d.min, data.d.max, data.b.min, data.b.max FROM \"test_month_transform_date$partitions\"", "VALUES " + @@ -1932,7 +1934,7 @@ public void testMonthTransformDate() "(606, 2, DATE '2020-07-18', DATE '2020-07-28', 12, 13), " + "(611, 1, DATE '2020-12-31', DATE '2020-12-31', 14, 14)"); } - else if (format == AVRO) { + else { assertQuery( "SELECT partition.d_month, record_count, data.d.min, data.d.max, data.b.min, data.b.max FROM \"test_month_transform_date$partitions\"", "VALUES " + @@ -1992,7 +1994,7 @@ else if (format == AVRO) { assertThat(query("SELECT * FROM test_month_transform_date WHERE date_trunc('year', d) = DATE '2015-01-01'")) .isFullyPushedDown(); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR test_month_transform_date")) .skippingTypesCheck() .matches("VALUES " + @@ -2000,7 +2002,7 @@ else if (format == AVRO) { " ('b', NULL, NULL, 0e0, NULL, '1', '101'), " + " (NULL, NULL, NULL, NULL, 15e0, NULL, NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR test_month_transform_date")) .skippingTypesCheck() .matches("VALUES " + @@ -2250,7 +2252,7 @@ public void testYearTransformDate() assertUpdate("INSERT INTO test_year_transform_date " + values, 13); assertQuery("SELECT * FROM test_year_transform_date", values); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertQuery( "SELECT partition.d_year, record_count, data.d.min, data.d.max, data.b.min, data.b.max FROM \"test_year_transform_date$partitions\"", "VALUES " + @@ -2262,7 +2264,7 @@ public void testYearTransformDate() "(46, 2, DATE '2016-05-15', DATE '2016-06-06', 9, 10), " + "(50, 2, DATE '2020-02-21', DATE '2020-11-10', 11, 12)"); } - else if (format == AVRO) { + else { assertQuery( "SELECT partition.d_year, record_count, data.d.min, data.d.max, data.b.min, data.b.max FROM \"test_year_transform_date$partitions\"", "VALUES " + @@ -2280,7 +2282,7 @@ else if (format == AVRO) { "SELECT * FROM test_year_transform_date WHERE day_of_week(d) = 1 AND b % 7 = 3", "VALUES (DATE '2016-06-06', 10)"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR test_year_transform_date")) .skippingTypesCheck() .matches("VALUES " + @@ -2288,7 +2290,7 @@ else if (format == AVRO) { " ('b', NULL, NULL, 0e0, NULL, '1', '101'), " + " (NULL, NULL, NULL, NULL, 13e0, NULL, NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR test_year_transform_date")) .skippingTypesCheck() .matches("VALUES " + @@ -2325,7 +2327,7 @@ else if (format == AVRO) { assertThat(query("SELECT * FROM test_year_transform_date WHERE date_trunc('year', d) = DATE '2015-01-01'")) .isFullyPushedDown(); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR test_year_transform_date")) .skippingTypesCheck() .matches("VALUES " + @@ -2333,7 +2335,7 @@ else if (format == AVRO) { " ('b', NULL, NULL, 0e0, NULL, '1', '101'), " + " (NULL, NULL, NULL, NULL, 13e0, NULL, NULL)"); } - else if 
(format == AVRO) { + else { assertThat(query("SHOW STATS FOR test_year_transform_date")) .skippingTypesCheck() .matches("VALUES " + @@ -2678,7 +2680,7 @@ public void testTruncateIntegerTransform(String dataType) "SELECT * FROM " + table + " WHERE d % 10 = -1 AND b % 7 = 3", "VALUES (-1, 10)"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR " + table)) .skippingTypesCheck() .matches("VALUES " + @@ -2686,7 +2688,7 @@ public void testTruncateIntegerTransform(String dataType) " ('b', NULL, NULL, 0e0, NULL, '1', '101'), " + " (NULL, NULL, NULL, NULL, 16e0, NULL, NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR " + table)) .skippingTypesCheck() .matches("VALUES " + @@ -2917,12 +2919,12 @@ public void testVoidTransform() assertQuery("SELECT * FROM test_void_transform", values); assertQuery("SELECT COUNT(*) FROM \"test_void_transform$partitions\"", "SELECT 1"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertQuery( "SELECT partition.d_null, record_count, file_count, data.d.min, data.d.max, data.d.null_count, data.d.nan_count, data.b.min, data.b.max, data.b.null_count, data.b.nan_count FROM \"test_void_transform$partitions\"", "VALUES (NULL, 7, 1, 'Warsaw', 'mommy', 2, NULL, 1, 7, 0, NULL)"); } - else if (format == AVRO) { + else { assertQuery( "SELECT partition.d_null, record_count, file_count, data.d.min, data.d.max, data.d.null_count, data.d.nan_count, data.b.min, data.b.max, data.b.null_count, data.b.nan_count FROM \"test_void_transform$partitions\"", "VALUES (NULL, 7, 1, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)"); @@ -2939,7 +2941,7 @@ else if (format == AVRO) { assertQuery("SELECT b FROM test_void_transform WHERE d IS NULL", "VALUES 6, 7"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR test_void_transform")) .skippingTypesCheck() .matches("VALUES " + @@ -2947,7 +2949,7 @@ else if (format == AVRO) { " ('b', NULL, NULL, 0e0, NULL, '1', '7'), " + " (NULL, NULL, NULL, NULL, 7e0, NULL, NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR test_void_transform")) .skippingTypesCheck() .matches("VALUES " + @@ -3239,12 +3241,12 @@ public void testPartitionedTableStatistics() MaterializedRow row0 = result.getMaterializedRows().get(0); assertEquals(row0.getField(0), "col1"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertEquals(row0.getField(3), 0.0); assertEquals(row0.getField(5), "-10.0"); assertEquals(row0.getField(6), "100.0"); } - else if (format == AVRO) { + else { assertNull(row0.getField(3)); assertNull(row0.getField(5)); assertNull(row0.getField(6)); @@ -3252,12 +3254,12 @@ else if (format == AVRO) { MaterializedRow row1 = result.getMaterializedRows().get(1); assertEquals(row1.getField(0), "col2"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertEquals(row1.getField(3), 0.0); assertEquals(row1.getField(5), "-1"); assertEquals(row1.getField(6), "10"); } - else if (format == AVRO) { + else { assertNull(row0.getField(3)); assertNull(row0.getField(5)); assertNull(row0.getField(6)); @@ -3278,12 +3280,12 @@ else if (format == AVRO) { assertEquals(result.getRowCount(), 3); row0 = result.getMaterializedRows().get(0); assertEquals(row0.getField(0), "col1"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertEquals(row0.getField(3), 5.0 / 12.0); assertEquals(row0.getField(5), "-10.0"); assertEquals(row0.getField(6), "105.0"); } - 
else if (format == AVRO) { + else { assertNull(row0.getField(3)); assertNull(row0.getField(5)); assertNull(row0.getField(6)); @@ -3291,12 +3293,12 @@ else if (format == AVRO) { row1 = result.getMaterializedRows().get(1); assertEquals(row1.getField(0), "col2"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertEquals(row1.getField(3), 0.0); assertEquals(row1.getField(5), "-1"); assertEquals(row1.getField(6), "10"); } - else if (format == AVRO) { + else { assertNull(row0.getField(3)); assertNull(row0.getField(5)); assertNull(row0.getField(6)); @@ -3312,12 +3314,12 @@ else if (format == AVRO) { result = computeActual("SHOW STATS FOR iceberg.tpch.test_partitioned_table_statistics"); row0 = result.getMaterializedRows().get(0); assertEquals(row0.getField(0), "col1"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertEquals(row0.getField(3), 5.0 / 17.0); assertEquals(row0.getField(5), "-10.0"); assertEquals(row0.getField(6), "105.0"); } - else if (format == AVRO) { + else { assertNull(row0.getField(3)); assertNull(row0.getField(5)); assertNull(row0.getField(6)); @@ -3325,12 +3327,12 @@ else if (format == AVRO) { row1 = result.getMaterializedRows().get(1); assertEquals(row1.getField(0), "col2"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertEquals(row1.getField(3), 5.0 / 17.0); assertEquals(row1.getField(5), "-1"); assertEquals(row1.getField(6), "10"); } - else if (format == AVRO) { + else { assertNull(row0.getField(3)); assertNull(row0.getField(5)); assertNull(row0.getField(6)); @@ -3432,7 +3434,7 @@ public void testPartitionsTableWithColumnNameConflict(boolean partitioned) .matches("VALUES (11, 12, 13, 14, 15)"); // test $partitions - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SELECT * FROM \"test_partitions_with_conflict$partitions\"")) .matches("SELECT " + (partitioned ? "CAST(ROW(11) AS row(p integer)), " : "") + @@ -3457,7 +3459,7 @@ public void testPartitionsTableWithColumnNameConflict(boolean partitioned) " )" + ")"); } - else if (format == AVRO) { + else { assertThat(query("SELECT * FROM \"test_partitions_with_conflict$partitions\"")) .matches("SELECT " + (partitioned ? "CAST(ROW(11) AS row(p integer)), " : "") + @@ -3548,7 +3550,7 @@ public void testCreateNestedPartitionedTable() 1); assertEquals(computeActual("SELECT * from test_nested_table_1").getRowCount(), 1); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR test_nested_table_1")) .skippingTypesCheck() .matches("VALUES " + @@ -3568,7 +3570,7 @@ public void testCreateNestedPartitionedTable() " ('dt', NULL, NULL, 0e0, NULL, '2021-07-24', '2021-07-24'), " + " (NULL, NULL, NULL, NULL, 1e0, NULL, NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR test_nested_table_1")) .skippingTypesCheck() .matches("VALUES " + @@ -3612,7 +3614,7 @@ else if (format == AVRO) { 1); assertEquals(computeActual("SELECT * from test_nested_table_2").getRowCount(), 1); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR test_nested_table_2")) .skippingTypesCheck() .matches("VALUES " + @@ -3627,7 +3629,7 @@ else if (format == AVRO) { " ('str', NULL, NULL, " + (format == ORC ? 
"0e0" : "NULL") + ", NULL, NULL, NULL), " + " (NULL, NULL, NULL, NULL, 1e0, NULL, NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR test_nested_table_2")) .skippingTypesCheck() .matches("VALUES " + @@ -3924,7 +3926,7 @@ public void testAllAvailableTypes() .matches(nullValues); // SHOW STATS - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SHOW STATS FOR test_all_types")) .skippingTypesCheck() .matches("VALUES " + @@ -3947,7 +3949,7 @@ public void testAllAvailableTypes() " ('a_map', NULL, NULL, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " + " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); } - else if (format == AVRO) { + else { assertThat(query("SHOW STATS FOR test_all_types")) .skippingTypesCheck() .matches("VALUES " + @@ -3976,7 +3978,7 @@ else if (format == AVRO) { assertThat(query("SELECT column_name FROM information_schema.columns WHERE table_schema = '" + schema + "' AND table_name = 'test_all_types$partitions' ")) .skippingTypesCheck() .matches("VALUES 'record_count', 'file_count', 'total_size', 'data'"); - if (format == ORC || format == PARQUET) { + if (format != AVRO) { assertThat(query("SELECT " + " record_count," + " file_count, " + @@ -4024,7 +4026,7 @@ else if (format == AVRO) { ) + ")"); } - else if (format == AVRO) { + else { assertThat(query("SELECT " + " record_count," + " file_count, " + From 3302ddb608e458705f126ea28ee77527ab75fe9a Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 27 Sep 2022 09:23:14 +0200 Subject: [PATCH 03/11] Give test method a better name The test is analyzing _a table_ to calculate statistics. --- .../java/io/trino/faulttolerant/BaseFailureRecoveryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java index 059f558d148e..9e5d222551f0 100644 --- a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java +++ b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java @@ -311,7 +311,7 @@ public void testUpdateWithSubquery() } @Test(invocationCount = INVOCATION_COUNT) - public void testAnalyzeStatistics() + public void testAnalyzeTable() { testNonSelect( Optional.empty(), From 3dc168d57d4efea7dc094eb15f6b1221487b5a12 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 27 Sep 2022 10:08:45 +0200 Subject: [PATCH 04/11] Capture column names in MaterializedResult --- .../java/io/trino/testing/MaterializedResult.java | 12 +++++++++++- .../java/io/trino/sql/query/QueryAssertions.java | 2 ++ .../io/trino/testing/AbstractTestingTrinoClient.java | 7 +++++++ .../java/io/trino/testing/TestingTrinoClient.java | 3 +++ 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/testing/MaterializedResult.java b/core/trino-main/src/main/java/io/trino/testing/MaterializedResult.java index 4fd7474add90..06c420a23dfa 100644 --- a/core/trino-main/src/main/java/io/trino/testing/MaterializedResult.java +++ b/core/trino-main/src/main/java/io/trino/testing/MaterializedResult.java @@ -91,6 +91,7 @@ public class MaterializedResult private final List rows; private final List types; + private final List columnNames; private final Map setSessionProperties; private final Set resetSessionProperties; private final Optional updateType; @@ 
-100,12 +101,13 @@ public class MaterializedResult public MaterializedResult(List rows, List types) { - this(rows, types, ImmutableMap.of(), ImmutableSet.of(), Optional.empty(), OptionalLong.empty(), ImmutableList.of(), Optional.empty()); + this(rows, types, ImmutableList.of(), ImmutableMap.of(), ImmutableSet.of(), Optional.empty(), OptionalLong.empty(), ImmutableList.of(), Optional.empty()); } public MaterializedResult( List rows, List types, + List columnNames, Map setSessionProperties, Set resetSessionProperties, Optional updateType, @@ -115,6 +117,7 @@ public MaterializedResult( { this.rows = ImmutableList.copyOf(requireNonNull(rows, "rows is null")); this.types = ImmutableList.copyOf(requireNonNull(types, "types is null")); + this.columnNames = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null")); this.setSessionProperties = ImmutableMap.copyOf(requireNonNull(setSessionProperties, "setSessionProperties is null")); this.resetSessionProperties = ImmutableSet.copyOf(requireNonNull(resetSessionProperties, "resetSessionProperties is null")); this.updateType = requireNonNull(updateType, "updateType is null"); @@ -144,6 +147,12 @@ public List getTypes() return types; } + public List getColumnNames() + { + checkState(!columnNames.isEmpty(), "Column names are unknown"); + return columnNames; + } + public Map getSetSessionProperties() { return setSessionProperties; @@ -362,6 +371,7 @@ public MaterializedResult toTestTypes() .map(MaterializedResult::convertToTestTypes) .collect(toImmutableList()), types, + columnNames, setSessionProperties, resetSessionProperties, updateType, diff --git a/core/trino-main/src/test/java/io/trino/sql/query/QueryAssertions.java b/core/trino-main/src/test/java/io/trino/sql/query/QueryAssertions.java index 8b518bcc4051..a2eeb10d9bc2 100644 --- a/core/trino-main/src/test/java/io/trino/sql/query/QueryAssertions.java +++ b/core/trino-main/src/test/java/io/trino/sql/query/QueryAssertions.java @@ -313,6 +313,8 @@ private QueryAssert( this.skipResultsCorrectnessCheckForPushdown = skipResultsCorrectnessCheckForPushdown; } + // TODO for better readability, replace this with `exceptColumns(String... columnNamesToExclude)` leveraging MaterializedResult.getColumnNames + @Deprecated public QueryAssert projected(int... 
columns) { return new QueryAssert( diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java index 64203abe6c73..84db05e97050 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java @@ -220,4 +220,11 @@ protected List getTypes(List columns) .map(trinoServer.getTypeManager()::fromSqlType) .collect(toImmutableList()); } + + protected List getNames(List columns) + { + return columns.stream() + .map(Column::getName) + .collect(toImmutableList()); + } } diff --git a/testing/trino-testing/src/main/java/io/trino/testing/TestingTrinoClient.java b/testing/trino-testing/src/main/java/io/trino/testing/TestingTrinoClient.java index 833b3380bc5b..5fec102e39dc 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/TestingTrinoClient.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/TestingTrinoClient.java @@ -123,6 +123,7 @@ private class MaterializedResultSession private final ImmutableList.Builder rows = ImmutableList.builder(); private final AtomicReference> types = new AtomicReference<>(); + private final AtomicReference> columnNames = new AtomicReference<>(); private final AtomicReference> updateType = new AtomicReference<>(Optional.empty()); private final AtomicReference updateCount = new AtomicReference<>(OptionalLong.empty()); @@ -158,6 +159,7 @@ public void addResults(QueryStatusInfo statusInfo, QueryData data) { if (types.get() == null && statusInfo.getColumns() != null) { types.set(getTypes(statusInfo.getColumns())); + columnNames.set(getNames(statusInfo.getColumns())); } if (data.getData() != null) { @@ -173,6 +175,7 @@ public MaterializedResult build(Map setSessionProperties, Set Date: Tue, 27 Sep 2022 10:08:58 +0200 Subject: [PATCH 05/11] Allow approximations when testing ANALYZE with fault-tolerance ANALYZE can compute data size and number of distinct values (NDV) using approximations, and the result is not guaranteed to be stable. When testing ANALYZE with fault tolerance, allow the results to differ between the test run and the control run by some small percentage.
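To illustrate the comparison strategy, here is a minimal sketch (the helper name is hypothetical; the actual assertion logic is in the BaseFailureRecoveryTest diff below): the inherently approximate stats columns, data_size and distinct_values_count, may drift within a small percentage, while every other field must match exactly.

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.data.Percentage.withPercentage;

// Hypothetical helper: approximate stats may differ by up to 5% between
// the control run and the test run; all other fields must match exactly.
private static void assertStatsField(String statsColumnName, Object expected, Object actual)
{
    switch (statsColumnName) {
        case "data_size", "distinct_values_count" ->
                assertThat((double) actual).isCloseTo((double) expected, withPercentage(5));
        default -> assertThat(actual).isEqualTo(expected);
    }
}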
--- .../BaseFailureRecoveryTest.java | 42 +++++++++++++++++-- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java index 9e5d222551f0..61873ce7286f 100644 --- a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java +++ b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFailureRecoveryTest.java @@ -30,6 +30,7 @@ import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.MaterializedResult; import io.trino.testing.MaterializedResultWithQueryId; +import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; import io.trino.tpch.TpchTable; import org.assertj.core.api.AbstractThrowableAssert; @@ -48,6 +49,7 @@ import java.util.function.Predicate; import java.util.function.Supplier; +import static com.google.common.base.Functions.identity; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; @@ -76,8 +78,10 @@ import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toMap; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.data.Percentage.withPercentage; import static org.testng.Assert.assertEquals; public abstract class BaseFailureRecoveryTest @@ -644,9 +648,41 @@ public void finishesSuccessfully(Consumer queryAssertion, boolean expec assertThat(expected.getUpdatedTableStatistics()).isPresent(); assertThat(actual.getUpdatedTableStatistics()).isPresent(); - MaterializedResult expectedUpdatedTableStatistics = expected.getUpdatedTableStatistics().get(); - MaterializedResult actualUpdatedTableStatistics = actual.getUpdatedTableStatistics().get(); - assertEqualsIgnoreOrder(actualUpdatedTableStatistics, expectedUpdatedTableStatistics, "For query: \n " + query); + MaterializedResult expectedUpdatedTableStatisticsResult = expected.getUpdatedTableStatistics().get(); + MaterializedResult actualUpdatedTableStatisticsResult = actual.getUpdatedTableStatistics().get(); + assertEquals(actualUpdatedTableStatisticsResult.getTypes(), expectedUpdatedTableStatisticsResult.getTypes(), "Column types"); + assertEquals(actualUpdatedTableStatisticsResult.getColumnNames(), expectedUpdatedTableStatisticsResult.getColumnNames(), "Column names"); + Map expectedUpdatedTableStatistics = expectedUpdatedTableStatisticsResult.getMaterializedRows().stream() + .collect(toMap(row -> (String) row.getField(0), identity())); + Map actualUpdatedTableStatistics = actualUpdatedTableStatisticsResult.getMaterializedRows().stream() + .collect(toMap(row -> (String) row.getField(0), identity())); + assertEquals(actualUpdatedTableStatistics.keySet(), expectedUpdatedTableStatistics.keySet(), "Table columns"); + expectedUpdatedTableStatistics.forEach((key, expectedRow) -> { + MaterializedRow actualRow = actualUpdatedTableStatistics.get(key); + assertEquals(actualRow.getFieldCount(), expectedRow.getFieldCount(), "Unexpected layout of stats"); + for (int statsColumnIndex = 0; statsColumnIndex < expectedRow.getFieldCount(); statsColumnIndex++) { 
+ String statsColumnName = actualUpdatedTableStatisticsResult.getColumnNames().get(statsColumnIndex); + String testedFieldDescription = "Field %d '%s' in %s".formatted(statsColumnIndex, statsColumnName, actualRow); + Object expectedValue = expectedRow.getField(statsColumnIndex); + Object actualValue = actualRow.getField(statsColumnIndex); + if (expectedValue == null) { + assertThat(actualValue).as(testedFieldDescription) + .isNull(); + } + else { + switch (statsColumnName) { + case "data_size", "distinct_values_count" -> { + assertThat((double) actualValue).as(testedFieldDescription) + .isCloseTo((double) expectedValue, withPercentage(5)); + } + default -> { + assertThat(actualValue).as(testedFieldDescription) + .isEqualTo(expectedValue); + } + } + } + } + }); } else if (isUpdate) { assertEquals(actualQueryResult.getUpdateCount(), expectedQueryResult.getUpdateCount()); From a86625cfe248f972739f4f6945d7d8aaf4c6e0b7 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 27 Sep 2022 08:57:34 +0200 Subject: [PATCH 06/11] Test Iceberg ANALYZE with all supported types --- .../iceberg/BaseIcebergConnectorTest.java | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 642354bf33bd..7b5cca976aba 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -3973,6 +3973,60 @@ public void testAllAvailableTypes() " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); } + // ANALYZE + Session defaultSession = getSession(); + String catalog = defaultSession.getCatalog().orElseThrow(); + Session extendedStatisticsEnabled = Session.builder(defaultSession) + .setCatalogSessionProperty(catalog, EXTENDED_STATISTICS_ENABLED, "true") + .build(); + assertUpdate(extendedStatisticsEnabled, "ANALYZE test_all_types"); + if (format != AVRO) { + assertThat(query(extendedStatisticsEnabled, "SHOW STATS FOR test_all_types")) + .skippingTypesCheck() + .matches("VALUES " + + " ('a_boolean', NULL, 1e0, 0.5e0, NULL, 'true', 'true'), " + + " ('an_integer', NULL, 1e0, 0.5e0, NULL, '1', '1'), " + + " ('a_bigint', NULL, 1e0, 0.5e0, NULL, '1', '1'), " + + " ('a_real', NULL, 1e0, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_double', NULL, 1e0, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_short_decimal', NULL, 1e0, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_long_decimal', NULL, 1e0, 0.5e0, NULL, '11.0', '11.0'), " + + " ('a_varchar', " + (format == PARQUET ? "87e0" : "NULL") + ", 1e0, 0.5e0, NULL, NULL, NULL), " + + " ('a_varbinary', " + (format == PARQUET ? "82e0" : "NULL") + ", 1e0, 0.5e0, NULL, NULL, NULL), " + + " ('a_date', NULL, 1e0, 0.5e0, NULL, '2021-07-24', '2021-07-24'), " + + " ('a_time', NULL, 1e0, 0.5e0, NULL, NULL, NULL), " + + " ('a_timestamp', NULL, 1e0, 0.5e0, NULL, " + (format == ORC ? "'2021-07-24 03:43:57.987000', '2021-07-24 03:43:57.987999'" : "'2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'") + "), " + + " ('a_timestamptz', NULL, 1e0, 0.5e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + + " ('a_uuid', NULL, 1e0, 0.5e0, NULL, NULL, NULL), " + + " ('a_row', NULL, 1e0, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " + + " ('an_array', NULL, 1e0, " + (format == ORC ? 
"0.5" : "NULL") + ", NULL, NULL, NULL), " + + " ('a_map', NULL, 1e0, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " + + " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); + } + else { + assertThat(query(extendedStatisticsEnabled, "SHOW STATS FOR test_all_types")) + .skippingTypesCheck() + .matches("VALUES " + + " ('a_boolean', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('an_integer', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('a_bigint', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('a_real', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('a_double', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('a_short_decimal', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('a_long_decimal', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('a_varchar', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('a_varbinary', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('a_date', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('a_time', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('a_timestamp', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('a_timestamptz', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('a_uuid', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('a_row', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('an_array', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " ('a_map', NULL, 1e0, NULL, NULL, NULL, NULL), " + + " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); + } + // $partitions String schema = getSession().getSchema().orElseThrow(); assertThat(query("SELECT column_name FROM information_schema.columns WHERE table_schema = '" + schema + "' AND table_name = 'test_all_types$partitions' ")) From 4b958b1501914906f9a8225230d53b1e76cfe172 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 29 Sep 2022 14:04:16 +0200 Subject: [PATCH 07/11] Improve Iceberg to Trino conversion code - use applicable static imports - cast to primitive directly (same behavior, but looks nicer) --- .../io/trino/plugin/iceberg/IcebergTypes.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java index 81bea4d0a70c..fc210b508add 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java @@ -16,7 +16,6 @@ import io.airlift.slice.Slices; import io.trino.spi.type.DecimalType; import io.trino.spi.type.Decimals; -import io.trino.spi.type.UuidType; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -28,6 +27,9 @@ import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.iceberg.util.Timestamps.timestampTzFromMicros; import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND; +import static io.trino.spi.type.UuidType.javaUuidToTrinoUuid; +import static java.lang.Float.floatToIntBits; +import static java.lang.Math.multiplyExact; public final class IcebergTypes { @@ -43,21 +45,21 @@ public static Object convertIcebergValueToTrino(Type icebergType, Object value) } if (icebergType instanceof Types.BooleanType) { //noinspection RedundantCast - return (Boolean) value; + return (boolean) value; } if (icebergType instanceof Types.IntegerType) { - return ((Integer) value).longValue(); + return (long) (int) value; } if (icebergType instanceof Types.LongType) { //noinspection RedundantCast - return (Long) value; + return (long) value; } if (icebergType instanceof Types.FloatType) { - return (long) 
Float.floatToIntBits((Float) value); + return (long) floatToIntBits((float) value); } if (icebergType instanceof Types.DoubleType) { //noinspection RedundantCast - return (Double) value; + return (double) value; } if (icebergType instanceof Types.DecimalType icebergDecimalType) { DecimalType trinoDecimalType = DecimalType.createDecimalType(icebergDecimalType.precision(), icebergDecimalType.scale()); @@ -77,10 +79,10 @@ public static Object convertIcebergValueToTrino(Type icebergType, Object value) return Slices.wrappedBuffer(((ByteBuffer) value).array().clone()); } if (icebergType instanceof Types.DateType) { - return ((Integer) value).longValue(); + return (long) (int) value; } if (icebergType instanceof Types.TimeType) { - return Math.multiplyExact((Long) value, PICOSECONDS_PER_MICROSECOND); + return multiplyExact((long) value, PICOSECONDS_PER_MICROSECOND); } if (icebergType instanceof Types.TimestampType icebergTimestampType) { long epochMicros = (long) value; @@ -90,7 +92,7 @@ public static Object convertIcebergValueToTrino(Type icebergType, Object value) return epochMicros; } if (icebergType instanceof Types.UUIDType) { - return UuidType.javaUuidToTrinoUuid((UUID) value); + return javaUuidToTrinoUuid((UUID) value); } throw new UnsupportedOperationException("Unsupported iceberg type: " + icebergType); From 6726950fa4b9c5ba96a196951d6879a470bc6757 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 29 Sep 2022 13:50:03 +0200 Subject: [PATCH 08/11] Move Trino to Iceberg value conversion utility method Move `ExpressionConverter.getIcebergLiteralValue` to `IcebergTypes` for re-use. Name it `convertTrinoValueToIceberg` to be consistent with existing `convertIcebergValueToTrino` method in that class. --- .../plugin/iceberg/ExpressionConverter.java | 95 +------------------ .../io/trino/plugin/iceberg/IcebergTypes.java | 87 +++++++++++++++++ 2 files changed, 92 insertions(+), 90 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ExpressionConverter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ExpressionConverter.java index 205ce2183afa..ddb5e6555eed 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ExpressionConverter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ExpressionConverter.java @@ -14,31 +14,16 @@ package io.trino.plugin.iceberg; import com.google.common.base.VerifyException; -import io.airlift.slice.Slice; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.Range; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.ArrayType; -import io.trino.spi.type.BigintType; -import io.trino.spi.type.BooleanType; -import io.trino.spi.type.DateType; -import io.trino.spi.type.DecimalType; -import io.trino.spi.type.DoubleType; -import io.trino.spi.type.Int128; -import io.trino.spi.type.IntegerType; -import io.trino.spi.type.LongTimestampWithTimeZone; import io.trino.spi.type.MapType; -import io.trino.spi.type.RealType; import io.trino.spi.type.RowType; import io.trino.spi.type.Type; -import io.trino.spi.type.UuidType; -import io.trino.spi.type.VarbinaryType; -import io.trino.spi.type.VarcharType; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; -import java.math.BigDecimal; -import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; @@ -48,16 +33,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static 
io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId; -import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros; -import static io.trino.spi.type.TimeType.TIME_MICROS; -import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS; -import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS; -import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND; -import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid; -import static java.lang.Float.intBitsToFloat; -import static java.lang.Math.toIntExact; +import static io.trino.plugin.iceberg.IcebergTypes.convertTrinoValueToIceberg; import static java.lang.String.format; -import static java.util.Objects.requireNonNull; import static org.apache.iceberg.expressions.Expressions.alwaysFalse; import static org.apache.iceberg.expressions.Expressions.alwaysTrue; import static org.apache.iceberg.expressions.Expressions.equal; @@ -117,7 +94,7 @@ private static Expression toIcebergExpression(String columnName, Type type, Doma List rangeExpressions = new ArrayList<>(); for (Range range : orderedRanges) { if (range.isSingleValue()) { - icebergValues.add(getIcebergLiteralValue(type, range.getLowBoundedValue())); + icebergValues.add(convertTrinoValueToIceberg(type, range.getLowBoundedValue())); } else { rangeExpressions.add(toIcebergExpression(columnName, range)); @@ -137,13 +114,13 @@ private static Expression toIcebergExpression(String columnName, Range range) Type type = range.getType(); if (range.isSingleValue()) { - Object icebergValue = getIcebergLiteralValue(type, range.getSingleValue()); + Object icebergValue = convertTrinoValueToIceberg(type, range.getSingleValue()); return equal(columnName, icebergValue); } List conjuncts = new ArrayList<>(2); if (!range.isLowUnbounded()) { - Object icebergLow = getIcebergLiteralValue(type, range.getLowBoundedValue()); + Object icebergLow = convertTrinoValueToIceberg(type, range.getLowBoundedValue()); Expression lowBound; if (range.isLowInclusive()) { lowBound = greaterThanOrEqual(columnName, icebergLow); @@ -155,7 +132,7 @@ private static Expression toIcebergExpression(String columnName, Range range) } if (!range.isHighUnbounded()) { - Object icebergHigh = getIcebergLiteralValue(type, range.getHighBoundedValue()); + Object icebergHigh = convertTrinoValueToIceberg(type, range.getHighBoundedValue()); Expression highBound; if (range.isHighInclusive()) { highBound = lessThanOrEqual(columnName, icebergHigh); @@ -169,68 +146,6 @@ private static Expression toIcebergExpression(String columnName, Range range) return and(conjuncts); } - private static Object getIcebergLiteralValue(Type type, Object trinoNativeValue) - { - requireNonNull(trinoNativeValue, "trinoNativeValue is null"); - - if (type instanceof BooleanType) { - return (boolean) trinoNativeValue; - } - - if (type instanceof IntegerType) { - return toIntExact((long) trinoNativeValue); - } - - if (type instanceof BigintType) { - return (long) trinoNativeValue; - } - - if (type instanceof RealType) { - return intBitsToFloat(toIntExact((long) trinoNativeValue)); - } - - if (type instanceof DoubleType) { - return (double) trinoNativeValue; - } - - if (type instanceof DateType) { - return toIntExact(((Long) trinoNativeValue)); - } - - if (type.equals(TIME_MICROS)) { - return ((long) trinoNativeValue) / PICOSECONDS_PER_MICROSECOND; - } - - if (type.equals(TIMESTAMP_MICROS)) { - return (long) trinoNativeValue; - } - - if (type.equals(TIMESTAMP_TZ_MICROS)) { - return 
timestampTzToMicros((LongTimestampWithTimeZone) trinoNativeValue); - } - - if (type instanceof VarcharType) { - return ((Slice) trinoNativeValue).toStringUtf8(); - } - - if (type instanceof VarbinaryType) { - return ByteBuffer.wrap(((Slice) trinoNativeValue).getBytes()); - } - - if (type instanceof UuidType) { - return trinoUuidToJavaUuid(((Slice) trinoNativeValue)); - } - - if (type instanceof DecimalType decimalType) { - if (decimalType.isShort()) { - return BigDecimal.valueOf((long) trinoNativeValue).movePointLeft(decimalType.getScale()); - } - return new BigDecimal(((Int128) trinoNativeValue).toBigInteger(), decimalType.getScale()); - } - - throw new UnsupportedOperationException("Unsupported type: " + type); - } - private static Expression and(List expressions) { if (expressions.isEmpty()) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java index fc210b508add..e5edf8015d50 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java @@ -13,9 +13,21 @@ */ package io.trino.plugin.iceberg; +import io.airlift.slice.Slice; import io.airlift.slice.Slices; +import io.trino.spi.type.BigintType; +import io.trino.spi.type.BooleanType; +import io.trino.spi.type.DateType; import io.trino.spi.type.DecimalType; import io.trino.spi.type.Decimals; +import io.trino.spi.type.DoubleType; +import io.trino.spi.type.Int128; +import io.trino.spi.type.IntegerType; +import io.trino.spi.type.LongTimestampWithTimeZone; +import io.trino.spi.type.RealType; +import io.trino.spi.type.UuidType; +import io.trino.spi.type.VarbinaryType; +import io.trino.spi.type.VarcharType; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -26,15 +38,90 @@ import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.iceberg.util.Timestamps.timestampTzFromMicros; +import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros; +import static io.trino.spi.type.TimeType.TIME_MICROS; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS; +import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS; import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND; import static io.trino.spi.type.UuidType.javaUuidToTrinoUuid; +import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid; import static java.lang.Float.floatToIntBits; +import static java.lang.Float.intBitsToFloat; import static java.lang.Math.multiplyExact; +import static java.lang.Math.toIntExact; +import static java.util.Objects.requireNonNull; public final class IcebergTypes { private IcebergTypes() {} + /** + * Convert value from Trino representation to Iceberg representation. + * + * @apiNote This accepts a Trino type because, currently, no two Iceberg types translate to one Trino type. 
+ */ + public static Object convertTrinoValueToIceberg(io.trino.spi.type.Type type, Object trinoNativeValue) + { + requireNonNull(trinoNativeValue, "trinoNativeValue is null"); + + if (type instanceof BooleanType) { + return (boolean) trinoNativeValue; + } + + if (type instanceof IntegerType) { + return toIntExact((long) trinoNativeValue); + } + + if (type instanceof BigintType) { + return (long) trinoNativeValue; + } + + if (type instanceof RealType) { + return intBitsToFloat(toIntExact((long) trinoNativeValue)); + } + + if (type instanceof DoubleType) { + return (double) trinoNativeValue; + } + + if (type instanceof DateType) { + return toIntExact(((Long) trinoNativeValue)); + } + + if (type.equals(TIME_MICROS)) { + return ((long) trinoNativeValue) / PICOSECONDS_PER_MICROSECOND; + } + + if (type.equals(TIMESTAMP_MICROS)) { + return (long) trinoNativeValue; + } + + if (type.equals(TIMESTAMP_TZ_MICROS)) { + return timestampTzToMicros((LongTimestampWithTimeZone) trinoNativeValue); + } + + if (type instanceof VarcharType) { + return ((Slice) trinoNativeValue).toStringUtf8(); + } + + if (type instanceof VarbinaryType) { + return ByteBuffer.wrap(((Slice) trinoNativeValue).getBytes()); + } + + if (type instanceof UuidType) { + return trinoUuidToJavaUuid(((Slice) trinoNativeValue)); + } + + if (type instanceof DecimalType decimalType) { + if (decimalType.isShort()) { + return BigDecimal.valueOf((long) trinoNativeValue).movePointLeft(decimalType.getScale()); + } + return new BigDecimal(((Int128) trinoNativeValue).toBigInteger(), decimalType.getScale()); + } + + throw new UnsupportedOperationException("Unsupported type: " + type); + } + /** * Convert value from Iceberg representation to Trino representation. */ From f84b6ecccd021d7baa337901cd292ff127967c9a Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 29 Sep 2022 13:58:22 +0200 Subject: [PATCH 09/11] Improve Trino to Iceberg conversion code - suppress intentional "redundant casts" - exact division - cast to primitive directly (looks nicer, same meaning) - remove redundant `instanceof` - put decimals together with other numbers --- .../io/trino/plugin/iceberg/IcebergTypes.java | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java index e5edf8015d50..7b8ef16b6a86 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java @@ -13,18 +13,13 @@ */ package io.trino.plugin.iceberg; +import com.google.common.math.LongMath; import io.airlift.slice.Slice; import io.airlift.slice.Slices; -import io.trino.spi.type.BigintType; -import io.trino.spi.type.BooleanType; -import io.trino.spi.type.DateType; import io.trino.spi.type.DecimalType; import io.trino.spi.type.Decimals; -import io.trino.spi.type.DoubleType; import io.trino.spi.type.Int128; -import io.trino.spi.type.IntegerType; import io.trino.spi.type.LongTimestampWithTimeZone; -import io.trino.spi.type.RealType; import io.trino.spi.type.UuidType; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; @@ -39,6 +34,12 @@ import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.iceberg.util.Timestamps.timestampTzFromMicros; import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros; +import static io.trino.spi.type.BigintType.BIGINT; +import 
From f84b6ecccd021d7baa337901cd292ff127967c9a Mon Sep 17 00:00:00 2001
From: Piotr Findeisen
Date: Thu, 29 Sep 2022 13:58:22 +0200
Subject: [PATCH 09/11] Improve Trino to Iceberg conversion code

- suppress intentional "redundant casts"
- exact division
- cast to primitive directly (looks nicer, same meaning)
- remove redundant `instanceof`
- put decimals together with other numbers
---
 .../io/trino/plugin/iceberg/IcebergTypes.java | 50 +++++++++++--------
 1 file changed, 28 insertions(+), 22 deletions(-)

diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java
index e5edf8015d50..7b8ef16b6a86 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java
@@ -13,18 +13,13 @@
  */
 package io.trino.plugin.iceberg;
 
+import com.google.common.math.LongMath;
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
-import io.trino.spi.type.BigintType;
-import io.trino.spi.type.BooleanType;
-import io.trino.spi.type.DateType;
 import io.trino.spi.type.DecimalType;
 import io.trino.spi.type.Decimals;
-import io.trino.spi.type.DoubleType;
 import io.trino.spi.type.Int128;
-import io.trino.spi.type.IntegerType;
 import io.trino.spi.type.LongTimestampWithTimeZone;
-import io.trino.spi.type.RealType;
 import io.trino.spi.type.UuidType;
 import io.trino.spi.type.VarbinaryType;
 import io.trino.spi.type.VarcharType;
@@ -39,6 +34,12 @@
 import static io.airlift.slice.Slices.utf8Slice;
 import static io.trino.plugin.iceberg.util.Timestamps.timestampTzFromMicros;
 import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DateType.DATE;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
 import static io.trino.spi.type.TimeType.TIME_MICROS;
 import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
 import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
@@ -49,6 +50,7 @@
 import static java.lang.Float.intBitsToFloat;
 import static java.lang.Math.multiplyExact;
 import static java.lang.Math.toIntExact;
+import static java.math.RoundingMode.UNNECESSARY;
 import static java.util.Objects.requireNonNull;
 
 public final class IcebergTypes
@@ -64,35 +66,46 @@ public static Object convertTrinoValueToIceberg(io.trino.spi.type.Type type, Obj
     {
         requireNonNull(trinoNativeValue, "trinoNativeValue is null");
 
-        if (type instanceof BooleanType) {
+        if (type == BOOLEAN) {
+            //noinspection RedundantCast
             return (boolean) trinoNativeValue;
         }
 
-        if (type instanceof IntegerType) {
+        if (type == INTEGER) {
             return toIntExact((long) trinoNativeValue);
         }
 
-        if (type instanceof BigintType) {
+        if (type == BIGINT) {
+            //noinspection RedundantCast
             return (long) trinoNativeValue;
         }
 
-        if (type instanceof RealType) {
+        if (type == REAL) {
             return intBitsToFloat(toIntExact((long) trinoNativeValue));
         }
 
-        if (type instanceof DoubleType) {
+        if (type == DOUBLE) {
+            //noinspection RedundantCast
             return (double) trinoNativeValue;
         }
 
-        if (type instanceof DateType) {
-            return toIntExact(((Long) trinoNativeValue));
+        if (type instanceof DecimalType decimalType) {
+            if (decimalType.isShort()) {
+                return BigDecimal.valueOf((long) trinoNativeValue).movePointLeft(decimalType.getScale());
+            }
+            return new BigDecimal(((Int128) trinoNativeValue).toBigInteger(), decimalType.getScale());
+        }
+
+        if (type == DATE) {
+            return toIntExact((long) trinoNativeValue);
         }
 
         if (type.equals(TIME_MICROS)) {
-            return ((long) trinoNativeValue) / PICOSECONDS_PER_MICROSECOND;
+            return LongMath.divide((long) trinoNativeValue, PICOSECONDS_PER_MICROSECOND, UNNECESSARY);
         }
 
         if (type.equals(TIMESTAMP_MICROS)) {
+            //noinspection RedundantCast
             return (long) trinoNativeValue;
         }
 
@@ -108,17 +121,10 @@ public static Object convertTrinoValueToIceberg(io.trino.spi.type.Type type, Obj
             return ByteBuffer.wrap(((Slice) trinoNativeValue).getBytes());
         }
 
-        if (type instanceof UuidType) {
+        if (type == UuidType.UUID) {
             return trinoUuidToJavaUuid(((Slice) trinoNativeValue));
         }
 
-        if (type instanceof DecimalType decimalType) {
-            if (decimalType.isShort()) {
-                return BigDecimal.valueOf((long) trinoNativeValue).movePointLeft(decimalType.getScale());
-            }
-            return new BigDecimal(((Int128) trinoNativeValue).toBigInteger(), decimalType.getScale());
-        }
-
         throw new UnsupportedOperationException("Unsupported type: " + type);
     }
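The "exact division" bullet is the one behavioral safeguard here: a `TIME_MICROS` value is carried as picoseconds of day and must always be a whole number of microseconds, so `LongMath.divide` with `RoundingMode.UNNECESSARY` turns any violation of that invariant into a loud `ArithmeticException` instead of a silent truncation. A standalone illustration; the values are hypothetical and the constant is redefined locally for brevity:

```java
import com.google.common.math.LongMath;

import static java.math.RoundingMode.UNNECESSARY;

public final class ExactDivisionSketch
{
    // Mirrors the constant used by IcebergTypes; redefined here so the example is self-contained
    private static final long PICOSECONDS_PER_MICROSECOND = 1_000_000;

    public static void main(String[] args)
    {
        // A well-formed TIME_MICROS value is a whole number of microseconds and divides exactly
        System.out.println(LongMath.divide(42_000_000L, PICOSECONDS_PER_MICROSECOND, UNNECESSARY)); // 42

        // Plain long division would silently drop a stray picosecond...
        System.out.println(42_000_001L / PICOSECONDS_PER_MICROSECOND); // 42

        // ...while exact division fails fast
        LongMath.divide(42_000_001L, PICOSECONDS_PER_MICROSECOND, UNNECESSARY); // throws ArithmeticException
    }
}
```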
From 90fb61240bc8d2788e759a7b51953692aca9e1c1 Mon Sep 17 00:00:00 2001
From: Piotr Findeisen
Date: Fri, 23 Sep 2022 17:32:47 +0200
Subject: [PATCH 10/11] Calculate Iceberg NDV with a Theta sketch

The Iceberg specification defines the Apache DataSketches Theta sketch
as the common data sketch for keeping track of distinct values in a
table. This change replaces the use of HLL within Iceberg's ANALYZE
with a Theta sketch. The follow-up work is to store the serialized
compact form of the sketch inside the Iceberg Puffin statistics file,
but this requires Iceberg API changes, which are still in progress.

A side effect of this change is that complex types (array, map, row)
can no longer be analyzed: Trino can calculate a HyperLogLog for these
types, but Iceberg does not specify a binary representation for them,
which is required to feed data into a Theta sketch. However, NDV for
complex types is not as useful as it is for scalar types, so this
shouldn't matter in practice.
---
 plugin/trino-iceberg/pom.xml                  |  12 ++
 .../trino/plugin/iceberg/IcebergMetadata.java |  21 +--
 .../trino/plugin/iceberg/IcebergPlugin.java   |  10 ++
 .../iceberg/aggregation/DataSketchState.java  |  32 +++++
 .../DataSketchStateSerializer.java            |  73 +++++++++++
 .../IcebergThetaSketchForStats.java           | 124 ++++++++++++++++++
 .../iceberg/BaseIcebergConnectorTest.java     |  12 +-
 7 files changed, 269 insertions(+), 15 deletions(-)
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchState.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchStateSerializer.java
 create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/IcebergThetaSketchForStats.java

diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml
index 2fa12450ef3c..26f94f628b28 100644
--- a/plugin/trino-iceberg/pom.xml
+++ b/plugin/trino-iceberg/pom.xml
@@ -165,6 +165,18 @@
             <artifactId>failsafe</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.datasketches</groupId>
+            <artifactId>datasketches-java</artifactId>
+            <version>3.3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.datasketches</groupId>
+            <artifactId>datasketches-memory</artifactId>
+            <version>2.1.0</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-api</artifactId>
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index f461a00b2f22..6a0af1a0c255 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -34,6 +34,8 @@
 import io.trino.plugin.hive.HiveApplyProjectionUtil;
 import io.trino.plugin.hive.HiveApplyProjectionUtil.ProjectedColumnRepresentation;
 import io.trino.plugin.hive.HiveWrittenPartitions;
+import io.trino.plugin.iceberg.aggregation.DataSketchStateSerializer;
+import io.trino.plugin.iceberg.aggregation.IcebergThetaSketchForStats;
 import io.trino.plugin.iceberg.catalog.TrinoCatalog;
 import io.trino.plugin.iceberg.procedure.IcebergDropExtendedStatsHandle;
 import io.trino.plugin.iceberg.procedure.IcebergExpireSnapshotsHandle;
@@ -94,6 +96,7 @@
 import io.trino.spi.type.TimestampWithTimeZoneType;
 import io.trino.spi.type.TypeManager;
 import io.trino.spi.type.TypeOperators;
+import org.apache.datasketches.theta.CompactSketch;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
@@ -229,7 +232,6 @@
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static io.trino.spi.connector.RetryMode.NO_RETRIES;
 import static io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW;
-import static io.trino.spi.predicate.Utils.blockToNativeValue;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
 import static io.trino.spi.type.UuidType.UUID;
@@ -264,7 +266,7 @@ public class IcebergMetadata
     public static final String ORC_BLOOM_FILTER_FPP_KEY = "orc.bloom.filter.fpp";
 
     private static final String NUMBER_OF_DISTINCT_VALUES_NAME = "NUMBER_OF_DISTINCT_VALUES";
-    private static final FunctionName NUMBER_OF_DISTINCT_VALUES_FUNCTION = new FunctionName("approx_distinct");
+    private static final FunctionName NUMBER_OF_DISTINCT_VALUES_FUNCTION = new FunctionName(IcebergThetaSketchForStats.NAME);
 
     private final TypeManager typeManager;
     private final TypeOperators typeOperators;
@@ -1468,8 +1470,9 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
         }
 
         ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);
-        Set<String> allDataColumnNames = tableMetadata.getColumns().stream()
+        Set<String> allScalarColumnNames = tableMetadata.getColumns().stream()
                 .filter(column -> !column.isHidden())
+                .filter(column -> column.getType().getTypeParameters().isEmpty()) // is scalar type
                 .map(ColumnMetadata::getName)
                 .collect(toImmutableSet());
 
@@ -1479,18 +1482,17 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
                     if (columnNames.isEmpty()) {
                         throw new TrinoException(INVALID_ANALYZE_PROPERTY, "Cannot specify empty list of columns for analysis");
                     }
-                    if (!allDataColumnNames.containsAll(columnNames)) {
+                    if (!allScalarColumnNames.containsAll(columnNames)) {
                         throw new TrinoException(
                                 INVALID_ANALYZE_PROPERTY,
-                                format("Invalid columns specified for analysis: %s", Sets.difference(columnNames, allDataColumnNames)));
+                                format("Invalid columns specified for analysis: %s", Sets.difference(columnNames, allScalarColumnNames)));
                     }
                     return columnNames;
                 })
-                .orElse(allDataColumnNames);
+                .orElse(allScalarColumnNames);
 
         Set<ColumnStatisticMetadata> columnStatistics = tableMetadata.getColumns().stream()
                 .filter(column -> analyzeColumnNames.contains(column.getName()))
-                // TODO: add support for NDV summary/sketch, but using Theta sketch, not HLL; see https://github.com/apache/iceberg-docs/pull/69
                 .map(column -> new ColumnStatisticMetadata(column.getName(), NUMBER_OF_DISTINCT_VALUES_NAME, NUMBER_OF_DISTINCT_VALUES_FUNCTION))
                 .collect(toImmutableSet());
 
@@ -1537,12 +1539,13 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
         for (Map.Entry<ColumnStatisticMetadata, Block> entry : computedStatistic.getColumnStatistics().entrySet()) {
             ColumnStatisticMetadata statisticMetadata = entry.getKey();
             if (statisticMetadata.getConnectorAggregationId().equals(NUMBER_OF_DISTINCT_VALUES_NAME)) {
-                long ndv = (long) blockToNativeValue(BIGINT, entry.getValue());
                 Integer columnId = verifyNotNull(
                         columnNameToId.get(statisticMetadata.getColumnName()),
                         "Column not found in table: [%s]",
                         statisticMetadata.getColumnName());
-                updateProperties.set(TRINO_STATS_NDV_FORMAT.formatted(columnId), Long.toString(ndv));
+                CompactSketch sketch = DataSketchStateSerializer.deserialize(entry.getValue(), 0);
+                // TODO: store whole sketch to support updates, see also https://github.com/apache/iceberg-docs/pull/69
+                updateProperties.set(TRINO_STATS_NDV_FORMAT.formatted(columnId), Long.toString((long) sketch.getEstimate()));
             }
             else {
                 throw new UnsupportedOperationException("Unsupported statistic: " + statisticMetadata);
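With this change `finishStatisticsCollection` no longer receives a plain BIGINT count: it deserializes the compact Theta sketch produced by the aggregation and stores its estimate. A sketch of that readout at the DataSketches level, using default builder settings and made-up inputs (the patch itself builds its sketches with `Family.ALPHA`, shown further below):

```java
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.CompactSketch;
import org.apache.datasketches.theta.UpdateSketch;

public final class NdvReadoutSketch
{
    public static void main(String[] args)
    {
        UpdateSketch sketch = UpdateSketch.builder().build();
        for (long value = 0; value < 1000; value++) {
            sketch.update(value); // 1000 distinct values
            sketch.update(value); // duplicates do not change the estimate
        }
        byte[] serialized = sketch.compact().toByteArray();

        // Mirrors the readout in finishStatisticsCollection above
        CompactSketch deserialized = CompactSketch.heapify(Memory.wrap(serialized));
        System.out.println((long) deserialized.getEstimate()); // 1000 (exact below the sketch's nominal entry count)
    }
}
```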
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPlugin.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPlugin.java
index 99e510ad73b1..bfdf4ce0844d 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPlugin.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPlugin.java
@@ -14,9 +14,13 @@
 package io.trino.plugin.iceberg;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import io.trino.plugin.iceberg.aggregation.IcebergThetaSketchForStats;
 import io.trino.spi.Plugin;
 import io.trino.spi.connector.ConnectorFactory;
 
+import java.util.Set;
+
 public class IcebergPlugin
         implements Plugin
 {
@@ -25,4 +29,10 @@ public Iterable<ConnectorFactory> getConnectorFactories()
     {
         return ImmutableList.of(new IcebergConnectorFactory());
     }
+
+    @Override
+    public Set<Class<?>> getFunctions()
+    {
+        return ImmutableSet.of(IcebergThetaSketchForStats.class);
+    }
 }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchState.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchState.java
new file mode 100644
index 000000000000..1e79168d6754
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchState.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.aggregation;
+
+import io.trino.spi.function.AccumulatorState;
+import io.trino.spi.function.AccumulatorStateMetadata;
+import org.apache.datasketches.theta.CompactSketch;
+import org.apache.datasketches.theta.UpdateSketch;
+
+@AccumulatorStateMetadata(stateSerializerClass = DataSketchStateSerializer.class)
+public interface DataSketchState
+        extends AccumulatorState
+{
+    UpdateSketch getUpdateSketch();
+
+    void setUpdateSketch(UpdateSketch value);
+
+    CompactSketch getCompactSketch();
+
+    void setCompactSketch(CompactSketch value);
+}
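`DataSketchState` deliberately keeps two representations: a mutable `UpdateSketch` that the input function feeds, and an immutable `CompactSketch` that arrives from deserialization or combining. Either form can be merged through a Theta `Union`, roughly as follows (hypothetical class and values):

```java
import org.apache.datasketches.theta.CompactSketch;
import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Union;
import org.apache.datasketches.theta.UpdateSketch;

public final class UnionSketch
{
    public static void main(String[] args)
    {
        UpdateSketch left = UpdateSketch.builder().build();
        UpdateSketch right = UpdateSketch.builder().build();
        for (long value = 0; value < 100; value++) {
            left.update(value);       // values 0..99
            right.update(value + 50); // values 50..149, half of them overlapping
        }

        // A Union accepts both the update form and the compact form
        Union union = SetOperation.builder().buildUnion();
        union.union(left);
        union.union(right.compact());

        CompactSketch merged = union.getResult();
        System.out.println((long) merged.getEstimate()); // 150
    }
}
```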
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchStateSerializer.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchStateSerializer.java
new file mode 100644
index 000000000000..91a5068c6b55
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/DataSketchStateSerializer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.aggregation;
+
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import io.trino.spi.block.Block;
+import io.trino.spi.block.BlockBuilder;
+import io.trino.spi.function.AccumulatorStateSerializer;
+import io.trino.spi.type.Type;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.datasketches.theta.CompactSketch;
+
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.trino.spi.type.VarbinaryType.VARBINARY;
+
+public class DataSketchStateSerializer
+        implements AccumulatorStateSerializer<DataSketchState>
+{
+    @Override
+    public Type getSerializedType()
+    {
+        return VARBINARY;
+    }
+
+    @Override
+    public void serialize(DataSketchState state, BlockBuilder out)
+    {
+        serializeToVarbinary(state, out);
+    }
+
+    public static void serializeToVarbinary(DataSketchState state, BlockBuilder out)
+    {
+        if (state.getUpdateSketch() == null && state.getCompactSketch() == null) {
+            out.appendNull();
+        }
+        else {
+            checkArgument(state.getUpdateSketch() == null || state.getCompactSketch() == null, "A state must not have both transient accumulator and combined form set");
+            CompactSketch compactSketch = Optional.ofNullable(state.getCompactSketch())
+                    .orElseGet(() -> state.getUpdateSketch().compact());
+            Slice slice = Slices.wrappedBuffer(compactSketch.toByteArray());
+            VARBINARY.writeSlice(out, slice);
+        }
+    }
+
+    @Override
+    public void deserialize(Block block, int index, DataSketchState state)
+    {
+        if (!block.isNull(index)) {
+            state.setCompactSketch(deserialize(block, index));
+        }
+    }
+
+    public static CompactSketch deserialize(Block block, int index)
+    {
+        checkArgument(!block.isNull(index), "Value is null");
+        Slice slice = VARBINARY.getSlice(block, index);
+        return CompactSketch.heapify(WritableMemory.writableWrap(slice.getBytes()));
+    }
+}
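The serializer's contract is that whatever form the state holds in memory, the wire format is always the compact sketch's bytes in a VARBINARY slice. A stripped-down round trip through the block APIs, bypassing the accumulator machinery (hypothetical class and value):

```java
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.CompactSketch;
import org.apache.datasketches.theta.UpdateSketch;

import static io.trino.spi.type.VarbinaryType.VARBINARY;

public final class SerializerRoundTrip
{
    public static void main(String[] args)
    {
        UpdateSketch updateSketch = UpdateSketch.builder().build();
        updateSketch.update("some value");

        // Serialize: always the compact form, written as VARBINARY
        BlockBuilder out = VARBINARY.createBlockBuilder(null, 1);
        VARBINARY.writeSlice(out, Slices.wrappedBuffer(updateSketch.compact().toByteArray()));
        Block block = out.build();

        // Deserialize: heapify the bytes back into a CompactSketch
        Slice slice = VARBINARY.getSlice(block, 0);
        CompactSketch roundTripped = CompactSketch.heapify(Memory.wrap(slice.getBytes()));
        System.out.println((long) roundTripped.getEstimate()); // 1
    }
}
```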
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/IcebergThetaSketchForStats.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/IcebergThetaSketchForStats.java
new file mode 100644
index 000000000000..518015c2687d
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/aggregation/IcebergThetaSketchForStats.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.aggregation;
+
+import io.trino.spi.block.Block;
+import io.trino.spi.block.BlockBuilder;
+import io.trino.spi.function.AggregationFunction;
+import io.trino.spi.function.AggregationState;
+import io.trino.spi.function.BlockIndex;
+import io.trino.spi.function.BlockPosition;
+import io.trino.spi.function.CombineFunction;
+import io.trino.spi.function.InputFunction;
+import io.trino.spi.function.OutputFunction;
+import io.trino.spi.function.SqlType;
+import io.trino.spi.function.TypeParameter;
+import io.trino.spi.type.StandardTypes;
+import io.trino.spi.type.Type;
+import org.apache.datasketches.Family;
+import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.iceberg.types.Conversions;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+
+import static com.google.common.base.Verify.verify;
+import static io.trino.plugin.iceberg.IcebergTypes.convertTrinoValueToIceberg;
+import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
+import static io.trino.spi.type.TypeUtils.readNativeValue;
+import static java.util.Objects.requireNonNull;
+
+@AggregationFunction(value = IcebergThetaSketchForStats.NAME, hidden = true)
+public final class IcebergThetaSketchForStats
+{
+    private IcebergThetaSketchForStats() {}
+
+    public static final String NAME = "$iceberg_theta_stat";
+
+    @InputFunction
+    @TypeParameter("T")
+    public static void input(@TypeParameter("T") Type type, @AggregationState DataSketchState state, @BlockPosition @SqlType("T") Block block, @BlockIndex int index)
+    {
+        verify(!block.isNull(index), "Input function is not expected to be called on a NULL input");
+
+        Object trinoValue = readNativeValue(type, block, index);
+        org.apache.iceberg.types.Type icebergType = toIcebergType(type);
+        Object icebergValue = convertTrinoValueToIceberg(type, trinoValue);
+        ByteBuffer byteBuffer = Conversions.toByteBuffer(icebergType, icebergValue);
+        requireNonNull(byteBuffer, "byteBuffer is null"); // trino value isn't null
+        byte[] bytes = getBytes(byteBuffer);
+        getOrCreateUpdateSketch(state).update(bytes);
+    }
+
+    @CombineFunction
+    public static void combine(@AggregationState DataSketchState state, @AggregationState DataSketchState otherState)
+    {
+        Union union = SetOperation.builder().buildUnion();
+        addIfPresent(union, state.getUpdateSketch());
+        addIfPresent(union, state.getCompactSketch());
+        addIfPresent(union, otherState.getUpdateSketch());
+        addIfPresent(union, otherState.getCompactSketch());
+
+        state.setUpdateSketch(null);
+        state.setCompactSketch(union.getResult());
+    }
+
+    @OutputFunction(StandardTypes.VARBINARY)
+    public static void output(@AggregationState DataSketchState state, BlockBuilder out)
+    {
+        if (state.getUpdateSketch() == null && state.getCompactSketch() == null) {
+            getOrCreateUpdateSketch(state);
+        }
+        DataSketchStateSerializer.serializeToVarbinary(state, out);
+    }
+
+    private static UpdateSketch getOrCreateUpdateSketch(@AggregationState DataSketchState state)
+    {
+        UpdateSketch sketch = state.getUpdateSketch();
+        if (sketch == null) {
+            // Must match Iceberg table statistics specification
+            // https://iceberg.apache.org/puffin-spec/#apache-datasketches-theta-v1-blob-type
+            sketch = UpdateSketch.builder()
+                    .setFamily(Family.ALPHA)
+                    .build();
+            state.setUpdateSketch(sketch);
+        }
+        return sketch;
+    }
+
+    private static void addIfPresent(Union union, @Nullable Sketch input)
+    {
+        if (input != null) {
+            union.union(input);
+        }
+    }
+
+    private static byte[] getBytes(ByteBuffer byteBuffer)
+    {
+        int length = byteBuffer.remaining();
+        if (byteBuffer.hasArray() && byteBuffer.arrayOffset() == 0) {
+            byte[] bytes = byteBuffer.array();
+            if (bytes.length == length) {
+                return bytes;
+            }
+        }
+        byte[] bytes = new byte[length];
+        byteBuffer.get(bytes);
+        return bytes;
+    }
+}
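Note that the input function hashes Iceberg's single-value binary representation (via `Conversions.toByteBuffer`) rather than Trino's native encoding, so the resulting NDV sketch is byte-compatible with what any other Iceberg-spec-compliant writer would compute for the same column. Feeding one value through that path looks roughly like this (hypothetical class and value):

```java
import org.apache.datasketches.theta.UpdateSketch;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;

import java.nio.ByteBuffer;

public final class InputPathSketch
{
    public static void main(String[] args)
    {
        // Iceberg defines a deterministic single-value binary form per primitive type
        ByteBuffer buffer = Conversions.toByteBuffer(Types.LongType.get(), 42L);
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);

        // The same bytes hash identically regardless of which engine produced them
        UpdateSketch sketch = UpdateSketch.builder().build();
        sketch.update(bytes);
        System.out.println((long) sketch.getEstimate()); // 1
    }
}
```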
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
index 7b5cca976aba..25f9f7cefe87 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
@@ -3998,9 +3998,9 @@ public void testAllAvailableTypes()
                         " ('a_timestamp', NULL, 1e0, 0.5e0, NULL, " + (format == ORC ? "'2021-07-24 03:43:57.987000', '2021-07-24 03:43:57.987999'" : "'2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'") + "), " +
                         " ('a_timestamptz', NULL, 1e0, 0.5e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " +
                         " ('a_uuid', NULL, 1e0, 0.5e0, NULL, NULL, NULL), " +
-                        " ('a_row', NULL, 1e0, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " +
-                        " ('an_array', NULL, 1e0, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " +
-                        " ('a_map', NULL, 1e0, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " +
+                        " ('a_row', NULL, NULL, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " +
+                        " ('an_array', NULL, NULL, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " +
+                        " ('a_map', NULL, NULL, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " +
                         " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)");
         }
         else {
@@ -4021,9 +4021,9 @@ public void testAllAvailableTypes()
                         " ('a_timestamp', NULL, 1e0, NULL, NULL, NULL, NULL), " +
                         " ('a_timestamptz', NULL, 1e0, NULL, NULL, NULL, NULL), " +
                         " ('a_uuid', NULL, 1e0, NULL, NULL, NULL, NULL), " +
-                        " ('a_row', NULL, 1e0, NULL, NULL, NULL, NULL), " +
-                        " ('an_array', NULL, 1e0, NULL, NULL, NULL, NULL), " +
-                        " ('a_map', NULL, 1e0, NULL, NULL, NULL, NULL), " +
+                        " ('a_row', NULL, NULL, NULL, NULL, NULL, NULL), " +
+                        " ('an_array', NULL, NULL, NULL, NULL, NULL, NULL), " +
+                        " ('a_map', NULL, NULL, NULL, NULL, NULL, NULL), " +
                         " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)");
         }
From 1c9098f2ee7a622a5ce84d99cab0a9503ca90fba Mon Sep 17 00:00:00 2001
From: Piotr Findeisen
Date: Thu, 29 Sep 2022 17:33:17 +0200
Subject: [PATCH 11/11] empty