Skip to content

Commit

Permalink
Fix bug where auto analyze doesn't filter out unsupported column types. (#27559)
Browse files Browse the repository at this point in the history
Fix bug where auto analyze doesn't filter out unsupported column types.
Catch Throwable in the auto analyze thread for each database; otherwise the thread quits when one database fails to create jobs, and all other databases never get analyzed.
Rename FE config item full_auto_analyze_simultaneously_running_task_num to auto_analyze_simultaneously_running_task_num.
  • Loading branch information
Jibing-Li authored Nov 25, 2023
1 parent 6142a53 commit 6b1428d
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2112,7 +2112,7 @@ public class Config extends ConfigBase {
public static int force_olap_table_replication_num = 0;

@ConfField
public static int full_auto_analyze_simultaneously_running_task_num = 1;
public static int auto_analyze_simultaneously_running_task_num = 1;

@ConfField
public static final int period_analyze_simultaneously_running_task_num = 1;
Expand Down
10 changes: 7 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,7 @@ public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
}
if (!tblStats.analyzeColumns().containsAll(getBaseSchema()
.stream()
.filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.map(Column::getName)
.collect(Collectors.toSet()))) {
return true;
Expand All @@ -1178,16 +1179,20 @@ public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
Set<String> allPartitions = table.getPartitionNames().stream().map(table::getPartition)
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
if (tableStats == null) {
return table.getBaseSchema().stream().collect(Collectors.toMap(Column::getName, v -> allPartitions));
return table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.collect(Collectors.toMap(Column::getName, v -> allPartitions));
}
Map<String, Set<String>> colToPart = new HashMap<>();
for (Column col : table.getBaseSchema()) {
if (StatisticsUtil.isUnsupportedType(col.getType())) {
continue;
}
long lastUpdateTime = tableStats.findColumnLastUpdateTime(col.getName());
Set<String> partitions = table.getPartitionNames().stream()
.map(table::getPartition)
.filter(Partition::hasData)
.filter(partition ->
partition.getVisibleVersionTime() >= lastUpdateTime).map(Partition::getName)
partition.getVisibleVersionTime() >= lastUpdateTime).map(Partition::getName)
.collect(Collectors.toSet());
colToPart.put(col.getName(), partitions);
}
Expand Down Expand Up @@ -2393,7 +2398,6 @@ public void analyze(String dbName) {
}
}

@Override
public boolean isDistributionColumn(String columnName) {
Set<String> distributeColumns = getDistributionColumnNames()
.stream().map(String::toLowerCase).collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.TTableDescriptor;

import com.google.common.collect.Sets;
Expand Down Expand Up @@ -397,7 +398,8 @@ public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
HashSet<String> partitions = Sets.newHashSet();
// TODO: Find a way to collect external table partitions that need to be analyzed.
partitions.add("Dummy Partition");
return getBaseSchema().stream().collect(Collectors.toMap(Column::getName, k -> partitions));
return getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.collect(Collectors.toMap(Column::getName, k -> partitions));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
Expand All @@ -50,8 +51,8 @@ public class StatisticsAutoCollector extends StatisticsCollector {

public StatisticsAutoCollector() {
super("Automatic Analyzer",
TimeUnit.MINUTES.toMillis(Config.full_auto_analyze_simultaneously_running_task_num),
new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num));
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes),
new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num));
}

@Override
Expand All @@ -77,12 +78,17 @@ private void analyzeAll() {
if (StatisticConstants.SYSTEM_DBS.contains(databaseIf.getFullName())) {
continue;
}
analyzeDb(databaseIf);
try {
analyzeDb(databaseIf);
} catch (Throwable t) {
LOG.warn("Failed to analyze database {}.{}", ctl.getName(), databaseIf.getFullName(), t);
continue;
}
}
}
}

public void analyzeDb(DatabaseIf<TableIf> databaseIf) {
public void analyzeDb(DatabaseIf<TableIf> databaseIf) throws DdlException {
List<AnalysisInfo> analysisInfos = constructAnalysisInfo(databaseIf);
for (AnalysisInfo analysisInfo : analysisInfos) {
analysisInfo = getReAnalyzeRequiredPart(analysisInfo);
Expand All @@ -91,8 +97,9 @@ public void analyzeDb(DatabaseIf<TableIf> databaseIf) {
}
try {
createSystemAnalysisJob(analysisInfo);
} catch (Exception e) {
LOG.warn("Failed to create analysis job", e);
} catch (Throwable t) {
analysisInfo.message = t.getMessage();
throw t;
}
}
}
Expand Down Expand Up @@ -136,8 +143,7 @@ protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
.setTblId(table.getId())
.setColName(
table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.map(
Column::getName).collect(Collectors.joining(","))
.map(Column::getName).collect(Collectors.joining(","))
)
.setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
.setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.gson.annotations.SerializedName;

Expand Down Expand Up @@ -136,8 +137,9 @@ public void update(AnalysisInfo analyzedJob, TableIf tableIf) {
}
jobType = analyzedJob.jobType;
if (tableIf != null && analyzedJob.colToPartitions.keySet()
.containsAll(tableIf.getBaseSchema().stream().map(Column::getName).collect(
Collectors.toSet()))) {
.containsAll(tableIf.getBaseSchema().stream()
.filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.map(Column::getName).collect(Collectors.toSet()))) {
updatedRows.set(0);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public boolean enableAutoAnalyze() {
@Test
public void checkAvailableThread() {
StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
Assertions.assertEquals(Config.full_auto_analyze_simultaneously_running_task_num,
Assertions.assertEquals(Config.auto_analyze_simultaneously_running_task_num,
autoCollector.analysisTaskExecutor.executors.getMaximumPoolSize());
}

Expand Down

0 comments on commit 6b1428d

Please sign in to comment.