From 0bcc6796dea7123a5db25ef066fa2f3859837443 Mon Sep 17 00:00:00 2001 From: Konrad Dziedzic Date: Mon, 20 Mar 2023 00:03:54 +0100 Subject: [PATCH] Allow forcing Delta Lake connector to recalculate all statistics --- docs/src/main/sphinx/connector/delta-lake.rst | 7 + .../trino/plugin/deltalake/AnalyzeHandle.java | 11 +- .../deltalake/DeltaLakeAnalyzeProperties.java | 22 ++- .../plugin/deltalake/DeltaLakeMetadata.java | 18 +- .../deltalake/DeltaLakeSplitManager.java | 7 +- .../deltalake/TestDeltaLakeAnalyze.java | 158 ++++++++++++++++-- 6 files changed, 199 insertions(+), 24 deletions(-) diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst index 7774068db3b3..7342a8a4c220 100644 --- a/docs/src/main/sphinx/connector/delta-lake.rst +++ b/docs/src/main/sphinx/connector/delta-lake.rst @@ -918,6 +918,13 @@ To collect statistics for a table, execute the following statement:: ANALYZE table_schema.table_name; +To recalculate the statistics for the table from scratch, use the additional parameter ``mode``:: + + ANALYZE table_schema.table_name WITH(mode = 'full_refresh'); + +There are two modes available: ``full_refresh`` and ``incremental``. +The ``ANALYZE`` statement uses ``incremental`` by default. + To gain the most benefit from cost-based optimizations, run periodic ``ANALYZE`` statements on every large table that is frequently queried. 
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AnalyzeHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AnalyzeHandle.java index 64dd5f248aff..fb4c9cdbcd50 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AnalyzeHandle.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AnalyzeHandle.java @@ -16,6 +16,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableSet; +import io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode; import java.time.Instant; import java.util.Optional; @@ -25,26 +26,26 @@ public class AnalyzeHandle { - private final boolean initialAnalyze; + private final AnalyzeMode analyzeMode; private final Optional filesModifiedAfter; private final Optional> columns; @JsonCreator public AnalyzeHandle( - @JsonProperty("initialAnalyze") boolean initialAnalyze, + @JsonProperty("analyzeMode") AnalyzeMode analyzeMode, @JsonProperty("startTime") Optional filesModifiedAfter, @JsonProperty("columns") Optional> columns) { - this.initialAnalyze = initialAnalyze; + this.analyzeMode = requireNonNull(analyzeMode, "analyzeMode is null"); this.filesModifiedAfter = requireNonNull(filesModifiedAfter, "filesModifiedAfter is null"); requireNonNull(columns, "columns is null"); this.columns = columns.map(ImmutableSet::copyOf); } @JsonProperty - public boolean isInitialAnalyze() + public AnalyzeMode getAnalyzeMode() { - return initialAnalyze; + return analyzeMode; } @JsonProperty diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeAnalyzeProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeAnalyzeProperties.java index 1e12c604ed2d..27d07a958c21 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeAnalyzeProperties.java +++ 
b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeAnalyzeProperties.java @@ -29,15 +29,24 @@ import java.util.Set; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode.INCREMENTAL; import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY; +import static io.trino.spi.session.PropertyMetadata.enumProperty; import static io.trino.spi.type.TimeZoneKey.UTC_KEY; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.lang.String.format; public class DeltaLakeAnalyzeProperties { + enum AnalyzeMode + { + INCREMENTAL, + FULL_REFRESH, + } + public static final String FILES_MODIFIED_AFTER = "files_modified_after"; public static final String COLUMNS_PROPERTY = "columns"; + public static final String MODE_PROPERTY = "mode"; private final List> analyzeProperties; @@ -62,7 +71,13 @@ public DeltaLakeAnalyzeProperties() null, false, DeltaLakeAnalyzeProperties::decodeColumnNames, - value -> value)); + value -> value), + enumProperty( + MODE_PROPERTY, + "Analyze mode", + AnalyzeMode.class, + INCREMENTAL, + false)); } public List> getAnalyzeProperties() @@ -75,6 +90,11 @@ public static Optional getFilesModifiedAfterProperty(Map properties) + { + return (AnalyzeMode) properties.get(MODE_PROPERTY); + } + public static Optional> getColumnNames(Map properties) { @SuppressWarnings("unchecked") diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index f70d31246e83..188b79c96eec 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -32,6 +32,7 @@ import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import 
io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable; +import io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode; import io.trino.plugin.deltalake.expression.ParsingException; import io.trino.plugin.deltalake.expression.SparkExpressionParser; import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore; @@ -169,8 +170,11 @@ import static io.trino.filesystem.Locations.appendPath; import static io.trino.filesystem.Locations.getParent; import static io.trino.plugin.deltalake.DataFileInfo.DataFileType.DATA; +import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode.FULL_REFRESH; +import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode.INCREMENTAL; import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.getColumnNames; import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.getFilesModifiedAfterProperty; +import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.getRefreshMode; import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.FILE_MODIFIED_TIME_COLUMN_NAME; import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.fileModifiedTimeColumnHandle; import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.fileSizeColumnHandle; @@ -2541,8 +2545,12 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession MetadataEntry metadata = handle.getMetadataEntry(); Optional filesModifiedAfterFromProperties = getFilesModifiedAfterProperty(analyzeProperties); + AnalyzeMode analyzeMode = getRefreshMode(analyzeProperties); - Optional statistics = statisticsAccess.readExtendedStatistics(session, handle.getLocation()); + Optional statistics = Optional.empty(); + if (analyzeMode == INCREMENTAL) { + statistics = statisticsAccess.readExtendedStatistics(session, handle.getLocation()); + } Optional alreadyAnalyzedModifiedTimeMax = statistics.map(ExtendedStatistics::getAlreadyAnalyzedModifiedTimeMax); @@ -2583,7 +2591,7 @@ public ConnectorAnalyzeMetadata 
getStatisticsCollectionMetadata(ConnectorSession } } - AnalyzeHandle analyzeHandle = new AnalyzeHandle(statistics.isEmpty(), filesModifiedAfter, analyzeColumnNames); + AnalyzeHandle analyzeHandle = new AnalyzeHandle(statistics.isEmpty() ? FULL_REFRESH : INCREMENTAL, filesModifiedAfter, analyzeColumnNames); DeltaLakeTableHandle newHandle = new DeltaLakeTableHandle( handle.getSchemaTableName().getSchemaName(), handle.getSchemaTableName().getTableName(), @@ -2703,7 +2711,11 @@ private void updateTableStatistics( Collection computedStatistics, Optional> physicalColumnNameMapping) { - Optional oldStatistics = statisticsAccess.readExtendedStatistics(session, location); + Optional oldStatistics = Optional.empty(); + boolean loadExistingStats = analyzeHandle.isEmpty() || analyzeHandle.get().getAnalyzeMode() == INCREMENTAL; + if (loadExistingStats) { + oldStatistics = statisticsAccess.readExtendedStatistics(session, location); + } // more elaborate logic for handling statistics model evaluation may need to be introduced in the future // for now let's have a simple check rejecting update diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index 41e400b4a865..3ea5c54b8ae5 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -57,6 +57,7 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode.FULL_REFRESH; import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.pathColumnHandle; import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate; import 
static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getDynamicFilteringWaitTimeout; @@ -177,11 +178,11 @@ private Stream getSplits( List predicatedColumns = schema.stream() .filter(column -> predicatedColumnNames.contains(column.getName())) // DeltaLakeColumnMetadata.name is lowercase .collect(toImmutableList()); - return validDataFiles.stream() .flatMap(addAction -> { - if (tableHandle.getAnalyzeHandle().isPresent() && !tableHandle.getAnalyzeHandle().get().isInitialAnalyze() && !addAction.isDataChange()) { - // skip files which do not introduce data change on non-initial ANALYZE + if (tableHandle.getAnalyzeHandle().isPresent() && + !(tableHandle.getAnalyzeHandle().get().getAnalyzeMode() == FULL_REFRESH) && !addAction.isDataChange()) { + // skip files which do not introduce data change, unless ANALYZE runs in FULL_REFRESH mode return Stream.empty(); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java index ae9541faee47..6d2f2df517c5 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java @@ -80,14 +80,21 @@ private void testAnalyze(Optional checkpointInterval) // check that analyze does not change already calculated statistics assertUpdate("ANALYZE " + tableName); + String expectedStats = "VALUES " + + "('nationkey', null, 25.0, 0.0, null, 0, 24)," + + "('regionkey', null, 5.0, 0.0, null, 0, 4)," + + "('comment', 1857.0, 25.0, 0.0, null, null, null)," + + "('name', 177.0, 25.0, 0.0, null, null, null)," + + "(null, null, null, null, 25.0, null, null)"; assertQuery( "SHOW STATS FOR " + tableName, - "VALUES " + - "('nationkey', null, 25.0, 0.0, null, 0, 24)," + - "('regionkey', null, 5.0, 0.0, null, 0, 4)," + - "('comment', 1857.0, 25.0, 0.0, null, null, null)," + - "('name', 177.0, 25.0, 0.0, null, 
null, null)," + - "(null, null, null, null, 25.0, null, null)"); + expectedStats); + + // check that analyze with mode = incremental returns the same result as analyze without mode + assertUpdate("ANALYZE " + tableName + " WITH(mode = 'incremental')"); + assertQuery( + "SHOW STATS FOR " + tableName, + expectedStats); // insert one more copy assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.sf1.nation", 25); @@ -357,20 +364,32 @@ public void testAnalyzeSomeColumns() "('name', null, null, 0.0, null, null, null)," + "(null, null, null, null, 50.0, null, null)"); - // drop stats - assertUpdate(format("CALL %s.system.drop_extended_stats('%s', '%s')", DELTA_CATALOG, TPCH_SCHEMA, tableName)); - - // now we should be able to analyze all columns - assertUpdate(format("ANALYZE %s", tableName)); + // show that using full_refresh allows us to analyze any subset of columns + assertUpdate(format("ANALYZE %s WITH(mode = 'full_refresh', columns = ARRAY['nationkey', 'regionkey', 'name'])", tableName)); assertQuery( "SHOW STATS FOR " + tableName, "VALUES " + "('nationkey', null, 50.0, 0.0, null, 0, 49)," + "('regionkey', null, 10.0, 0.0, null, 0, 9)," + - "('comment', 3764.0, 50.0, 0.0, null, null, null)," + + "('comment', null, null, 0.0, null, null, null)," + "('name', 379.0, 50.0, 0.0, null, null, null)," + "(null, null, null, null, 50.0, null, null)"); + String expectedFullStats = "VALUES " + + "('nationkey', null, 50.0, 0.0, null, 0, 49)," + + "('regionkey', null, 10.0, 0.0, null, 0, 9)," + + "('comment', 3764.0, 50.0, 0.0, null, null, null)," + + "('name', 379.0, 50.0, 0.0, null, null, null)," + + "(null, null, null, null, 50.0, null, null)"; + assertUpdate(format("ANALYZE %s WITH(mode = 'full_refresh')", tableName)); + assertQuery("SHOW STATS FOR " + tableName, expectedFullStats); + + // drop stats + assertUpdate(format("CALL %s.system.drop_extended_stats('%s', '%s')", DELTA_CATALOG, TPCH_SCHEMA, tableName)); + // now we should be able to analyze all columns + 
assertUpdate(format("ANALYZE %s", tableName)); + assertQuery("SHOW STATS FOR " + tableName, expectedFullStats); + // we and we should be able to reanalyze with a subset of columns assertUpdate(format("ANALYZE %s WITH(columns = ARRAY['nationkey', 'regionkey'])", tableName)); assertQuery( @@ -765,6 +784,121 @@ public void testCollectStatsAfterColumnAdded(boolean collectOnWrite) assertUpdate("DROP TABLE " + tableName); } + @Test + public void testForceRecalculateStatsWithDeleteAndUpdate() + { + String tableName = "test_recalculate_all_stats_with_delete_and_update_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + + " AS SELECT * FROM tpch.sf1.nation", 25); + + assertQuery( + "SHOW STATS FOR " + tableName, + "VALUES " + + "('nationkey', null, 25.0, 0.0, null, 0, 24)," + + "('regionkey', null, 5.0, 0.0, null, 0, 4)," + + "('comment', 1857.0, 25.0, 0.0, null, null, null)," + + "('name', 177.0, 25.0, 0.0, null, null, null)," + + "(null, null, null, null, 25.0, null, null)"); + + // check that analyze does not change already calculated statistics + assertUpdate("ANALYZE " + tableName); + + assertQuery( + "SHOW STATS FOR " + tableName, + "VALUES " + + "('nationkey', null, 25.0, 0.0, null, 0, 24)," + + "('regionkey', null, 5.0, 0.0, null, 0, 4)," + + "('comment', 1857.0, 25.0, 0.0, null, null, null)," + + "('name', 177.0, 25.0, 0.0, null, null, null)," + + "(null, null, null, null, 25.0, null, null)"); + + assertUpdate("DELETE FROM " + tableName + " WHERE nationkey = 1", 1); + assertQuery( + "SHOW STATS FOR " + tableName, + "VALUES " + + "('nationkey', null, 24.0, 0.0, null, 0, 24)," + + "('regionkey', null, 5.0, 0.0, null, 0, 4)," + + "('comment', 1857.0, 24.0, 0.0, null, null, null)," + + "('name', 177.0, 24.0, 0.0, null, null, null)," + + "(null, null, null, null, 24.0, null, null)"); + assertUpdate("UPDATE " + tableName + " SET name = null WHERE nationkey = 2", 1); + assertQuery( + "SHOW STATS FOR " + tableName, + "VALUES " + + "('nationkey', null, 
24.0, 0.0, null, 0, 24)," + + "('regionkey', null, 5.0, 0.0, null, 0, 4)," + + "('comment', 1857.0, 24.0, 0.0, null, null, null)," + + "('name', 180.84782608695653, 23.5, 0.02083333333333337, null, null, null)," + + "(null, null, null, null, 24.0, null, null)"); + + assertUpdate(format("ANALYZE %s", tableName)); + assertQuery( + "SHOW STATS FOR " + tableName, + "VALUES " + + "('nationkey', null, 24.0, 0.0, null, 0, 24)," + + "('regionkey', null, 5.0, 0.0, null, 0, 4)," + + "('comment', 3638.0, 24.0, 0.0, null, null, null)," + + "('name', 346.3695652173913, 23.5, 0.02083333333333337, null, null, null)," + + "(null, null, null, null, 24.0, null, null)"); + + assertUpdate(format("ANALYZE %s WITH(mode = 'full_refresh')", tableName)); + assertQuery( + "SHOW STATS FOR " + tableName, + "VALUES " + + "('nationkey', null, 24.0, 0.0, null, 0, 24)," + + "('regionkey', null, 5.0, 0.0, null, 0, 4)," + + "('comment', 1781.0, 24.0, 0.0, null, null, null)," + + "('name', 162.0, 23.0, 0.041666666666666664, null, null, null)," + + "(null, null, null, null, 24.0, null, null)"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testForceRecalculateAllStats() + { + String tableName = "test_recalculate_all_stats_" + randomNameSuffix(); + assertUpdate( + withStatsOnWrite(false), + "CREATE TABLE " + tableName + " AS SELECT nationkey, regionkey, name FROM tpch.sf1.nation", + 25); + + assertUpdate( + withStatsOnWrite(true), + "INSERT INTO " + tableName + " VALUES(27, 1, 'name1')", + 1); + + assertQuery( + "SHOW STATS FOR " + tableName, + "VALUES " + + "('nationkey', null, 1.0, 0.0, null, 0, 27)," + + "('regionkey', null, 1.0, 0.0, null, 0, 4)," + + "('name', 5.0, 1.0, 0.0, null, null, null)," + + "(null, null, null, null, 26.0, null, null)"); + + // check that analyze does not change already calculated statistics + assertUpdate("ANALYZE " + tableName); + + assertQuery( + "SHOW STATS FOR " + tableName, + "VALUES " + + "('nationkey', null, 1.0, 0.0, null, 0, 27)," + + 
"('regionkey', null, 1.0, 0.0, null, 0, 4)," + + "('name', 5.0, 1.0, 0.0, null, null, null)," + + "(null, null, null, null, 26.0, null, null)"); + + assertUpdate(format("ANALYZE %s WITH(mode = 'full_refresh')", tableName)); + assertQuery( + "SHOW STATS FOR " + tableName, + "VALUES " + + "('nationkey', null, 26.0, 0.0, null, 0, 27)," + + "('regionkey', null, 5.0, 0.0, null, 0, 4)," + + "('name', 182.0, 26.0, 0.0, null, null, null)," + + "(null, null, null, null, 26.0, null, null)"); + + assertUpdate("DROP TABLE " + tableName); + } + private Session withStatsOnWrite(boolean value) { Session session = getSession();