Skip to content

Commit

Permalink
[fix](stats) fix auto collector always create sample job no matter th…
Browse files Browse the repository at this point in the history
…e table size apache#26968 (apache#26972)
  • Loading branch information
Kikyou1997 authored and gnehil committed Dec 4, 2023
1 parent 9994b03 commit dd5f224
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.statistics.ColStatsMeta;
import org.apache.doris.statistics.ColumnStatistic;

import com.google.common.collect.ImmutableList;
Expand All @@ -54,6 +55,10 @@ public class ShowColumnStatsStmt extends ShowStmt {
.add("avg_size_byte")
.add("min")
.add("max")
.add("method")
.add("type")
.add("trigger")
.add("query_times")
.add("updated_time")
.build();

Expand Down Expand Up @@ -145,6 +150,12 @@ public ShowResultSet constructResultSet(List<Pair<String, ColumnStatistic>> colu
row.add(String.valueOf(p.second.avgSizeByte));
row.add(String.valueOf(p.second.minExpr == null ? "N/A" : p.second.minExpr.toSql()));
row.add(String.valueOf(p.second.maxExpr == null ? "N/A" : p.second.maxExpr.toSql()));
ColStatsMeta colStatsMeta = Env.getCurrentEnv().getAnalysisManager().findColStatsMeta(table.getId(),
p.first);
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.analysisMethod));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.analysisType));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.jobType));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.queriedTimes));
row.add(String.valueOf(p.second.updatedTime));
result.add(row);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,19 @@ public enum ScheduleType {
@SerializedName("usingSqlForPartitionColumn")
public final boolean usingSqlForPartitionColumn;

/**
 * Update time of the table captured when this job was created.
 * Job creators in this change fill it from {@code table.getUpdateTime()}, and
 * {@code TableStatsMeta.update} copies it into {@code updatedTime} once the job finishes.
 * NOTE(review): unlike the neighbouring fields this one carries no {@code @SerializedName};
 * confirm that relying on the default field name for persistence is intended.
 */
public final long tblUpdateTime;

public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId, long dbId, long tblId,
Map<String, Set<String>> colToPartitions, Set<String> partitionNames, String colName, Long indexId,
JobType jobType, AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType,
int samplePercent, long sampleRows, int maxBucketNum, long periodTimeInMs, String message,
long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType,
boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition,
boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull,
boolean usingSqlForPartitionColumn) {
boolean usingSqlForPartitionColumn, long tblUpdateTime) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
Expand Down Expand Up @@ -224,6 +229,7 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
}
this.forceFull = forceFull;
this.usingSqlForPartitionColumn = usingSqlForPartitionColumn;
this.tblUpdateTime = tblUpdateTime;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class AnalysisInfoBuilder {
private boolean forceFull;
private boolean usingSqlForPartitionColumn;

private long tblUpdateTime;

/**
 * Creates an empty builder. The constructor performs no initialization itself;
 * values are supplied afterwards through the {@code setXxx} methods.
 */
public AnalysisInfoBuilder() {
}

Expand Down Expand Up @@ -97,6 +99,7 @@ public AnalysisInfoBuilder(AnalysisInfo info) {
cronExpression = info.cronExpression;
forceFull = info.forceFull;
usingSqlForPartitionColumn = info.usingSqlForPartitionColumn;
tblUpdateTime = info.tblUpdateTime;
}

public AnalysisInfoBuilder setJobId(long jobId) {
Expand Down Expand Up @@ -254,45 +257,17 @@ public AnalysisInfoBuilder setUsingSqlForPartitionColumn(boolean usingSqlForPart
return this;
}

/**
 * Sets the table update time that {@link #build()} passes on to the {@code AnalysisInfo} constructor.
 *
 * @param tblUpdateTime table update time to record for the job being built
 * @return this builder, so calls can be chained
 */
public AnalysisInfoBuilder setTblUpdateTime(long tblUpdateTime) {
this.tblUpdateTime = tblUpdateTime;
return this;
}

public AnalysisInfo build() {
return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, tblId, colToPartitions, partitionNames,
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount,
cronExpression, forceFull, usingSqlForPartitionColumn);
}

public AnalysisInfoBuilder copy() {
return new AnalysisInfoBuilder()
.setJobId(jobId)
.setTaskId(taskId)
.setTaskIds(taskIds)
.setCatalogId(catalogId)
.setDBId(dbId)
.setTblId(tblId)
.setColToPartitions(colToPartitions)
.setColName(colName)
.setIndexId(indexId)
.setJobType(jobType)
.setAnalysisMode(analysisMode)
.setAnalysisMethod(analysisMethod)
.setAnalysisType(analysisType)
.setSamplePercent(samplePercent)
.setSampleRows(sampleRows)
.setPeriodTimeInMs(periodTimeInMs)
.setMaxBucketNum(maxBucketNum)
.setMessage(message)
.setLastExecTimeInMs(lastExecTimeInMs)
.setTimeCostInMs(timeCostInMs)
.setState(state)
.setScheduleType(scheduleType)
.setExternalTableLevelTask(externalTableLevelTask)
.setSamplingPartition(samplingPartition)
.setPartitionOnly(partitionOnly)
.setAllPartition(isAllPartition)
.setPartitionCount(partitionCount)
.setCronExpression(cronExpression)
.setForceFull(forceFull)
.setUsingSqlForPartitionColumn(usingSqlForPartitionColumn);
cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
Expand Down Expand Up @@ -505,7 +504,7 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio
partitionNames, analysisType);
infoBuilder.setColToPartitions(colToPartitions);
infoBuilder.setTaskIds(Lists.newArrayList());

infoBuilder.setTblUpdateTime(table.getUpdateTime());
return infoBuilder.build();
}

