Allow forcing Delta Lake connector to recalculate all statistics
homar committed Jun 13, 2023
1 parent 5295f3d commit 0bcc679
Showing 6 changed files with 199 additions and 24 deletions.
7 changes: 7 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.rst
@@ -918,6 +918,13 @@ To collect statistics for a table, execute the following statement::

    ANALYZE table_schema.table_name;

To recalculate the statistics for a table from scratch, use the additional ``mode`` parameter::

    ANALYZE table_schema.table_name WITH (mode = 'full_refresh');

Two modes are available: ``full_refresh`` and ``incremental``.
The procedure uses ``incremental`` by default.
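Because ``incremental`` is the default, the following two statements are
equivalent::

    ANALYZE table_schema.table_name;
    ANALYZE table_schema.table_name WITH (mode = 'incremental');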

To gain the most benefit from cost-based optimizations, run periodic ``ANALYZE``
statements on every large table that is frequently queried.

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AnalyzeHandle.java
@@ -16,6 +16,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode;

import java.time.Instant;
import java.util.Optional;
@@ -25,26 +26,26 @@

public class AnalyzeHandle
{
- private final boolean initialAnalyze;
private final AnalyzeMode analyzeMode;
private final Optional<Instant> filesModifiedAfter;
private final Optional<Set<String>> columns;

@JsonCreator
public AnalyzeHandle(
@JsonProperty("initialAnalyze") boolean initialAnalyze,
@JsonProperty("analyzeMode") AnalyzeMode analyzeMode,
@JsonProperty("startTime") Optional<Instant> filesModifiedAfter,
@JsonProperty("columns") Optional<Set<String>> columns)
{
- this.initialAnalyze = initialAnalyze;
this.analyzeMode = requireNonNull(analyzeMode, "analyzeMode is null");
this.filesModifiedAfter = requireNonNull(filesModifiedAfter, "filesModifiedAfter is null");
requireNonNull(columns, "columns is null");
this.columns = columns.map(ImmutableSet::copyOf);
}

@JsonProperty
- public boolean isInitialAnalyze()
public AnalyzeMode getAnalyzeMode()
{
- return initialAnalyze;
return analyzeMode;
}

@JsonProperty
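Note: since AnalyzeHandle travels between coordinator and workers as JSON, swapping the boolean for an enum changes the serialized shape of the handle. A minimal sketch of the effect, illustrative only and not part of the commit (it assumes Airlift's JsonCodec, which Trino uses for handle serialization):

import io.airlift.json.JsonCodec;
import java.util.Optional;

// Jackson serializes the enum by constant name, so the handle now
// carries "analyzeMode": "FULL_REFRESH" or "INCREMENTAL" where the
// old boolean "initialAnalyze" field used to be.
JsonCodec<AnalyzeHandle> codec = JsonCodec.jsonCodec(AnalyzeHandle.class);
String json = codec.toJson(
        new AnalyzeHandle(AnalyzeMode.FULL_REFRESH, Optional.empty(), Optional.empty()));
// json contains "analyzeMode" : "FULL_REFRESH"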
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeAnalyzeProperties.java
@@ -29,15 +29,24 @@
import java.util.Set;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode.INCREMENTAL;
import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.enumProperty;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;

public class DeltaLakeAnalyzeProperties
{
enum AnalyzeMode
{
INCREMENTAL,
FULL_REFRESH,
}

public static final String FILES_MODIFIED_AFTER = "files_modified_after";
public static final String COLUMNS_PROPERTY = "columns";
public static final String MODE_PROPERTY = "mode";

private final List<PropertyMetadata<?>> analyzeProperties;

@@ -62,7 +71,13 @@ public DeltaLakeAnalyzeProperties()
null,
false,
DeltaLakeAnalyzeProperties::decodeColumnNames,
- value -> value));
value -> value),
enumProperty(
MODE_PROPERTY,
"Analyze mode",
AnalyzeMode.class,
INCREMENTAL,
false));
}

public List<PropertyMetadata<?>> getAnalyzeProperties()
@@ -75,6 +90,11 @@ public static Optional<Instant> getFilesModifiedAfterProperty(Map<String, Object
return Optional.ofNullable((Instant) properties.get(FILES_MODIFIED_AFTER));
}

public static AnalyzeMode getRefreshMode(Map<String, Object> properties)
{
return (AnalyzeMode) properties.get(MODE_PROPERTY);
}

public static Optional<Set<String>> getColumnNames(Map<String, Object> properties)
{
@SuppressWarnings("unchecked")
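Note: a sketch of how the new property reaches the connector — illustrative, assuming the engine's usual handling of analyze properties (validation against the registered PropertyMetadata, defaulting, and decoding all happen before the connector is called). The lowercase 'full_refresh' in SQL works because enumProperty matches values case-insensitively, as the tests below rely on.

import java.util.Map;
// (fragment; assumes it runs inside io.trino.plugin.deltalake, since
// AnalyzeMode is package-private)

// "WITH (mode = 'full_refresh')" arrives as the decoded enum constant:
Map<String, Object> analyzeProperties = Map.of("mode", AnalyzeMode.FULL_REFRESH);
AnalyzeMode mode = DeltaLakeAnalyzeProperties.getRefreshMode(analyzeProperties); // FULL_REFRESH
// A plain ANALYZE (no mode) arrives as the INCREMENTAL default declared
// above, which preserves the pre-existing behavior.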
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -32,6 +32,7 @@
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
import io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode;
import io.trino.plugin.deltalake.expression.ParsingException;
import io.trino.plugin.deltalake.expression.SparkExpressionParser;
import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore;
@@ -169,8 +170,11 @@
import static io.trino.filesystem.Locations.appendPath;
import static io.trino.filesystem.Locations.getParent;
import static io.trino.plugin.deltalake.DataFileInfo.DataFileType.DATA;
import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode.FULL_REFRESH;
import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode.INCREMENTAL;
import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.getColumnNames;
import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.getFilesModifiedAfterProperty;
import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.getRefreshMode;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.FILE_MODIFIED_TIME_COLUMN_NAME;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.fileModifiedTimeColumnHandle;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.fileSizeColumnHandle;
@@ -2541,8 +2545,12 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
MetadataEntry metadata = handle.getMetadataEntry();

Optional<Instant> filesModifiedAfterFromProperties = getFilesModifiedAfterProperty(analyzeProperties);
AnalyzeMode analyzeMode = getRefreshMode(analyzeProperties);

- Optional<ExtendedStatistics> statistics = statisticsAccess.readExtendedStatistics(session, handle.getLocation());
Optional<ExtendedStatistics> statistics = Optional.empty();
if (analyzeMode == INCREMENTAL) {
statistics = statisticsAccess.readExtendedStatistics(session, handle.getLocation());
}

Optional<Instant> alreadyAnalyzedModifiedTimeMax = statistics.map(ExtendedStatistics::getAlreadyAnalyzedModifiedTimeMax);

@@ -2583,7 +2591,7 @@
}
}

- AnalyzeHandle analyzeHandle = new AnalyzeHandle(statistics.isEmpty(), filesModifiedAfter, analyzeColumnNames);
AnalyzeHandle analyzeHandle = new AnalyzeHandle(statistics.isEmpty() ? FULL_REFRESH : INCREMENTAL, filesModifiedAfter, analyzeColumnNames);
DeltaLakeTableHandle newHandle = new DeltaLakeTableHandle(
handle.getSchemaTableName().getSchemaName(),
handle.getSchemaTableName().getTableName(),
@@ -2703,7 +2711,11 @@ private void updateTableStatistics(
Collection<ComputedStatistics> computedStatistics,
Optional<Map<String, String>> physicalColumnNameMapping)
{
- Optional<ExtendedStatistics> oldStatistics = statisticsAccess.readExtendedStatistics(session, location);
Optional<ExtendedStatistics> oldStatistics = Optional.empty();
boolean loadExistingStats = analyzeHandle.isEmpty() || analyzeHandle.get().getAnalyzeMode() == INCREMENTAL;
if (loadExistingStats) {
oldStatistics = statisticsAccess.readExtendedStatistics(session, location);
}

// more elaborate logic for handling statistics model evaluation may need to be introduced in the future
// for now let's have a simple check rejecting update
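Note: the two hunks above implement a single rule; a condensed restatement (illustrative, not the commit's literal code):

// FULL_REFRESH simply pretends no extended statistics exist. An empty
// Optional means there is no alreadyAnalyzedModifiedTimeMax cutoff, so
// every active file gets re-analyzed, and updateTableStatistics likewise
// skips loading the old stats, so the new statistics replace the old
// ones instead of being merged into them.
Optional<ExtendedStatistics> existing = analyzeMode == INCREMENTAL
        ? statisticsAccess.readExtendedStatistics(session, handle.getLocation())
        : Optional.empty();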
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
@@ -57,6 +57,7 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode.FULL_REFRESH;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.pathColumnHandle;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getDynamicFilteringWaitTimeout;
@@ -177,11 +178,11 @@ private Stream<DeltaLakeSplit> getSplits(
List<DeltaLakeColumnMetadata> predicatedColumns = schema.stream()
.filter(column -> predicatedColumnNames.contains(column.getName())) // DeltaLakeColumnMetadata.name is lowercase
.collect(toImmutableList());

return validDataFiles.stream()
.flatMap(addAction -> {
- if (tableHandle.getAnalyzeHandle().isPresent() && !tableHandle.getAnalyzeHandle().get().isInitialAnalyze() && !addAction.isDataChange()) {
- // skip files which do not introduce data change on non-initial ANALYZE
if (tableHandle.getAnalyzeHandle().isPresent() &&
tableHandle.getAnalyzeHandle().get().getAnalyzeMode() != FULL_REFRESH && !addAction.isDataChange()) {
// skip files which do not introduce data change when the mode is not FULL_REFRESH
return Stream.empty();
}

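Note: what the changed filter means for which files ANALYZE actually scans — an illustrative restatement, not commit code:

// incremental  -> scan only actions with dataChange = true; rows in
//                 untouched files are already covered by stored stats.
// full_refresh -> scan every active file, including files that carry
//                 dataChange = false (for example, OPTIMIZE rewrites),
//                 because no stored statistics are reused.
boolean analyzeRunning = tableHandle.getAnalyzeHandle().isPresent();
boolean fullRefresh = analyzeRunning
        && tableHandle.getAnalyzeHandle().get().getAnalyzeMode() == FULL_REFRESH;
boolean skipFile = analyzeRunning && !fullRefresh && !addAction.isDataChange();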
plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAnalyze.java
@@ -80,14 +80,21 @@ private void testAnalyze(Optional<Integer> checkpointInterval)
// check that analyze does not change already calculated statistics
assertUpdate("ANALYZE " + tableName);

String expectedStats = "VALUES " +
"('nationkey', null, 25.0, 0.0, null, 0, 24)," +
"('regionkey', null, 5.0, 0.0, null, 0, 4)," +
"('comment', 1857.0, 25.0, 0.0, null, null, null)," +
"('name', 177.0, 25.0, 0.0, null, null, null)," +
"(null, null, null, null, 25.0, null, null)";
assertQuery(
"SHOW STATS FOR " + tableName,
"VALUES " +
"('nationkey', null, 25.0, 0.0, null, 0, 24)," +
"('regionkey', null, 5.0, 0.0, null, 0, 4)," +
"('comment', 1857.0, 25.0, 0.0, null, null, null)," +
"('name', 177.0, 25.0, 0.0, null, null, null)," +
"(null, null, null, null, 25.0, null, null)");
expectedStats);

// check that analyze with mode = incremental returns the same result as analyze without mode
assertUpdate("ANALYZE " + tableName + " WITH(mode = 'incremental')");
assertQuery(
"SHOW STATS FOR " + tableName,
expectedStats);

// insert one more copy
assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.sf1.nation", 25);
@@ -357,20 +364,32 @@ public void testAnalyzeSomeColumns()
"('name', null, null, 0.0, null, null, null)," +
"(null, null, null, null, 50.0, null, null)");

- // drop stats
- assertUpdate(format("CALL %s.system.drop_extended_stats('%s', '%s')", DELTA_CATALOG, TPCH_SCHEMA, tableName));
-
- // now we should be able to analyze all columns
- assertUpdate(format("ANALYZE %s", tableName));
// show that using full_refresh allows us to analyze any subset of columns
assertUpdate(format("ANALYZE %s WITH(mode = 'full_refresh', columns = ARRAY['nationkey', 'regionkey', 'name'])", tableName));
assertQuery(
"SHOW STATS FOR " + tableName,
"VALUES " +
"('nationkey', null, 50.0, 0.0, null, 0, 49)," +
"('regionkey', null, 10.0, 0.0, null, 0, 9)," +
"('comment', 3764.0, 50.0, 0.0, null, null, null)," +
"('comment', null, null, 0.0, null, null, null)," +
"('name', 379.0, 50.0, 0.0, null, null, null)," +
"(null, null, null, null, 50.0, null, null)");

String expectedFullStats = "VALUES " +
"('nationkey', null, 50.0, 0.0, null, 0, 49)," +
"('regionkey', null, 10.0, 0.0, null, 0, 9)," +
"('comment', 3764.0, 50.0, 0.0, null, null, null)," +
"('name', 379.0, 50.0, 0.0, null, null, null)," +
"(null, null, null, null, 50.0, null, null)";
assertUpdate(format("ANALYZE %s WITH(mode = 'full_refresh')", tableName));
assertQuery("SHOW STATS FOR " + tableName, expectedFullStats);

// drop stats
assertUpdate(format("CALL %s.system.drop_extended_stats('%s', '%s')", DELTA_CATALOG, TPCH_SCHEMA, tableName));
// now we should be able to analyze all columns
assertUpdate(format("ANALYZE %s", tableName));
assertQuery("SHOW STATS FOR " + tableName, expectedFullStats);

// and we should be able to reanalyze with a subset of columns
assertUpdate(format("ANALYZE %s WITH(columns = ARRAY['nationkey', 'regionkey'])", tableName));
assertQuery(
@@ -765,6 +784,121 @@ public void testCollectStatsAfterColumnAdded(boolean collectOnWrite)
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testForceRecalculateStatsWithDeleteAndUpdate()
{
String tableName = "test_recalculate_all_stats_with_delete_and_update_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName
+ " AS SELECT * FROM tpch.sf1.nation", 25);

assertQuery(
"SHOW STATS FOR " + tableName,
"VALUES " +
"('nationkey', null, 25.0, 0.0, null, 0, 24)," +
"('regionkey', null, 5.0, 0.0, null, 0, 4)," +
"('comment', 1857.0, 25.0, 0.0, null, null, null)," +
"('name', 177.0, 25.0, 0.0, null, null, null)," +
"(null, null, null, null, 25.0, null, null)");

// check that analyze does not change already calculated statistics
assertUpdate("ANALYZE " + tableName);

assertQuery(
"SHOW STATS FOR " + tableName,
"VALUES " +
"('nationkey', null, 25.0, 0.0, null, 0, 24)," +
"('regionkey', null, 5.0, 0.0, null, 0, 4)," +
"('comment', 1857.0, 25.0, 0.0, null, null, null)," +
"('name', 177.0, 25.0, 0.0, null, null, null)," +
"(null, null, null, null, 25.0, null, null)");

assertUpdate("DELETE FROM " + tableName + " WHERE nationkey = 1", 1);
assertQuery(
"SHOW STATS FOR " + tableName,
"VALUES " +
"('nationkey', null, 24.0, 0.0, null, 0, 24)," +
"('regionkey', null, 5.0, 0.0, null, 0, 4)," +
"('comment', 1857.0, 24.0, 0.0, null, null, null)," +
"('name', 177.0, 24.0, 0.0, null, null, null)," +
"(null, null, null, null, 24.0, null, null)");
assertUpdate("UPDATE " + tableName + " SET name = null WHERE nationkey = 2", 1);
assertQuery(
"SHOW STATS FOR " + tableName,
"VALUES " +
"('nationkey', null, 24.0, 0.0, null, 0, 24)," +
"('regionkey', null, 5.0, 0.0, null, 0, 4)," +
"('comment', 1857.0, 24.0, 0.0, null, null, null)," +
"('name', 180.84782608695653, 23.5, 0.02083333333333337, null, null, null)," +
"(null, null, null, null, 24.0, null, null)");

assertUpdate(format("ANALYZE %s", tableName));
assertQuery(
"SHOW STATS FOR " + tableName,
"VALUES " +
"('nationkey', null, 24.0, 0.0, null, 0, 24)," +
"('regionkey', null, 5.0, 0.0, null, 0, 4)," +
"('comment', 3638.0, 24.0, 0.0, null, null, null)," +
"('name', 346.3695652173913, 23.5, 0.02083333333333337, null, null, null)," +
"(null, null, null, null, 24.0, null, null)");

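// Note: incremental mode merges new results into the stored stats, so
// the comment column's data size above is double-counted:
// 3638.0 = 1857.0 (stale statistics recorded before the DELETE)
//        + 1781.0 (re-read from the active files rewritten by the
//          DELETE and UPDATE).
// The full_refresh run below discards the stale component and lands
// on the accurate 1781.0.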
assertUpdate(format("ANALYZE %s WITH(mode = 'full_refresh')", tableName));
assertQuery(
"SHOW STATS FOR " + tableName,
"VALUES " +
"('nationkey', null, 24.0, 0.0, null, 0, 24)," +
"('regionkey', null, 5.0, 0.0, null, 0, 4)," +
"('comment', 1781.0, 24.0, 0.0, null, null, null)," +
"('name', 162.0, 23.0, 0.041666666666666664, null, null, null)," +
"(null, null, null, null, 24.0, null, null)");

assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testForceRecalculateAllStats()
{
String tableName = "test_recalculate_all_stats_" + randomNameSuffix();
assertUpdate(
withStatsOnWrite(false),
"CREATE TABLE " + tableName + " AS SELECT nationkey, regionkey, name FROM tpch.sf1.nation",
25);

assertUpdate(
withStatsOnWrite(true),
"INSERT INTO " + tableName + " VALUES(27, 1, 'name1')",
1);

assertQuery(
"SHOW STATS FOR " + tableName,
"VALUES " +
"('nationkey', null, 1.0, 0.0, null, 0, 27)," +
"('regionkey', null, 1.0, 0.0, null, 0, 4)," +
"('name', 5.0, 1.0, 0.0, null, null, null)," +
"(null, null, null, null, 26.0, null, null)");

// check that analyze does not change already calculated statistics
assertUpdate("ANALYZE " + tableName);

assertQuery(
"SHOW STATS FOR " + tableName,
"VALUES " +
"('nationkey', null, 1.0, 0.0, null, 0, 27)," +
"('regionkey', null, 1.0, 0.0, null, 0, 4)," +
"('name', 5.0, 1.0, 0.0, null, null, null)," +
"(null, null, null, null, 26.0, null, null)");

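// Note: plain ANALYZE cannot repair these numbers: the stats-on-write
// INSERT recorded alreadyAnalyzedModifiedTimeMax, and the files written
// by the CREATE (with stats collection disabled) predate that cutoff,
// so incremental mode never revisits them. full_refresh below rebuilds
// the statistics from every active file.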
assertUpdate(format("ANALYZE %s WITH(mode = 'full_refresh')", tableName));
assertQuery(
"SHOW STATS FOR " + tableName,
"VALUES " +
"('nationkey', null, 26.0, 0.0, null, 0, 27)," +
"('regionkey', null, 5.0, 0.0, null, 0, 4)," +
"('name', 182.0, 26.0, 0.0, null, null, null)," +
"(null, null, null, null, 26.0, null, null)");

assertUpdate("DROP TABLE " + tableName);
}

private Session withStatsOnWrite(boolean value)
{
Session session = getSession();
