Fix auto analyze not filtering unsupported column types (apache#27547)
Fix the bug where auto analyze does not filter out columns with unsupported types.
Catch Throwable in the auto analyze thread for each database; otherwise the thread quits as soon as one database fails to create jobs, and the remaining databases never get analyzed.
Rename the FE config item full_auto_analyze_simultaneously_running_task_num to auto_analyze_simultaneously_running_task_num.
Backport of apache#27559.
Jibing-Li authored and eldenmoon committed Nov 27, 2023
1 parent 4d1aaaa commit 7781a44
Showing 6 changed files with 27 additions and 12 deletions.
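
The commit message boils down to two behaviors: skip columns whose types statistics collection cannot handle, and isolate failures so one broken database does not abort the whole auto-analyze pass. Below is a minimal, self-contained Java sketch of those two patterns; every name in it (Database, Column, isUnsupportedType, analyzeDb) is a hypothetical stand-in for illustration, not the actual Doris API touched by this diff.

// Minimal sketch of the two behaviors in this commit.
// All types and methods here are hypothetical stand-ins, not Doris classes.
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class AutoAnalyzeSketch {

    record Column(String name, String type) {}
    record Database(String name, List<Column> columns) {}

    // Hypothetical predicate: statistics collection skips these types.
    static boolean isUnsupportedType(String type) {
        return type.equals("JSONB") || type.equals("HLL") || type.equals("BITMAP");
    }

    // Only supported columns are considered when deciding what to analyze.
    static Set<String> analyzableColumns(Database db) {
        return db.columns().stream()
                .filter(c -> !isUnsupportedType(c.type()))
                .map(Column::name)
                .collect(Collectors.toSet());
    }

    // Pretend analysis step; may throw for a broken database.
    static void analyzeDb(Database db) {
        if (db.columns().isEmpty()) {
            throw new IllegalStateException("no columns to analyze in " + db.name());
        }
        System.out.println("analyzed " + db.name() + " columns=" + analyzableColumns(db));
    }

    public static void main(String[] args) {
        List<Database> databases = List.of(
                new Database("broken_db", List.of()),
                new Database("sales", List.of(new Column("id", "BIGINT"), new Column("tags", "BITMAP"))));

        for (Database db : databases) {
            // Catch Throwable per database so one failure does not stop the
            // whole auto-analyze pass, mirroring the change in analyzeAll().
            try {
                analyzeDb(db);
            } catch (Throwable t) {
                System.err.println("Failed to analyze database " + db.name() + ": " + t);
            }
        }
    }
}

Running the sketch logs a warning for broken_db and still reports sales with only its supported column, which is the behavior the per-database try/catch in analyzeAll() is meant to guarantee.
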
@@ -2156,7 +2156,7 @@ public class Config extends ConfigBase {
     public static int autobucket_min_buckets = 1;

     @ConfField
-    public static int full_auto_analyze_simultaneously_running_task_num = 1;
+    public static int auto_analyze_simultaneously_running_task_num = 1;

     @ConfField
     public static final int period_analyze_simultaneously_running_task_num = 1;
@@ -1134,6 +1134,7 @@ public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
         }
         if (!tblStats.analyzeColumns().containsAll(getBaseSchema()
                 .stream()
+                .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
                 .map(Column::getName)
                 .collect(Collectors.toSet()))) {
             return true;
@@ -2316,10 +2317,14 @@ public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
         Set<String> allPartitions = table.getPartitionNames().stream().map(table::getPartition)
                 .filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
         if (tableStats == null) {
-            return table.getBaseSchema().stream().collect(Collectors.toMap(Column::getName, v -> allPartitions));
+            return table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
+                    .collect(Collectors.toMap(Column::getName, v -> allPartitions));
         }
         Map<String, Set<String>> colToPart = new HashMap<>();
         for (Column col : table.getBaseSchema()) {
+            if (StatisticsUtil.isUnsupportedType(col.getType())) {
+                continue;
+            }
             long lastUpdateTime = tableStats.findColumnLastUpdateTime(col.getName());
             Set<String> partitions = table.getPartitionNames().stream()
                     .map(table::getPartition)
@@ -36,6 +36,7 @@
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.TableStatsMeta;
+import org.apache.doris.statistics.util.StatisticsUtil;
 import org.apache.doris.thrift.TTableDescriptor;

 import com.google.common.collect.Sets;
@@ -397,7 +398,8 @@ public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
         HashSet<String> partitions = Sets.newHashSet();
         // TODO: Find a way to collect external table partitions that need to be analyzed.
         partitions.add("Dummy Partition");
-        return getBaseSchema().stream().collect(Collectors.toMap(Column::getName, k -> partitions));
+        return getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
+                .collect(Collectors.toMap(Column::getName, k -> partitions));
     }

     @Override
@@ -24,6 +24,7 @@
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
@@ -51,7 +52,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
     public StatisticsAutoCollector() {
         super("Automatic Analyzer",
                 TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes),
-                new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num));
+                new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num));
     }

     @Override
@@ -77,12 +78,17 @@ private void analyzeAll() {
                 if (StatisticConstants.SYSTEM_DBS.contains(databaseIf.getFullName())) {
                     continue;
                 }
-                analyzeDb(databaseIf);
+                try {
+                    analyzeDb(databaseIf);
+                } catch (Throwable t) {
+                    LOG.warn("Failed to analyze database {}.{}", ctl.getName(), databaseIf.getFullName(), t);
+                    continue;
+                }
             }
         }
     }

-    public void analyzeDb(DatabaseIf<TableIf> databaseIf) {
+    public void analyzeDb(DatabaseIf<TableIf> databaseIf) throws DdlException {
         List<AnalysisInfo> analysisInfos = constructAnalysisInfo(databaseIf);
         for (AnalysisInfo analysisInfo : analysisInfos) {
             analysisInfo = getReAnalyzeRequiredPart(analysisInfo);
@@ -92,7 +98,8 @@ public void analyzeDb(DatabaseIf<TableIf> databaseIf) {
             try {
                 createSystemAnalysisJob(analysisInfo);
             } catch (Exception e) {
-                LOG.warn("Failed to create analysis job", e);
+                analysisInfo.message = e.getMessage();
+                throw e;
             }
         }
     }
@@ -136,8 +143,7 @@ protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
                 .setTblId(table.getId())
                 .setColName(
                         table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
-                                .map(
-                                        Column::getName).collect(Collectors.joining(","))
+                                .map(Column::getName).collect(Collectors.joining(","))
                 )
                 .setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
                 .setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
@@ -23,6 +23,7 @@
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.statistics.AnalysisInfo.JobType;
+import org.apache.doris.statistics.util.StatisticsUtil;

 import com.google.gson.annotations.SerializedName;

@@ -136,8 +137,9 @@ public void update(AnalysisInfo analyzedJob, TableIf tableIf) {
         }
         jobType = analyzedJob.jobType;
         if (tableIf != null && analyzedJob.colToPartitions.keySet()
-                .containsAll(tableIf.getBaseSchema().stream().map(Column::getName).collect(
-                        Collectors.toSet()))) {
+                .containsAll(tableIf.getBaseSchema().stream()
+                        .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
+                        .map(Column::getName).collect(Collectors.toSet()))) {
             updatedRows.set(0);
         }
     }
@@ -247,7 +247,7 @@ public boolean enableAutoAnalyze() {
     @Test
     public void checkAvailableThread() {
         StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
-        Assertions.assertEquals(Config.full_auto_analyze_simultaneously_running_task_num,
+        Assertions.assertEquals(Config.auto_analyze_simultaneously_running_task_num,
                 autoCollector.analysisTaskExecutor.executors.getMaximumPoolSize());
     }