Expand Down Expand Up @@ -601,9 +600,9 @@ public void updateTableStats(AnalysisInfo jobInfo) {
}
TableStatsMeta tableStats = findTableStatsStatus(tbl.getId());
if (tableStats == null) {
updateTableStatsStatus(new TableStatsMeta(tbl.getId(), tbl.estimatedRowCount(), jobInfo));
updateTableStatsStatus(new TableStatsMeta(tbl.estimatedRowCount(), jobInfo, tbl));
} else {
tableStats.updateByJob(jobInfo);
tableStats.update(jobInfo, tbl);
logCreateTableStats(tableStats);
}
}
Expand Down Expand Up @@ -1004,21 +1003,6 @@ public void registerSysJob(AnalysisInfo jobInfo, Map<Long, BaseAnalysisTask> tas
analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos);
}

/**
 * Collects the names of the partitions of {@code table} that should be analyzed again.
 *
 * A partition qualifies when it has data and, if table statistics already exist, its
 * visible version time is not older than the recorded statistics update time. When no
 * statistics exist yet, every partition that has data qualifies.
 *
 * @param table table whose partitions are inspected
 * @return names of the qualifying partitions
 */
@VisibleForTesting
protected Set<String> findReAnalyzeNeededPartitions(TableIf table) {
    final TableStatsMeta statsMeta = findTableStatsStatus(table.getId());
    return table.getPartitionNames().stream()
            .map(name -> table.getPartition(name))
            // Data check comes first, exactly as in both original pipelines; the time check
            // is skipped entirely when there are no stats to compare against.
            .filter(part -> part.hasData()
                    && (statsMeta == null || part.getVisibleVersionTime() >= statsMeta.updatedTime))
            .map(part -> part.getName())
            .collect(Collectors.toSet());
}

/**
 * Writes the given automatically created job to the edit log of the current {@code Env}.
 *
 * @param autoJob job info handed to {@code EditLog.logAutoJob}
 */
