From 6b1428dba1ed4ab61052a9b7c3966d844718ded2 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Sat, 25 Nov 2023 10:22:52 +0800 Subject: [PATCH] Fix auto analyze not filtering out columns of unsupported types. (#27559) Fix the bug where auto analyze does not filter out columns of unsupported types. Catch Throwable in the auto analyze thread for each database; otherwise the thread quits when one database fails to create jobs, and none of the remaining databases get analyzed. Rename the FE config item full_auto_analyze_simultaneously_running_task_num to auto_analyze_simultaneously_running_task_num. --- .../java/org/apache/doris/common/Config.java | 2 +- .../org/apache/doris/catalog/OlapTable.java | 10 ++++++--- .../doris/catalog/external/ExternalTable.java | 4 +++- .../statistics/StatisticsAutoCollector.java | 22 ++++++++++++------- .../doris/statistics/TableStatsMeta.java | 6 +++-- .../StatisticsAutoCollectorTest.java | 2 +- 6 files changed, 30 insertions(+), 16 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 0b71947dea4a75..e03dfd2a866016 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2112,7 +2112,7 @@ public class Config extends ConfigBase { public static int force_olap_table_replication_num = 0; @ConfField - public static int full_auto_analyze_simultaneously_running_task_num = 1; + public static int auto_analyze_simultaneously_running_task_num = 1; @ConfField public static final int period_analyze_simultaneously_running_task_num = 1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index aacd2080a39199..ee23284e5d7a8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1162,6 +1162,7 @@ public boolean needReAnalyzeTable(TableStatsMeta tblStats) { } if (!tblStats.analyzeColumns().containsAll(getBaseSchema() .stream() + .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) .map(Column::getName) .collect(Collectors.toSet()))) { return true; @@ -1178,16 +1179,20 @@ public Map> findReAnalyzeNeededPartitions() { Set allPartitions = table.getPartitionNames().stream().map(table::getPartition) .filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet()); if (tableStats == null) { - return table.getBaseSchema().stream().collect(Collectors.toMap(Column::getName, v -> allPartitions)); + return table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) + .collect(Collectors.toMap(Column::getName, v -> allPartitions)); } Map> colToPart = new HashMap<>(); for (Column col : table.getBaseSchema()) { + if (StatisticsUtil.isUnsupportedType(col.getType())) { + continue; + } long lastUpdateTime = tableStats.findColumnLastUpdateTime(col.getName()); Set partitions = table.getPartitionNames().stream() .map(table::getPartition) .filter(Partition::hasData) .filter(partition -> - partition.getVisibleVersionTime() >= lastUpdateTime).map(Partition::getName) + partition.getVisibleVersionTime() >= lastUpdateTime).map(Partition::getName) .collect(Collectors.toSet()); colToPart.put(col.getName(), partitions); } @@ -2393,7 +2398,6 @@ public void analyze(String dbName) { } } - @Override public boolean isDistributionColumn(String columnName) { Set distributeColumns = getDistributionColumnNames() .stream().map(String::toLowerCase).collect(Collectors.toSet()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index a915136193c891..4eab7ebf813620 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -36,6 +36,7 @@ import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.TableStatsMeta; +import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.thrift.TTableDescriptor; import com.google.common.collect.Sets; @@ -397,7 +398,8 @@ public Map> findReAnalyzeNeededPartitions() { HashSet partitions = Sets.newHashSet(); // TODO: Find a way to collect external table partitions that need to be analyzed. partitions.add("Dummy Partition"); - return getBaseSchema().stream().collect(Collectors.toMap(Column::getName, k -> partitions)); + return getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) + .collect(Collectors.toMap(Column::getName, k -> partitions)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 80afcb2c0f18cb..b317e72c9e345e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; @@ -50,8 +51,8 @@ public class StatisticsAutoCollector extends StatisticsCollector { public StatisticsAutoCollector() { super("Automatic Analyzer", - TimeUnit.MINUTES.toMillis(Config.full_auto_analyze_simultaneously_running_task_num), - new 
AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num)); + TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes), + new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num)); } @Override @@ -77,12 +78,17 @@ private void analyzeAll() { if (StatisticConstants.SYSTEM_DBS.contains(databaseIf.getFullName())) { continue; } - analyzeDb(databaseIf); + try { + analyzeDb(databaseIf); + } catch (Throwable t) { + LOG.warn("Failed to analyze database {}.{}", ctl.getName(), databaseIf.getFullName(), t); + continue; + } } } } - public void analyzeDb(DatabaseIf databaseIf) { + public void analyzeDb(DatabaseIf databaseIf) throws DdlException { List analysisInfos = constructAnalysisInfo(databaseIf); for (AnalysisInfo analysisInfo : analysisInfos) { analysisInfo = getReAnalyzeRequiredPart(analysisInfo); @@ -91,8 +97,9 @@ public void analyzeDb(DatabaseIf databaseIf) { } try { createSystemAnalysisJob(analysisInfo); - } catch (Exception e) { - LOG.warn("Failed to create analysis job", e); + } catch (Throwable t) { + analysisInfo.message = t.getMessage(); + throw t; } } } @@ -136,8 +143,7 @@ protected void createAnalyzeJobForTbl(DatabaseIf db, .setTblId(table.getId()) .setColName( table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) - .map( - Column::getName).collect(Collectors.joining(",")) + .map(Column::getName).collect(Collectors.joining(",")) ) .setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS) .setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java index 6f50802f395653..97a2cd15186f24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java @@ -23,6 +23,7 @@ import org.apache.doris.common.io.Writable; import 
org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.statistics.AnalysisInfo.JobType; +import org.apache.doris.statistics.util.StatisticsUtil; import com.google.gson.annotations.SerializedName; @@ -136,8 +137,9 @@ public void update(AnalysisInfo analyzedJob, TableIf tableIf) { } jobType = analyzedJob.jobType; if (tableIf != null && analyzedJob.colToPartitions.keySet() - .containsAll(tableIf.getBaseSchema().stream().map(Column::getName).collect( - Collectors.toSet()))) { + .containsAll(tableIf.getBaseSchema().stream() + .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) + .map(Column::getName).collect(Collectors.toSet()))) { updatedRows.set(0); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java index 00a68e204ab161..d94bdd61248734 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java @@ -247,7 +247,7 @@ public boolean enableAutoAnalyze() { @Test public void checkAvailableThread() { StatisticsAutoCollector autoCollector = new StatisticsAutoCollector(); - Assertions.assertEquals(Config.full_auto_analyze_simultaneously_running_task_num, + Assertions.assertEquals(Config.auto_analyze_simultaneously_running_task_num, autoCollector.analysisTaskExecutor.executors.getMaximumPoolSize()); }