Skip to content

Commit

Permalink
[Enhancement] run auto analyze job with priority (StarRocks#55446)
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <[email protected]>
  • Loading branch information
murphyatwork authored Jan 26, 2025
1 parent b4b3ce8 commit 4f8f233
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ public void run(ConnectContext statsConnectContext, StatisticExecutor statisticE

boolean hasFailedCollectJob = false;
for (StatisticsCollectJob statsJob : jobs) {
if (!StatisticAutoCollector.checkoutAnalyzeTime()) {
break;
}
AnalyzeStatus analyzeStatus = new ExternalAnalyzeStatus(GlobalStateMgr.getCurrentState().getNextId(),
statsJob.getCatalogName(), statsJob.getDb().getFullName(), statsJob.getTable().getName(),
statsJob.getTable().getUUID(), statsJob.getColumnNames(), statsJob.getType(), statsJob.getScheduleType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ public void run(ConnectContext statsConnectContext, StatisticExecutor statisticE

boolean hasFailedCollectJob = false;
for (StatisticsCollectJob statsJob : jobs) {
if (!StatisticAutoCollector.checkoutAnalyzeTime()) {
break;
}
AnalyzeStatus analyzeStatus = new NativeAnalyzeStatus(GlobalStateMgr.getCurrentState().getNextId(),
statsJob.getDb().getId(), statsJob.getTable().getId(), statsJob.getColumnNames(),
statsJob.getType(), statsJob.getScheduleType(), statsJob.getProperties(), LocalDateTime.now());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected void runAfterCatalogReady() {
return;
}

if (!checkoutAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) {
if (!checkoutAnalyzeTime()) {
return;
}

Expand Down Expand Up @@ -106,6 +106,9 @@ public List<StatisticsCollectJob> runJobs() {
List<StatisticsCollectJob> allJobs =
StatisticsCollectJobFactory.buildStatisticsCollectJob(createDefaultJobAnalyzeAll());
for (StatisticsCollectJob statsJob : allJobs) {
if (!checkoutAnalyzeTime()) {
break;
}
// user-created analyze job has a higher priority
if (statsJob.isAnalyzeTable() && analyzeTableSet.contains(statsJob.getTable().getId())) {
continue;
Expand Down Expand Up @@ -170,7 +173,17 @@ private NativeAnalyzeJob createDefaultJobAnalyzeAll() {
Maps.newHashMap(), ScheduleStatus.PENDING, LocalDateTime.MIN);
}

private boolean checkoutAnalyzeTime(LocalTime now) {
/**
* Check if it's a proper time to run auto analyze
*
* @return true if it's a good time
*/
public static boolean checkoutAnalyzeTime() {
LocalTime now = LocalTime.now(TimeUtils.getTimeZone().toZoneId());
return checkoutAnalyzeTime(now);
}

private static boolean checkoutAnalyzeTime(LocalTime now) {
try {
LocalTime start = LocalTime.parse(Config.statistic_auto_analyze_start_time, DateUtils.TIME_FORMATTER);
LocalTime end = LocalTime.parse(Config.statistic_auto_analyze_end_time, DateUtils.TIME_FORMATTER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@
import org.apache.logging.log4j.Logger;
import org.apache.velocity.VelocityContext;
import org.apache.velocity.app.VelocityEngine;
import org.jetbrains.annotations.NotNull;

import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -60,6 +64,7 @@ public abstract class StatisticsCollectJob {
protected final StatsConstants.AnalyzeType type;
protected final StatsConstants.ScheduleType scheduleType;
protected final Map<String, String> properties;
protected Priority priority;

protected StatisticsCollectJob(Database db, Table table, List<String> columnNames,
StatsConstants.AnalyzeType type, StatsConstants.ScheduleType scheduleType,
Expand Down Expand Up @@ -131,6 +136,14 @@ public boolean isAnalyzeTable() {
return CollectionUtils.isEmpty(columnNames);
}

public void setPriority(Priority priority) {
this.priority = priority;
}

public Priority getPriority() {
return this.priority;
}

protected void setDefaultSessionVariable(ConnectContext context) {
SessionVariable sessionVariable = context.getSessionVariable();
// Statistics collecting is not user-specific, which means response latency is not that important.
Expand Down Expand Up @@ -242,4 +255,49 @@ public String toString() {
sb.append('}');
return sb.toString();
}

public static class Priority implements Comparable<Priority> {
public LocalDateTime tableUpdateTime;
public LocalDateTime statsUpdateTime;
public double healthy;

public Priority(LocalDateTime tableUpdateTime, LocalDateTime statsUpdateTime, double healthy) {
this.tableUpdateTime = tableUpdateTime;
this.statsUpdateTime = statsUpdateTime;
this.healthy = healthy;
}

public long statsStaleness() {
if (statsUpdateTime != LocalDateTime.MIN) {
Duration gap = Duration.between(statsUpdateTime, tableUpdateTime);
// If the tableUpdate < statsUpdate, the duration can be a negative value, so normalize it to 0
return Math.max(0, gap.getSeconds());
} else {
Duration gap = Duration.between(tableUpdateTime, LocalDateTime.now());
return Math.max(0, gap.getSeconds()) + 3600;
}
}

@Override
public int compareTo(@NotNull Priority o) {
// Lower health means higher priority
if (healthy != o.healthy) {
return Double.compare(healthy, o.healthy);
}
// Higher staleness means higher priority
return Long.compare(o.statsStaleness(), statsStaleness());
}
}

public static class ComparatorWithPriority
implements Comparator<StatisticsCollectJob> {

@Override
public int compare(StatisticsCollectJob o1, StatisticsCollectJob o2) {
if (o1.getPriority() != null && o2.getPriority() != null) {
return o1.getPriority().compareTo(o2.getPriority());
}
return 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public class StatisticsCollectJobFactory {
private StatisticsCollectJobFactory() {
}

/**
* Build several statistics jobs for ANALYZE with priority
*
* @return jobs order by priority
*/
public static List<StatisticsCollectJob> buildStatisticsCollectJob(NativeAnalyzeJob nativeAnalyzeJob) {
List<StatisticsCollectJob> statsJobs = Lists.newArrayList();
if (StatsConstants.DEFAULT_ALL_ID == nativeAnalyzeJob.getDbId()) {
Expand Down Expand Up @@ -87,6 +92,8 @@ public static List<StatisticsCollectJob> buildStatisticsCollectJob(NativeAnalyze
nativeAnalyzeJob.getColumns(), nativeAnalyzeJob.getColumnTypes());
}

// Put higher priority jobs at the front
statsJobs.sort(new StatisticsCollectJob.ComparatorWithPriority());
return statsJobs;
}

Expand Down Expand Up @@ -197,6 +204,7 @@ public static List<StatisticsCollectJob> buildExternalStatisticsCollectJob(Exter
createExternalAnalyzeJob(statsJobs, externalAnalyzeJob, db, table, externalAnalyzeJob.getColumns());
}

statsJobs.sort(new StatisticsCollectJob.ComparatorWithPriority());
return statsJobs;
}

Expand Down Expand Up @@ -508,20 +516,24 @@ private static void createJob(List<StatisticsCollectJob> allTableJobMap, NativeA
table.getName(), basicStatsMeta.getUpdateTime(), healthy,
Config.statistic_auto_collect_sample_threshold, ByteSizeUnit.BYTES.toMB(sumDataSize),
ByteSizeUnit.BYTES.toMB(Config.statistic_auto_collect_small_table_size));
createSampleStatsJob(allTableJobMap, job, db, table, columnNames, columnTypes);
StatisticsCollectJob.Priority priority =
new StatisticsCollectJob.Priority(tableUpdateTime, statsUpdateTime, healthy);
createSampleStatsJob(allTableJobMap, job, db, table, columnNames, columnTypes, priority);
return;
}
}
}

StatisticsCollectJob.Priority priority =
new StatisticsCollectJob.Priority(tableUpdateTime, statsUpdateTime, healthy);
LOG.debug("statistics job work on un-health table: {}, healthy: {}, Type: {}", table.getName(), healthy,
job.getAnalyzeType());
if (job.getAnalyzeType().equals(StatsConstants.AnalyzeType.SAMPLE)) {
createSampleStatsJob(allTableJobMap, job, db, table, columnNames, columnTypes);
createSampleStatsJob(allTableJobMap, job, db, table, columnNames, columnTypes, priority);
} else if (job.getAnalyzeType().equals(StatsConstants.AnalyzeType.HISTOGRAM)) {
createHistogramJob(allTableJobMap, job, db, table, columnNames, columnTypes);
createHistogramJob(allTableJobMap, job, db, table, columnNames, columnTypes, priority);
} else if (job.getAnalyzeType().equals(StatsConstants.AnalyzeType.FULL)) {
createFullStatsJob(allTableJobMap, job, basicStatsMeta, db, table, columnNames, columnTypes);
createFullStatsJob(allTableJobMap, job, basicStatsMeta, db, table, columnNames, columnTypes, priority);
} else {
throw new StarRocksPlannerException("Unknown analyze type " + job.getAnalyzeType(),
ErrorType.INTERNAL_ERROR);
Expand All @@ -530,7 +542,7 @@ private static void createJob(List<StatisticsCollectJob> allTableJobMap, NativeA

private static void createSampleStatsJob(List<StatisticsCollectJob> allTableJobMap, NativeAnalyzeJob job,
Database db, Table table, List<String> columnNames,
List<Type> columnTypes) {
List<Type> columnTypes, StatisticsCollectJob.Priority priority) {
Collection<Partition> partitions = table.getPartitions();
BasicStatsMeta basicStatsMeta = GlobalStateMgr.getCurrentState().getAnalyzeMgr()
.getTableBasicStatsMeta(table.getId());
Expand All @@ -549,20 +561,23 @@ private static void createSampleStatsJob(List<StatisticsCollectJob> allTableJobM

StatisticsCollectJob sample = buildStatisticsCollectJob(db, table, partitionIdList, columnNames, columnTypes,
StatsConstants.AnalyzeType.SAMPLE, job.getScheduleType(), job.getProperties());
sample.setPriority(priority);
allTableJobMap.add(sample);
}

private static void createHistogramJob(List<StatisticsCollectJob> allTableJobMap, NativeAnalyzeJob job,
Database db, Table table, List<String> columnNames,
List<Type> columnTypes) {
List<Type> columnTypes, StatisticsCollectJob.Priority priority) {
StatisticsCollectJob sample = buildStatisticsCollectJob(db, table, null, columnNames, columnTypes,
StatsConstants.AnalyzeType.HISTOGRAM, job.getScheduleType(), job.getProperties());
sample.setPriority(priority);
allTableJobMap.add(sample);
}

private static void createFullStatsJob(List<StatisticsCollectJob> allTableJobMap,
NativeAnalyzeJob job, BasicStatsMeta stats,
Database db, Table table, List<String> columnNames, List<Type> columnTypes) {
Database db, Table table, List<String> columnNames, List<Type> columnTypes,
StatisticsCollectJob.Priority priority) {
StatsConstants.AnalyzeType analyzeType;
List<Partition> partitionList = table.getPartitions().stream()
.filter(partition -> !StatisticUtils.isPartitionStatsHealthy(table, partition, stats))
Expand All @@ -578,9 +593,11 @@ private static void createFullStatsJob(List<StatisticsCollectJob> allTableJobMap
}

if (!partitionList.isEmpty()) {
allTableJobMap.add(buildStatisticsCollectJob(db, table,
StatisticsCollectJob statisticsCollectJob = buildStatisticsCollectJob(db, table,
partitionList.stream().map(Partition::getId).collect(Collectors.toList()), columnNames, columnTypes,
analyzeType, job.getScheduleType(), Maps.newHashMap()));
analyzeType, job.getScheduleType(), Maps.newHashMap());
statisticsCollectJob.setPriority(priority);
allTableJobMap.add(statisticsCollectJob);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1583,4 +1583,30 @@ public void testGetCollectibleColumns2() throws Exception {
Assert.assertTrue(cols.size() == 2);
starRocksAssert.dropTable("test.t_gen_col");
}

@Test
public void testPriorityComparison() {
// Test case with different health values
StatisticsCollectJob.Priority priority1 =
new StatisticsCollectJob.Priority(LocalDateTime.now(), LocalDateTime.now(), 0.5);
StatisticsCollectJob.Priority priority2 =
new StatisticsCollectJob.Priority(LocalDateTime.now(), LocalDateTime.now(), 0.6);
Assert.assertTrue(priority1.compareTo(priority2) < 0);

// Test case with different staleness values
LocalDateTime now = LocalDateTime.now();
StatisticsCollectJob.Priority priority3 = new StatisticsCollectJob.Priority(now, now.minusSeconds(100), 0.5);
StatisticsCollectJob.Priority priority4 = new StatisticsCollectJob.Priority(now, now.minusSeconds(50), 0.5);
Assert.assertTrue(priority3.compareTo(priority4) < 0);

// Test case with both different health and staleness values
StatisticsCollectJob.Priority priority5 = new StatisticsCollectJob.Priority(now, now.minusSeconds(100), 0.5);
StatisticsCollectJob.Priority priority6 = new StatisticsCollectJob.Priority(now, now.minusSeconds(50), 0.6);
Assert.assertTrue(priority5.compareTo(priority6) < 0);

// Test case with statsUpdateTime set to LocalDateTime.MIN
StatisticsCollectJob.Priority priority7 = new StatisticsCollectJob.Priority(now, LocalDateTime.MIN, 0.5);
StatisticsCollectJob.Priority priority8 = new StatisticsCollectJob.Priority(now, now.minusSeconds(10), 0.5);
Assert.assertTrue(priority7.compareTo(priority8) < 0);
}
}

0 comments on commit 4f8f233

Please sign in to comment.