protected void logAutoJob(AnalysisInfo autoJob) {
Env.getCurrentEnv().getEditLog().logAutoJob(autoJob);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private void getTableStats() throws Exception {
String rowCount = columnResult.get(0).get(0);
Env.getCurrentEnv().getAnalysisManager()
.updateTableStatsStatus(
new TableStatsMeta(table.getId(), Long.parseLong(rowCount), info));
new TableStatsMeta(Long.parseLong(rowCount), info, tbl));
job.rowCountDone(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private void getTableStats() throws Exception {
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(ANALYZE_TABLE_COUNT_TEMPLATE));
String rowCount = columnResult.get(0).get(0);
Env.getCurrentEnv().getAnalysisManager()
.updateTableStatsStatus(new TableStatsMeta(table.getId(), Long.parseLong(rowCount), info));
.updateTableStatsStatus(new TableStatsMeta(Long.parseLong(rowCount), info, table));
job.rowCountDone(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {

/**
 * Creates the automatic statistics collector named "Automatic Analyzer".
 *
 * The check period is taken from {@code Config.auto_check_statistics_in_minutes}. As the
 * option name says, the value is in minutes, so it must be converted to milliseconds with
 * {@link TimeUnit#MINUTES}; converting with {@code TimeUnit.SECONDS} would make the
 * collector run 60 times more often than configured.
 *
 * Tasks are executed by an {@code AnalysisTaskExecutor} sized by
 * {@code Config.full_auto_analyze_simultaneously_running_task_num}.
 */
public StatisticsAutoCollector() {
    super("Automatic Analyzer",
            TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes),
            new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num));
}

Expand Down Expand Up @@ -142,12 +142,15 @@ protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
.setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
.setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
.setAnalysisMethod(analysisMethod)
.setSampleRows(StatisticsUtil.getHugeTableSampleRows())
.setSampleRows(analysisMethod.equals(AnalysisMethod.SAMPLE)
? StatisticsUtil.getHugeTableSampleRows() : -1)
.setScheduleType(ScheduleType.AUTOMATIC)
.setState(AnalysisState.PENDING)
.setTaskIds(new ArrayList<>())
.setLastExecTimeInMs(System.currentTimeMillis())
.setJobType(JobType.SYSTEM).build();
.setJobType(JobType.SYSTEM)
.setTblUpdateTime(table.getUpdateTime())
.build();
analysisInfos.add(jobInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.statistics;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
Expand Down Expand Up @@ -64,11 +66,11 @@ public class TableStatsMeta implements Writable {

// It's necessary to store these fields separately from AnalysisInfo, since the lifecycle between AnalysisInfo
// and TableStats is quite different.
public TableStatsMeta(long tblId, long rowCount, AnalysisInfo analyzedJob) {
this.tblId = tblId;
public TableStatsMeta(long rowCount, AnalysisInfo analyzedJob, TableIf table) {
this.tblId = table.getId();
this.idxId = -1;
this.rowCount = rowCount;
updateByJob(analyzedJob);
update(analyzedJob, table);
}

@Override
Expand Down Expand Up @@ -112,8 +114,8 @@ public void reset() {
colNameToColStatsMeta.values().forEach(ColStatsMeta::clear);
}

public void updateByJob(AnalysisInfo analyzedJob) {
updatedTime = System.currentTimeMillis();
public void update(AnalysisInfo analyzedJob, TableIf tableIf) {
updatedTime = analyzedJob.tblUpdateTime;
String colNameStr = analyzedJob.colName;
// colName field AnalyzeJob's format likes: "[col1, col2]", we need to remove brackets here
// TODO: Refactor this later
Expand All @@ -133,5 +135,10 @@ public void updateByJob(AnalysisInfo analyzedJob) {
}
}
jobType = analyzedJob.jobType;
if (tableIf != null && analyzedJob.colToPartitions.keySet()
.containsAll(tableIf.getColumns().stream().map(Column::getName).collect(
Collectors.toSet()))) {
updatedRows.set(0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,23 +328,34 @@ public void testReAnalyze() {

int count = 0;
int[] rowCount = new int[]{100, 200};

final Column c = new Column("col1", PrimitiveType.INT);
// Mocked row count: hands out the entries of rowCount in order, advancing count on each call.
// Only two values (100, 200) are prepared, so a third call would run past the end of the array.
@Mock
public long getRowCount() {
return rowCount[count++];
}

@Mock
public List<Column> getBaseSchema() {
return Lists.newArrayList(new Column("col1", PrimitiveType.INT));
return Lists.newArrayList(c);
}

// Mocked column list: a fresh list on every call, always holding the single shared column c ("col1").
@Mock
public List<Column> getColumns() {
return Lists.newArrayList(c);
}

};
OlapTable olapTable = new OlapTable();
TableStatsMeta stats1 = new TableStatsMeta(0, 50, new AnalysisInfoBuilder().setColName("col1").build());
TableStatsMeta stats1 = new TableStatsMeta(
50, new AnalysisInfoBuilder().setColToPartitions(new HashMap<>())
.setColName("col1").build(), olapTable);
stats1.updatedRows.addAndGet(50);

Assertions.assertTrue(olapTable.needReAnalyzeTable(stats1));
TableStatsMeta stats2 = new TableStatsMeta(0, 190, new AnalysisInfoBuilder().setColName("col1").build());
TableStatsMeta stats2 = new TableStatsMeta(
190, new AnalysisInfoBuilder()
.setColToPartitions(new HashMap<>()).setColName("col1").build(), olapTable);
stats2.updatedRows.addAndGet(20);
Assertions.assertFalse(olapTable.needReAnalyzeTable(stats2));

Expand Down
Loading

0 comments on commit dd5f224

Please sign in to comment.