Skip to content

Commit

Permalink
Fix auto analyze doesn't filter unsupported type bug. (#27547)
Browse files Browse the repository at this point in the history
Fix auto analyze doesn't filter unsupported type bug.
Catch throwable in auto analyze thread for each database, otherwise the thread will quit when one database failed to create jobs and all other databases will not get analyzed.
change FE config item full_auto_analyze_simultaneously_running_task_num to auto_analyze_simultaneously_running_task_num
backport #27559
  • Loading branch information
Jibing-Li authored Nov 24, 2023
1 parent 114052b commit bb48353
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2156,7 +2156,7 @@ public class Config extends ConfigBase {
public static int autobucket_min_buckets = 1;

@ConfField
public static int full_auto_analyze_simultaneously_running_task_num = 1;
public static int auto_analyze_simultaneously_running_task_num = 1;

@ConfField
public static final int period_analyze_simultaneously_running_task_num = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,7 @@ public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
}
if (!tblStats.analyzeColumns().containsAll(getBaseSchema()
.stream()
.filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.map(Column::getName)
.collect(Collectors.toSet()))) {
return true;
Expand Down Expand Up @@ -2297,10 +2298,14 @@ public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
Set<String> allPartitions = table.getPartitionNames().stream().map(table::getPartition)
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
if (tableStats == null) {
return table.getBaseSchema().stream().collect(Collectors.toMap(Column::getName, v -> allPartitions));
return table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.collect(Collectors.toMap(Column::getName, v -> allPartitions));
}
Map<String, Set<String>> colToPart = new HashMap<>();
for (Column col : table.getBaseSchema()) {
if (StatisticsUtil.isUnsupportedType(col.getType())) {
continue;
}
long lastUpdateTime = tableStats.findColumnLastUpdateTime(col.getName());
Set<String> partitions = table.getPartitionNames().stream()
.map(table::getPartition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.TTableDescriptor;

import com.google.common.collect.Sets;
Expand Down Expand Up @@ -397,7 +398,8 @@ public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
HashSet<String> partitions = Sets.newHashSet();
// TODO: Find a way to collect external table partitions that need to be analyzed.
partitions.add("Dummy Partition");
return getBaseSchema().stream().collect(Collectors.toMap(Column::getName, k -> partitions));
return getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.collect(Collectors.toMap(Column::getName, k -> partitions));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
Expand Down Expand Up @@ -51,7 +52,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
public StatisticsAutoCollector() {
super("Automatic Analyzer",
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes),
new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num));
new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num));
}

@Override
Expand All @@ -77,12 +78,17 @@ private void analyzeAll() {
if (StatisticConstants.SYSTEM_DBS.contains(databaseIf.getFullName())) {
continue;
}
analyzeDb(databaseIf);
try {
analyzeDb(databaseIf);
} catch (Throwable t) {
LOG.warn("Failed to analyze database {}.{}", ctl.getName(), databaseIf.getFullName(), t);
continue;
}
}
}
}

public void analyzeDb(DatabaseIf<TableIf> databaseIf) {
public void analyzeDb(DatabaseIf<TableIf> databaseIf) throws DdlException {
List<AnalysisInfo> analysisInfos = constructAnalysisInfo(databaseIf);
for (AnalysisInfo analysisInfo : analysisInfos) {
analysisInfo = getReAnalyzeRequiredPart(analysisInfo);
Expand All @@ -92,7 +98,8 @@ public void analyzeDb(DatabaseIf<TableIf> databaseIf) {
try {
createSystemAnalysisJob(analysisInfo);
} catch (Exception e) {
LOG.warn("Failed to create analysis job", e);
analysisInfo.message = e.getMessage();
throw e;
}
}
}
Expand Down Expand Up @@ -136,8 +143,7 @@ protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
.setTblId(table.getId())
.setColName(
table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.map(
Column::getName).collect(Collectors.joining(","))
.map(Column::getName).collect(Collectors.joining(","))
)
.setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
.setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.gson.annotations.SerializedName;

Expand Down Expand Up @@ -136,8 +137,9 @@ public void update(AnalysisInfo analyzedJob, TableIf tableIf) {
}
jobType = analyzedJob.jobType;
if (tableIf != null && analyzedJob.colToPartitions.keySet()
.containsAll(tableIf.getBaseSchema().stream().map(Column::getName).collect(
Collectors.toSet()))) {
.containsAll(tableIf.getBaseSchema().stream()
.filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.map(Column::getName).collect(Collectors.toSet()))) {
updatedRows.set(0);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public boolean enableAutoAnalyze() {
@Test
public void checkAvailableThread() {
StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
Assertions.assertEquals(Config.full_auto_analyze_simultaneously_running_task_num,
Assertions.assertEquals(Config.auto_analyze_simultaneously_running_task_num,
autoCollector.analysisTaskExecutor.executors.getMaximumPoolSize());
}

Expand Down

0 comments on commit bb48353

Please sign in to comment.