diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 35b8ab838dfb344..f46e38972bb9492 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -19,8 +19,6 @@ import org.apache.doris.common.ExperimentalUtil.ExperimentalType; -import java.util.concurrent.TimeUnit; - public class Config extends ConfigBase { @ConfField(description = {"用户自定义配置文件的路径,用于存放 fe_custom.conf。该文件中的配置会覆盖 fe.conf 中的配置", @@ -1742,7 +1740,7 @@ public class Config extends ConfigBase { * Used to determined how many statistics collection SQL could run simultaneously. */ @ConfField - public static int statistics_simultaneously_running_task_num = 10; + public static int statistics_simultaneously_running_task_num = 3; /** * if table has too many replicas, Fe occur oom when schema change. @@ -2043,7 +2041,7 @@ public class Config extends ConfigBase { * FE OOM. */ @ConfField - public static long stats_cache_size = 10_0000; + public static long stats_cache_size = 50_0000; /** * This configuration is used to enable the statistics of query information, which will record @@ -2066,9 +2064,6 @@ public class Config extends ConfigBase { "Whether to enable binlog feature"}) public static boolean enable_feature_binlog = false; - @ConfField - public static int analyze_task_timeout_in_hours = 12; - @ConfField(mutable = true, masterOnly = true, description = { "是否禁止使用 WITH REOSOURCE 语句创建 Catalog。", "Whether to disable creating catalog with WITH RESOURCE statement."}) @@ -2123,9 +2118,6 @@ public class Config extends ConfigBase { @ConfField public static boolean forbid_running_alter_job = false; - @ConfField - public static int table_stats_health_threshold = 80; - @ConfField(description = { "暂时性配置项,开启后会自动将所有的olap表修改为可light schema change", "temporary config filed, will make all olap tables enable light schema change" @@ -2151,28 +2143,6 @@ public class Config extends ConfigBase { + "but it will increase the memory overhead."}) public static int virtual_node_number = 2048; - @ConfField(description = {"控制对大表的自动ANALYZE的最小时间间隔," - + "在该时间间隔内大小超过huge_table_lower_bound_size_in_bytes的表仅ANALYZE一次", - "This controls the minimum time interval for automatic ANALYZE on large tables. Within this interval," - + "tables larger than huge_table_lower_bound_size_in_bytes are analyzed only once."}) - public static long huge_table_auto_analyze_interval_in_millis = TimeUnit.HOURS.toMillis(12); - - @ConfField(description = {"定义大表的大小下界,在开启enable_auto_sample的情况下," - + "大小超过该值的表将会自动通过采样收集统计信息", "This defines the lower size bound for large tables. 
" - + "When enable_auto_sample is enabled, tables larger than this value will automatically collect " - + "statistics through sampling"}) - public static long huge_table_lower_bound_size_in_bytes = 5L * 1024 * 1024 * 1024; - - @ConfField(description = {"定义开启开启大表自动sample后,对大表的采样比例", - "This defines the number of sample percent for large tables when automatic sampling for" - + "large tables is enabled"}) - public static int huge_table_default_sample_rows = 4194304; - - @ConfField(description = {"是否开启大表自动sample,开启后对于大小超过huge_table_lower_bound_size_in_bytes会自动通过采样收集" - + "统计信息", "Whether to enable automatic sampling for large tables, which, when enabled, automatically" - + "collects statistics through sampling for tables larger than 'huge_table_lower_bound_size_in_bytes'"}) - public static boolean enable_auto_sample = false; - @ConfField(description = { "控制统计信息的自动触发作业执行记录的持久化行数", "Determine the persist number of automatic triggered analyze job execution status" diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 0f6ffc3cf6b813d..576fda217e6b3c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -53,7 +53,6 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisType; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.HistogramTask; -import org.apache.doris.statistics.MVAnalysisTask; import org.apache.doris.statistics.OlapAnalysisTask; import org.apache.doris.statistics.TableStatsMeta; import org.apache.doris.statistics.util.StatisticsUtil; @@ -1102,11 +1101,9 @@ public TTableDescriptor toThrift() { public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { if (info.analysisType.equals(AnalysisType.HISTOGRAM)) { return new HistogramTask(info); - } - if (info.analysisType.equals(AnalysisType.FUNDAMENTALS)) { + } else { return new OlapAnalysisTask(info); } - return new MVAnalysisTask(info); } public boolean needReAnalyzeTable(TableStatsMeta tblStats) { @@ -1126,7 +1123,7 @@ public boolean needReAnalyzeTable(TableStatsMeta tblStats) { } long updateRows = tblStats.updatedRows.get(); int tblHealth = StatisticsUtil.getTableHealth(rowCount, updateRows); - return tblHealth < Config.table_stats_health_threshold; + return tblHealth < StatisticsUtil.getTableStatsHealthThreshold(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java index bf45e128d8c833b..4f6269484070775 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java @@ -571,10 +571,15 @@ private Statistics computeFilter(Filter filter) { } private ColumnStatistic getColumnStatistic(TableIf table, String colName) { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext != null && connectContext.getSessionVariable().internalSession) { + return ColumnStatistic.UNKNOWN; + } if (totalColumnStatisticMap.get(table.getName() + colName) != null) { return totalColumnStatisticMap.get(table.getName() + colName); } else if (isPlayNereidsDump) { return ColumnStatistic.UNKNOWN; + } else { long catalogId; long dbId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
index 7425c97d7bbafee..fd2f14b827debec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -57,6 +57,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.TimeUnit; /** * System variable. @@ -410,6 +411,19 @@ public class SessionVariable implements Serializable, Writable { public static final String FASTER_FLOAT_CONVERT = "faster_float_convert"; + public static final String ENABLE_DECIMAL256 = "enable_decimal256"; + + public static final String STATS_INSERT_MERGE_ITEM_COUNT = "stats_insert_merge_item_count"; + + public static final String HUGE_TABLE_DEFAULT_SAMPLE_ROWS = "huge_table_default_sample_rows"; + public static final String HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES = "huge_table_lower_bound_size_in_bytes"; + + public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS + = "huge_table_auto_analyze_interval_in_millis"; + + public static final String TABLE_STATS_HEALTH_THRESHOLD + = "table_stats_health_threshold"; + public static final List DEBUG_VARIABLES = ImmutableList.of( SKIP_DELETE_PREDICATE, SKIP_DELETE_BITMAP, @@ -463,7 +477,7 @@ public class SessionVariable implements Serializable, Writable { public int queryTimeoutS = 900; // query timeout in second. - @VariableMgr.VarAttr(name = ANALYZE_TIMEOUT, needForward = true) + @VariableMgr.VarAttr(name = ANALYZE_TIMEOUT, flag = VariableMgr.GLOBAL, needForward = true) public int analyzeTimeoutS = 43200; // The global max_execution_time value provides the default for the session value for new connections. @@ -1150,6 +1164,22 @@ public void setMaxJoinNumberOfReorder(int maxJoinNumberOfReorder) { + " use a skiplist to optimize the intersection."}) public int invertedIndexConjunctionOptThreshold = 1000; + @VariableMgr.VarAttr(name = FULL_AUTO_ANALYZE_START_TIME, needForward = true, checker = "checkAnalyzeTimeFormat", + description = {"该参数定义自动ANALYZE例程的开始时间", + "This parameter defines the start time for the automatic ANALYZE routine."}, + flag = VariableMgr.GLOBAL) + public String fullAutoAnalyzeStartTime = "00:00:00"; + + @VariableMgr.VarAttr(name = FULL_AUTO_ANALYZE_END_TIME, needForward = true, checker = "checkAnalyzeTimeFormat", + description = {"该参数定义自动ANALYZE例程的结束时间", + "This parameter defines the end time for the automatic ANALYZE routine."}, + flag = VariableMgr.GLOBAL) + public String fullAutoAnalyzeEndTime = "23:59:59"; + + @VariableMgr.VarAttr(name = SQL_DIALECT, needForward = true, checker = "checkSqlDialect", + description = {"解析sql使用的方言", "The dialect used to parse sql."}) + public String sqlDialect = "doris"; + @VariableMgr.VarAttr(name = ENABLE_UNIQUE_KEY_PARTIAL_UPDATE, needForward = true) public boolean enableUniqueKeyPartialUpdate = false; @@ -1186,6 +1216,48 @@ public void setMaxJoinNumberOfReorder(int maxJoinNumberOfReorder) { "the runtime filter id in IGNORE_RUNTIME_FILTER_IDS list will not be generated"}) public String ignoreRuntimeFilterIds = ""; + + @VariableMgr.VarAttr(name = STATS_INSERT_MERGE_ITEM_COUNT, flag = VariableMgr.GLOBAL, description = { + "控制统计信息相关INSERT攒批数量", "Controls the batch size for stats INSERT merging." 
+    }
+    )
+    public int statsInsertMergeItemCount = 200;
+
+    @VariableMgr.VarAttr(name = HUGE_TABLE_DEFAULT_SAMPLE_ROWS, flag = VariableMgr.GLOBAL, description = {
+            "定义开启大表自动sample后,对大表的采样行数",
+            "This defines the number of sample rows for large tables when automatic sampling for "
+                    + "large tables is enabled"
+
+    })
+    public long hugeTableDefaultSampleRows = 4194304;
+
+
+    @VariableMgr.VarAttr(name = HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES, flag = VariableMgr.GLOBAL,
+            description = {
+                    "定义大表的大小下界,大小超过该值的表将会自动通过采样收集统计信息",
+                    "This defines the lower size bound for large tables. "
+                            + "When enable_auto_sample is enabled, tables "
+                            + "larger than this value will automatically collect "
+                            + "statistics through sampling"})
+    public long hugeTableLowerBoundSizeInBytes = 5L * 1024 * 1024 * 1024;
+
+    @VariableMgr.VarAttr(name = HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS, flag = VariableMgr.GLOBAL,
+            description = {"控制对大表的自动ANALYZE的最小时间间隔,"
+                    + "在该时间间隔内大小超过huge_table_lower_bound_size_in_bytes的表仅ANALYZE一次",
+                    "This controls the minimum time interval for automatic ANALYZE on large tables. "
+                            + "Within this interval, "
+                            + "tables larger than huge_table_lower_bound_size_in_bytes are analyzed only once."})
+    public long hugeTableAutoAnalyzeIntervalInMillis = TimeUnit.HOURS.toMillis(12);
+
+    @VariableMgr.VarAttr(name = TABLE_STATS_HEALTH_THRESHOLD, flag = VariableMgr.GLOBAL,
+            description = {"取值在0-100之间,当自上次统计信息收集操作之后"
+                    + "数据更新量达到 (100 - table_stats_health_threshold)% ,认为该表的统计信息已过时",
+                    "The value should be between 0 and 100. When the data update quantity "
+                            + "exceeds (100 - table_stats_health_threshold)% since the last "
+                            + "statistics collection operation, the statistics for this table are "
+                            + "considered outdated."})
+    public int tableStatsHealthThreshold = 60;
+
 public static final String IGNORE_RUNTIME_FILTER_IDS = "ignore_runtime_filter_ids";

 public Set<Integer> getIgnoredRuntimeFilterIds() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
new file mode 100644
index 000000000000000..904dc21e337732d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.qe.AuditLogHelper;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.StringJoiner;
+
+public class AnalysisJob {
+
+    public static final Logger LOG = LogManager.getLogger(AnalysisJob.class);
+
+    protected Set<BaseAnalysisTask> queryingTask;
+
+    protected Set<BaseAnalysisTask> queryFinished;
+
+    protected List<ColStatsData> buf;
+
+    protected int totalTaskCount;
+
+    protected int queryFinishedTaskCount;
+
+    protected StmtExecutor stmtExecutor;
+
+    protected boolean killed;
+
+    protected long start;
+
+    protected AnalysisInfo jobInfo;
+
+    protected AnalysisManager analysisManager;
+
+    public AnalysisJob(AnalysisInfo jobInfo, Collection<BaseAnalysisTask> queryingTask) {
+        for (BaseAnalysisTask task : queryingTask) {
+            task.job = this;
+        }
+        this.queryingTask = new HashSet<>(queryingTask);
+        this.queryFinished = new HashSet<>();
+        this.buf = new ArrayList<>();
+        totalTaskCount = queryingTask.size();
+        start = System.currentTimeMillis();
+        this.jobInfo = jobInfo;
+        this.analysisManager = Env.getCurrentEnv().getAnalysisManager();
+    }
+
+    public synchronized void appendBuf(BaseAnalysisTask task, List<ColStatsData> statsData) {
+        queryingTask.remove(task);
+        buf.addAll(statsData);
+        queryFinished.add(task);
+        queryFinishedTaskCount += 1;
+        if (queryFinishedTaskCount == totalTaskCount) {
+            writeBuf();
+            updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
+                    + (System.currentTimeMillis() - start) / 1000);
+            deregisterJob();
+        } else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) {
+            writeBuf();
+        }
+    }
+
+    // CHECKSTYLE OFF
+    // fallthrough here is expected
+    public void updateTaskState(AnalysisState state, String msg) {
+        long time = System.currentTimeMillis();
+        switch (state) {
+            case FAILED:
+                for (BaseAnalysisTask task : queryingTask) {
+                    analysisManager.updateTaskStatus(task.info, state, msg, time);
+                    task.cancel();
+                }
+                killed = true;
+            case FINISHED:
+                for (BaseAnalysisTask task : queryFinished) {
+                    analysisManager.updateTaskStatus(task.info, state, msg, time);
+                }
+            default:
+                // DO NOTHING
+        }
+    }
+
+    protected void writeBuf() {
+        if (killed) {
+            return;
+        }
+        // buf could be empty when there is nothing to do, for example when a user submits an analysis
+        // task for a table with no data change
+        if (!buf.isEmpty()) {
+            String insertStmt = "INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME + " VALUES ";
+            StringJoiner values = new StringJoiner(",");
+            for (ColStatsData data : buf) {
+                values.add(data.toSQL(true));
+            }
+            insertStmt += values.toString();
+            int retryTimes = 0;
+            while (retryTimes < StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
+                if (killed) {
+                    return;
+                }
+                try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
+                    stmtExecutor = new StmtExecutor(r.connectContext, insertStmt);
+                    executeWithExceptionOnFail(stmtExecutor);
+                    break;
+                } catch (Exception t) {
+                    LOG.warn("Failed to write buf: " + insertStmt, t);
+                    retryTimes++;
+                    if (retryTimes >= StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
+                        updateTaskState(AnalysisState.FAILED, t.getMessage());
+                        return;
+                    }
+                }
+            }
+        }
+        updateTaskState(AnalysisState.FINISHED, "");
+        syncLoadStats();
+        queryFinished.clear();
+    }
+
+    protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
+        if (killed) {
+            return;
+        }
+        LOG.debug("execute internal sql: {}", stmtExecutor.getOriginStmt());
+        try {
+            stmtExecutor.execute();
+            QueryState queryState = stmtExecutor.getContext().getState();
+            if (queryState.getStateType().equals(MysqlStateType.ERR)) {
+                throw new RuntimeException(
+                        "Failed to insert: " + stmtExecutor.getOriginStmt().originStmt + ", Error msg: "
+                                + queryState.getErrorMessage());
+            }
+        } finally {
+            AuditLogHelper.logAuditLog(stmtExecutor.getContext(), stmtExecutor.getOriginStmt().toString(),
+                    stmtExecutor.getParsedStmt(), stmtExecutor.getQueryStatisticsForAuditLog(),
+                    true);
+        }
+    }
+
+    public void taskFailed(BaseAnalysisTask task, String reason) {
+        updateTaskState(AnalysisState.FAILED, reason);
+        cancel();
+        deregisterJob();
+    }
+
+    public void cancel() {
+        for (BaseAnalysisTask task : queryingTask) {
+            task.cancel();
+        }
+    }
+
+    public void deregisterJob() {
+        analysisManager.removeJob(jobInfo.jobId);
+    }
+
+    protected void syncLoadStats() {
+        long tblId = jobInfo.tblId;
+        for (BaseAnalysisTask task : queryFinished) {
+            String colName = task.col.getName();
+            if (!Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tblId, -1, colName)) {
+                analysisManager.removeColStatsStatus(tblId, colName);
+            }
+        }
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index b477a23680e2885..83441f4b2dd15b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -42,7 +42,6 @@ import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
-import org.apache.doris.common.util.Daemon;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -101,7 +100,7 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;

-public class AnalysisManager extends Daemon implements Writable {
+public class AnalysisManager implements Writable {

     private static final Logger LOG = LogManager.getLogger(AnalysisManager.class);

@@ -113,11 +112,11 @@ public class AnalysisManager extends Daemon implements Writable {
     private AnalysisTaskExecutor taskExecutor;

     // Store task information in metadata.
-    private final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
+    protected final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
             Collections.synchronizedNavigableMap(new TreeMap<>());

     // Store job information in metadata.
- private final NavigableMap analysisJobInfoMap = + protected final NavigableMap analysisJobInfoMap = Collections.synchronizedNavigableMap(new TreeMap<>()); // Tracking system submitted job, keep in mem only @@ -128,6 +127,8 @@ public class AnalysisManager extends Daemon implements Writable { private final Map idToTblStats = new ConcurrentHashMap<>(); + private final Map idToAnalysisJob = new ConcurrentHashMap<>(); + protected SimpleQueue autoJobs = createSimpleQueue(null, this); private final Function userJobStatusUpdater = w -> { @@ -237,7 +238,6 @@ public class AnalysisManager extends Daemon implements Writable { new Function[] {userJobStatusUpdater, systemJobStatusUpdater}; public AnalysisManager() { - super(TimeUnit.SECONDS.toMillis(StatisticConstants.ANALYZE_MANAGER_INTERVAL_IN_SECS)); if (!Env.isCheckpointThread()) { this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num); this.statisticsCache = new StatisticsCache(); @@ -245,44 +245,6 @@ public AnalysisManager() { } } - @Override - protected void runOneCycle() { - clear(); - } - - private void clear() { - clearExpiredAnalysisInfo(analysisJobInfoMap, (a) -> - a.scheduleType.equals(ScheduleType.ONCE) - && System.currentTimeMillis() - a.lastExecTimeInMs - > TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS), - (id) -> { - Env.getCurrentEnv().getEditLog().logDeleteAnalysisJob(new AnalyzeDeletionLog(id)); - return null; - }); - clearExpiredAnalysisInfo(analysisTaskInfoMap, (a) -> System.currentTimeMillis() - a.lastExecTimeInMs - > TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS), - (id) -> { - Env.getCurrentEnv().getEditLog().logDeleteAnalysisTask(new AnalyzeDeletionLog(id)); - return null; - }); - } - - private void clearExpiredAnalysisInfo(Map infoMap, Predicate isExpired, - Function writeLog) { - synchronized (infoMap) { - List expired = new ArrayList<>(); - for (Entry entry : infoMap.entrySet()) { - if (isExpired.test(entry.getValue())) { - expired.add(entry.getKey()); - } - } - for (Long k : expired) { - infoMap.remove(k); - writeLog.apply(k); - } - } - } - public StatisticsCache getStatisticsCache() { return statisticsCache; } @@ -371,6 +333,7 @@ protected AnalysisInfo buildAndAssignJob(AnalyzeTblStmt stmt) throws DdlExceptio boolean isSync = stmt.isSync(); Map analysisTaskInfos = new HashMap<>(); createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync); + constructJob(jobInfo, analysisTaskInfos.values()); if (!jobInfo.partitionOnly && stmt.isAllColumns() && StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) { createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, isSync); @@ -446,7 +409,6 @@ private void sendJobId(List analysisInfos, boolean proxy) { */ private Map> validateAndGetPartitions(TableIf table, Set columnNames, Set partitionNames, AnalysisType analysisType) throws DdlException { - long tableId = table.getId(); Map> columnToPartitions = columnNames.stream() .collect(Collectors.toMap( @@ -467,27 +429,6 @@ private Map> validateAndGetPartitions(TableIf table, Set> existColAndPartsForStats = StatisticsRepository - .fetchColAndPartsForStats(tableId); - - if (existColAndPartsForStats.isEmpty()) { - // There is no historical statistical information, no need to do validation - return columnToPartitions; - } - - Set existPartIdsForStats = new HashSet<>(); - existColAndPartsForStats.values().forEach(existPartIdsForStats::addAll); - Set idToPartition = 
StatisticsUtil.getPartitionIds(table); - // Get an invalid set of partitions (those partitions were deleted) - Set invalidPartIds = existPartIdsForStats.stream() - .filter(id -> !idToPartition.contains(id)).collect(Collectors.toSet()); - - if (!invalidPartIds.isEmpty()) { - // Delete invalid partition statistics to avoid affecting table statistics - StatisticsRepository.dropStatistics(invalidPartIds); - } - if (analysisType == AnalysisType.FUNDAMENTALS) { Map> result = table.findReAnalyzeNeededPartitions(); result.keySet().retainAll(columnNames); @@ -720,11 +661,12 @@ public String getJobProgress(long jobId) { public void syncExecute(Collection tasks) { SyncTaskCollection syncTaskCollection = new SyncTaskCollection(tasks); ConnectContext ctx = ConnectContext.get(); + ThreadPoolExecutor syncExecPool = createThreadPoolForSyncAnalyze(); try { ctxToSyncTask.put(ctx, syncTaskCollection); - ThreadPoolExecutor syncExecPool = createThreadPoolForSyncAnalyze(); syncTaskCollection.execute(syncExecPool); } finally { + syncExecPool.shutdown(); ctxToSyncTask.remove(ctx); } } @@ -737,7 +679,7 @@ private ThreadPoolExecutor createThreadPoolForSyncAnalyze() { new SynchronousQueue(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SYNC ANALYZE" + "-%d") .build(), new BlockedPolicy(poolName, - (int) TimeUnit.HOURS.toSeconds(Config.analyze_task_timeout_in_hours))); + StatisticsUtil.getAnalyzeTimeout())); } public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException { @@ -759,6 +701,7 @@ public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException { for (String col : cols) { Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, -1L, col); } + tableStats.updatedTime = 0; } logCreateTableStats(tableStats); StatisticsRepository.dropStatistics(tblId, cols); @@ -1128,4 +1071,17 @@ public ColStatsMeta findColStatsMeta(long tblId, String colName) { } return tableStats.findColumnStatsMeta(colName); } + + public AnalysisJob findJob(long id) { + return idToAnalysisJob.get(id); + } + + public void constructJob(AnalysisInfo jobInfo, Collection tasks) { + AnalysisJob job = new AnalysisJob(jobInfo, tasks); + idToAnalysisJob.put(jobInfo.jobId, job); + } + + public void removeJob(long id) { + idToAnalysisJob.remove(id); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index 4b133ce0ebfc686..58bae9fe66b5cae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -18,9 +18,9 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Env; -import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.ThreadPoolManager.BlockedPolicy; +import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,7 +36,7 @@ public class AnalysisTaskExecutor extends Thread { private static final Logger LOG = LogManager.getLogger(AnalysisTaskExecutor.class); - private final ThreadPoolExecutor executors; + protected final ThreadPoolExecutor executors; private final BlockingQueue taskQueue = new PriorityBlockingQueue(20, @@ -72,18 +72,22 @@ private void cancelExpiredTask() { private void doCancelExpiredJob() { for (;;) { + tryToCancel(); + } + } + + protected void tryToCancel() { + try { + AnalysisTaskWrapper 
taskWrapper = taskQueue.take(); try { - AnalysisTaskWrapper taskWrapper = taskQueue.take(); - try { - long timeout = TimeUnit.HOURS.toMillis(Config.analyze_task_timeout_in_hours) - - (System.currentTimeMillis() - taskWrapper.getStartTime()); - taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS); - } catch (Exception e) { - taskWrapper.cancel(e.getMessage()); - } - } catch (Throwable throwable) { - LOG.warn(throwable); + long timeout = TimeUnit.SECONDS.toMillis(StatisticsUtil.getAnalyzeTimeout()) + - (System.currentTimeMillis() - taskWrapper.getStartTime()); + taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS); + } catch (Exception e) { + taskWrapper.cancel(e.getMessage()); } + } catch (Throwable throwable) { + LOG.warn("cancel analysis task failed", throwable); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java index 9aa3d85992b32c0..ffdd375ee9e36d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java @@ -17,7 +17,6 @@ package org.apache.doris.statistics; -import org.apache.doris.catalog.Env; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; import org.apache.doris.statistics.AnalysisInfo.ScheduleType; @@ -59,9 +58,8 @@ public void run() { if (task.info.scheduleType.equals(ScheduleType.AUTOMATIC) && !StatisticsUtil.inAnalyzeTime( LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) { // TODO: Do we need a separate AnalysisState here? - Env.getCurrentEnv().getAnalysisManager() - .updateTaskStatus(task.info, AnalysisState.FAILED, "Auto task" - + "doesn't get executed within specified time range", System.currentTimeMillis()); + task.job.taskFailed(task, "Auto task" + + "doesn't get executed within specified time range"); return; } executor.putJob(this); @@ -76,15 +74,7 @@ public void run() { if (!task.killed) { if (except != null) { LOG.warn("Analyze {} failed.", task.toString(), except); - Env.getCurrentEnv().getAnalysisManager() - .updateTaskStatus(task.info, - AnalysisState.FAILED, Util.getRootCauseMessage(except), System.currentTimeMillis()); - } else { - LOG.debug("Analyze {} finished, cost time:{}", task.toString(), - System.currentTimeMillis() - startTime); - Env.getCurrentEnv().getAnalysisManager() - .updateTaskStatus(task.info, - AnalysisState.FINISHED, "", System.currentTimeMillis()); + task.job.taskFailed(task, Util.getRootCauseMessage(except)); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 4f7d588de735f7e..3fcebd6c38b5043 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -22,14 +22,12 @@ import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.Config; import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.qe.AuditLogHelper; -import org.apache.doris.qe.QueryState; -import org.apache.doris.qe.QueryState.MysqlStateType; +import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import 
org.apache.doris.statistics.AnalysisInfo.AnalysisType; +import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.util.DBObjects; import org.apache.doris.statistics.util.StatisticsUtil; @@ -38,6 +36,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Collections; import java.util.concurrent.TimeUnit; public abstract class BaseAnalysisTask { @@ -52,59 +51,25 @@ public abstract class BaseAnalysisTask { + "else NDV(`${colName}`) * ${scaleFactor} end AS ndv, " ; - /** - * Stats stored in the column_statistics table basically has two types, `part_id` is null which means it is - * aggregate from partition level stats, `part_id` is not null which means it is partition level stats. - * For latter, it's id field contains part id, for previous doesn't. - */ - protected static final String INSERT_PART_STATISTICS = "INSERT INTO " - + "${internalDB}.${columnStatTbl}" - + " SELECT " - + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, " - + "${catalogId} AS catalog_id, " - + "${dbId} AS db_id, " - + "${tblId} AS tbl_id, " - + "${idxId} AS idx_id, " - + "'${colId}' AS col_id, " - + "${partId} AS part_id, " - + "COUNT(1) AS row_count, " - + "NDV(`${colName}`) AS ndv, " - + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, " - + "MIN(`${colName}`) AS min, " - + "MAX(`${colName}`) AS max, " - + "${dataSizeFunction} AS data_size, " - + "NOW() "; - - protected static final String INSERT_COL_STATISTICS = "INSERT INTO " - + "${internalDB}.${columnStatTbl}" - + " SELECT id, catalog_id, db_id, tbl_id, idx_id, col_id, part_id, row_count, " - + " ndv, null_count," - + " to_base64(CAST(min AS string)), to_base64(CAST(max AS string)), data_size, update_time\n" - + " FROM \n" - + " (SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, " + protected static final String COLLECT_COL_STATISTICS = + "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, " + " ${catalogId} AS catalog_id, " + " ${dbId} AS db_id, " + " ${tblId} AS tbl_id, " + " ${idxId} AS idx_id, " + " '${colId}' AS col_id, " + " NULL AS part_id, " - + " SUM(count) AS row_count, \n" - + " SUM(null_count) AS null_count, " - + " MIN(CAST(from_base64(min) AS ${type})) AS min, " - + " MAX(CAST(from_base64(max) AS ${type})) AS max, " - + " SUM(data_size_in_bytes) AS data_size, " - + " NOW() AS update_time \n" - + " FROM ${internalDB}.${columnStatTbl}" - + " WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND " - + " ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND " - + " ${internalDB}.${columnStatTbl}.col_id='${colId}' AND " - + " ${internalDB}.${columnStatTbl}.idx_id='${idxId}' AND " - + " ${internalDB}.${columnStatTbl}.part_id IS NOT NULL" - + " ) t1, \n"; - - protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE = "INSERT INTO " - + "${internalDB}.${columnStatTbl}" - + " SELECT " + + " COUNT(1) AS row_count, " + + " NDV(`${colName}`) AS ndv, " + + " COUNT(1) - COUNT(${colName}) AS null_count, " + + " CAST(MIN(${colName}) AS STRING) AS min, " + + " CAST(MAX(${colName}) AS STRING) AS max, " + + " ${dataSizeFunction} AS data_size, " + + " NOW() AS update_time " + + " FROM `${dbName}`.`${tblName}`"; + + protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE = + " SELECT " + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, " + "${catalogId} AS catalog_id, " + "${dbId} AS db_id, " @@ -115,8 +80,8 @@ public abstract class BaseAnalysisTask { + "${row_count} AS row_count, " + 
"${ndv} AS ndv, " + "${null_count} AS null_count, " - + "to_base64('${min}') AS min, " - + "to_base64('${max}') AS max, " + + "'${min}' AS min, " + + "'${max}' AS max, " + "${data_size} AS data_size, " + "NOW() "; @@ -136,6 +101,8 @@ public abstract class BaseAnalysisTask { protected TableSample tableSample = null; + protected AnalysisJob job; + @VisibleForTesting public BaseAnalysisTask() { @@ -192,6 +159,7 @@ protected void executeWithRetry() { } LOG.warn("Failed to execute analysis task, retried times: {}", retriedTimes++, t); if (retriedTimes > StatisticConstants.ANALYZE_TASK_RETRY_TIMES) { + job.taskFailed(this, t.getMessage()); throw new RuntimeException(t); } StatisticsUtil.sleep(TimeUnit.SECONDS.toMillis(2 ^ retriedTimes) * 10); @@ -266,11 +234,10 @@ protected TableSample getTableSample() { return new TableSample(true, (long) info.samplePercent); } else if (info.sampleRows > 0) { return new TableSample(false, info.sampleRows); - } else if (info.analysisMethod == AnalysisMethod.FULL - && Config.enable_auto_sample - && tbl.getDataSize(true) > Config.huge_table_lower_bound_size_in_bytes) { + } else if (info.jobType.equals(JobType.SYSTEM) && info.analysisMethod == AnalysisMethod.FULL + && tbl.getDataSize(true) > StatisticsUtil.getHugeTableLowerBoundSizeInBytes()) { // If user doesn't specify sample percent/rows, use auto sample and update sample rows in analysis info. - return new TableSample(false, (long) Config.huge_table_default_sample_rows); + return new TableSample(false, StatisticsUtil.getHugeTableSampleRows()); } else { return null; } @@ -283,23 +250,20 @@ public String toString() { col == null ? "TableRowCount" : col.getName()); } - protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception { - if (killed) { - return; - } - LOG.debug("execute internal sql: {}", stmtExecutor.getOriginStmt()); - try { - stmtExecutor.execute(); - QueryState queryState = stmtExecutor.getContext().getState(); - if (queryState.getStateType().equals(MysqlStateType.ERR)) { - throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s", - catalog.getName(), db.getFullName(), info.colName, stmtExecutor.getOriginStmt().toString(), - queryState.getErrorMessage())); - } + public void setJob(AnalysisJob job) { + this.job = job; + } + + protected void runQuery(String sql) { + long startTime = System.currentTimeMillis(); + try (AutoCloseConnectContext a = StatisticsUtil.buildConnectContext()) { + stmtExecutor = new StmtExecutor(a.connectContext, sql); + stmtExecutor.executeInternalQuery(); + ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0)); + job.appendBuf(this, Collections.singletonList(colStatsData)); } finally { - AuditLogHelper.logAuditLog(stmtExecutor.getContext(), stmtExecutor.getOriginStmt().toString(), - stmtExecutor.getParsedStmt(), stmtExecutor.getQueryStatisticsForAuditLog(), - true); + LOG.debug("End cost time in secs: " + (System.currentTimeMillis() - startTime) / 1000); } } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java index 6c94326a9424eca..41936232afdfed1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java @@ -19,6 +19,8 @@ import org.apache.doris.statistics.util.StatisticsUtil; +import com.google.common.annotations.VisibleForTesting; + import java.nio.charset.StandardCharsets; 
import java.util.Base64; import java.util.StringJoiner; @@ -54,6 +56,18 @@ public class ColStatsData { public final String updateTime; + @VisibleForTesting + public ColStatsData() { + statsId = new StatsId(); + count = 0; + ndv = 0; + nullCount = 0; + minLit = null; + maxLit = null; + dataSizeInBytes = 0; + updateTime = null; + } + public ColStatsData(ResultRow row) { this.statsId = new StatsId(row); this.count = (long) Double.parseDouble(row.get(7)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java index 4583237f8c61f55..049e80d52fd9a97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java @@ -23,26 +23,19 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.datasource.hive.HiveMetaStoreCache; import org.apache.doris.external.hive.util.HiveUtil; -import org.apache.doris.qe.AutoCloseConnectContext; -import org.apache.doris.qe.QueryState; -import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.util.StatisticsUtil; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.commons.text.StringSubstitutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.ArrayList; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.StringJoiner; import java.util.stream.Collectors; public class HMSAnalysisTask extends BaseAnalysisTask { @@ -51,9 +44,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask { // While doing sample analysis, the sampled ndv result will multiply a factor (total size/sample size) // if ndv(col)/count(col) is greater than this threshold. - private static final String ANALYZE_TABLE_TEMPLATE = "INSERT INTO " - + "${internalDB}.${columnStatTbl}" - + " SELECT " + private static final String ANALYZE_TABLE_TEMPLATE = " SELECT " + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, " + "${catalogId} AS catalog_id, " + "${dbId} AS db_id, " @@ -70,28 +61,9 @@ public class HMSAnalysisTask extends BaseAnalysisTask { + "NOW() " + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}"; - private static final String ANALYZE_PARTITION_TEMPLATE = " SELECT " - + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, " - + "${catalogId} AS catalog_id, " - + "${dbId} AS db_id, " - + "${tblId} AS tbl_id, " - + "${idxId} AS idx_id, " - + "'${colId}' AS col_id, " - + "${partId} AS part_id, " - + "COUNT(1) AS row_count, " - + "NDV(`${colName}`) AS ndv, " - + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, " - + "to_base64(MIN(`${colName}`)) AS min, " - + "to_base64(MAX(`${colName}`)) AS max, " - + "${dataSizeFunction} AS data_size, " - + "NOW() FROM `${catalogName}`.`${dbName}`.`${tblName}` where "; - private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ROUND(COUNT(1) * ${scaleFactor}) as rowCount " + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}"; - // cache stats for each partition, it would be inserted into column_statistics in a batch. 
- private final List> buf = new ArrayList<>(); - private final boolean isTableLevelTask; private final boolean isPartitionOnly; private Set partitionNames; @@ -131,25 +103,16 @@ private void getTableStats() throws Exception { * Get column statistics and insert the result to __internal_schema.column_statistics */ private void getTableColumnStats() throws Exception { - if (isPartitionOnly) { - getPartitionNames(); - List partitionAnalysisSQLs = new ArrayList<>(); - for (String partId : this.partitionNames) { - partitionAnalysisSQLs.add(generateSqlForPartition(partId)); - } - execSQLs(partitionAnalysisSQLs); - } else { - if (!info.usingSqlForPartitionColumn && isPartitionColumn()) { - try { - getPartitionColumnStats(); - } catch (Exception e) { - LOG.warn("Failed to collect stats for partition col {} using metadata, " - + "fallback to normal collection", col.getName(), e); - getOrdinaryColumnStats(); - } - } else { + if (!info.usingSqlForPartitionColumn && isPartitionColumn()) { + try { + getPartitionColumnStats(); + } catch (Exception e) { + LOG.warn("Failed to collect stats for partition col {} using metadata, " + + "fallback to normal collection", col.getName(), e); getOrdinaryColumnStats(); } + } else { + getOrdinaryColumnStats(); } } @@ -182,7 +145,7 @@ private void getOrdinaryColumnStats() throws Exception { params.put("maxFunction", getMaxFunction()); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); String sql = stringSubstitutor.replace(sb.toString()); - executeInsertSql(sql); + runQuery(sql); } private void getPartitionColumnStats() throws Exception { @@ -227,7 +190,7 @@ private void getPartitionColumnStats() throws Exception { params.put("data_size", String.valueOf(dataSize)); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); String sql = stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_TEMPLATE); - executeInsertSql(sql); + runQuery(sql); } private String updateMinValue(String currentMin, String value) { @@ -278,7 +241,7 @@ private void getPartitionNames() { partitionNames = table.getPartitionNames(); } else if (info.partitionCount > 0) { partitionNames = table.getPartitionNames().stream() - .limit(info.partitionCount).collect(Collectors.toSet()); + .limit(info.partitionCount).collect(Collectors.toSet()); } if (partitionNames == null || partitionNames.isEmpty()) { throw new RuntimeException("Not a partition table or no partition specified."); @@ -286,80 +249,6 @@ private void getPartitionNames() { } } - private String generateSqlForPartition(String partId) { - StringBuilder sb = new StringBuilder(); - sb.append(ANALYZE_PARTITION_TEMPLATE); - String[] splits = partId.split("/"); - for (int i = 0; i < splits.length; i++) { - String[] kv = splits[i].split("="); - sb.append(kv[0]); - sb.append("='"); - sb.append(kv[1]); - sb.append("'"); - if (i < splits.length - 1) { - sb.append(" and "); - } - } - Map params = buildStatsParams(partId); - params.put("dataSizeFunction", getDataSizeFunction(col)); - return new StringSubstitutor(params).replace(sb.toString()); - } - - public void execSQLs(List partitionAnalysisSQLs) throws Exception { - long startTime = System.currentTimeMillis(); - LOG.debug("analyze task {} start at {}", info.toString(), new Date()); - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { - List> sqlGroups = Lists.partition(partitionAnalysisSQLs, StatisticConstants.UNION_ALL_LIMIT); - for (List group : sqlGroups) { - if (killed) { - return; - } - StringJoiner partitionCollectSQL = new StringJoiner("UNION ALL"); 
- group.forEach(partitionCollectSQL::add); - stmtExecutor = new StmtExecutor(r.connectContext, partitionCollectSQL.toString()); - buf.add(stmtExecutor.executeInternalQuery() - .stream().map(ColStatsData::new).collect(Collectors.toList())); - QueryState queryState = r.connectContext.getState(); - if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) { - throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s", - catalog.getName(), db.getFullName(), info.colName, partitionCollectSQL, - queryState.getErrorMessage())); - } - } - for (List colStatsDataList : buf) { - StringBuilder batchInsertSQL = - new StringBuilder("INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME - + " VALUES "); - StringJoiner sj = new StringJoiner(","); - colStatsDataList.forEach(c -> sj.add(c.toSQL(true))); - batchInsertSQL.append(sj); - stmtExecutor = new StmtExecutor(r.connectContext, batchInsertSQL.toString()); - executeWithExceptionOnFail(stmtExecutor); - } - } finally { - LOG.debug("analyze task {} end. cost {}ms", info, System.currentTimeMillis() - startTime); - } - - } - - private void executeInsertSql(String sql) throws Exception { - long startTime = System.currentTimeMillis(); - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { - r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); - this.stmtExecutor = new StmtExecutor(r.connectContext, sql); - r.connectContext.setExecutor(stmtExecutor); - this.stmtExecutor.execute(); - QueryState queryState = r.connectContext.getState(); - if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) { - LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], error: [%s]", - catalog.getName(), db.getFullName(), info.colName, sql, queryState.getErrorMessage())); - throw new RuntimeException(queryState.getErrorMessage()); - } - LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. 
Cost %d ms.", - catalog.getName(), db.getFullName(), info.colName, sql, (System.currentTimeMillis() - startTime))); - } - } - private Map buildStatsParams(String partId) { Map commonParams = new HashMap<>(); String id = StatisticsUtil.constructId(tbl.getId(), -1); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java index 5ae66d292dc43db..649b075c673f5db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java @@ -20,25 +20,17 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.external.JdbcExternalTable; import org.apache.doris.common.FeConstants; -import org.apache.doris.qe.AutoCloseConnectContext; -import org.apache.doris.qe.QueryState; -import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.commons.text.StringSubstitutor; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.util.HashMap; import java.util.List; import java.util.Map; public class JdbcAnalysisTask extends BaseAnalysisTask { - private static final Logger LOG = LogManager.getLogger(JdbcAnalysisTask.class); - private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO " - + "${internalDB}.${columnStatTbl}" - + " SELECT " + private static final String ANALYZE_SQL_TABLE_TEMPLATE = " SELECT " + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, " + "${catalogId} AS catalog_id, " + "${dbId} AS db_id, " @@ -49,8 +41,8 @@ public class JdbcAnalysisTask extends BaseAnalysisTask { + "COUNT(1) AS row_count, " + "NDV(`${colName}`) AS ndv, " + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, " - + "to_base64(MIN(`${colName}`)) AS min, " - + "to_base64(MAX(`${colName}`)) AS max, " + + "MIN(`${colName}`) AS min, " + + "MAX(`${colName}`) AS max, " + "${dataSizeFunction} AS data_size, " + "NOW() " + "FROM `${catalogName}`.`${dbName}`.`${tblName}`"; @@ -117,25 +109,7 @@ private void getTableColumnStats() throws Exception { params.put("dataSizeFunction", getDataSizeFunction(col)); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); String sql = stringSubstitutor.replace(sb.toString()); - executeInsertSql(sql); - } - - private void executeInsertSql(String sql) throws Exception { - long startTime = System.currentTimeMillis(); - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { - r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); - this.stmtExecutor = new StmtExecutor(r.connectContext, sql); - r.connectContext.setExecutor(stmtExecutor); - this.stmtExecutor.execute(); - QueryState queryState = r.connectContext.getState(); - if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) { - LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], error: [%s]", - catalog.getName(), db.getFullName(), info.colName, sql, queryState.getErrorMessage())); - throw new RuntimeException(queryState.getErrorMessage()); - } - LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. 
Cost %d ms.", - catalog.getName(), db.getFullName(), info.colName, sql, (System.currentTimeMillis() - startTime))); - } + runQuery(sql); } private Map buildTableStatsParams(String partId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java deleted file mode 100644 index 6a43c5092fa072c..000000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java +++ /dev/null @@ -1,152 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.statistics; - -import org.apache.doris.analysis.CreateMaterializedViewStmt; -import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.FunctionCallExpr; -import org.apache.doris.analysis.PartitionNames; -import org.apache.doris.analysis.SelectListItem; -import org.apache.doris.analysis.SelectStmt; -import org.apache.doris.analysis.SlotRef; -import org.apache.doris.analysis.SqlParser; -import org.apache.doris.analysis.SqlScanner; -import org.apache.doris.analysis.TableRef; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.MaterializedIndexMeta; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.common.FeConstants; -import org.apache.doris.common.util.SqlParserUtils; -import org.apache.doris.statistics.util.StatisticsUtil; - -import com.google.common.base.Preconditions; - -import java.io.StringReader; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -/** - * Analysis for the materialized view, only gets constructed when the AnalyzeStmt is not set which - * columns to be analyzed. 
- * TODO: Supports multi-table mv - */ -public class MVAnalysisTask extends BaseAnalysisTask { - - private static final String ANALYZE_MV_PART = INSERT_PART_STATISTICS - + " FROM (${sql}) mv ${sampleExpr}"; - - private static final String ANALYZE_MV_COL = INSERT_COL_STATISTICS - + " (SELECT NDV(`${colName}`) AS ndv " - + " FROM (${sql}) mv) t2"; - - private MaterializedIndexMeta meta; - - private SelectStmt selectStmt; - - private OlapTable olapTable; - - public MVAnalysisTask(AnalysisInfo info) { - super(info); - init(); - } - - private void init() { - olapTable = (OlapTable) tbl; - meta = olapTable.getIndexMetaByIndexId(info.indexId); - Preconditions.checkState(meta != null); - String mvDef = meta.getDefineStmt().originStmt; - SqlScanner input = - new SqlScanner(new StringReader(mvDef), 0L); - SqlParser parser = new SqlParser(input); - CreateMaterializedViewStmt cmv = null; - try { - cmv = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, 0); - } catch (Exception e) { - throw new RuntimeException(e); - } - selectStmt = cmv.getSelectStmt(); - selectStmt.getTableRefs().get(0).getName().setDb(db.getFullName()); - } - - @Override - public void doExecute() throws Exception { - for (Column column : meta.getSchema()) { - SelectStmt selectOne = (SelectStmt) selectStmt.clone(); - TableRef tableRef = selectOne.getTableRefs().get(0); - SelectListItem selectItem = selectOne.getSelectList().getItems() - .stream() - .filter(i -> isCorrespondingToColumn(i, column)) - .findFirst() - .get(); - selectItem.setAlias(column.getName()); - Map params = new HashMap<>(); - for (String partName : tbl.getPartitionNames()) { - PartitionNames partitionName = new PartitionNames(false, - Collections.singletonList(partName)); - tableRef.setPartitionNames(partitionName); - String sql = selectOne.toSql(); - params.put("internalDB", FeConstants.INTERNAL_DB_NAME); - params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); - params.put("catalogId", String.valueOf(catalog.getId())); - params.put("dbId", String.valueOf(db.getId())); - params.put("tblId", String.valueOf(tbl.getId())); - params.put("idxId", String.valueOf(meta.getIndexId())); - String colName = column.getName(); - params.put("colId", colName); - String partId = olapTable.getPartition(partName) == null ? "NULL" : - String.valueOf(olapTable.getPartition(partName).getId()); - params.put("partId", partId); - params.put("dataSizeFunction", getDataSizeFunction(column)); - params.put("dbName", db.getFullName()); - params.put("colName", colName); - params.put("tblName", tbl.getName()); - params.put("sql", sql); - StatisticsUtil.execUpdate(ANALYZE_MV_PART, params); - } - params.remove("partId"); - params.remove("sampleExpr"); - params.put("type", column.getType().toString()); - StatisticsUtil.execUpdate(ANALYZE_MV_COL, params); - Env.getCurrentEnv().getStatisticsCache() - .refreshColStatsSync(meta.getIndexId(), meta.getIndexId(), column.getName()); - } - } - - // Based on the fact that materialized view create statement's select expr only contains basic SlotRef and - // AggregateFunction. 
- private boolean isCorrespondingToColumn(SelectListItem item, Column column) { - Expr expr = item.getExpr(); - if (expr instanceof SlotRef) { - SlotRef slotRef = (SlotRef) expr; - return slotRef.getColumnName().equalsIgnoreCase(column.getName()); - } - if (expr instanceof FunctionCallExpr) { - FunctionCallExpr func = (FunctionCallExpr) expr; - SlotRef slotRef = (SlotRef) func.getChild(0); - return slotRef.getColumnName().equalsIgnoreCase(column.getName()); - } - return false; - } - - @Override - protected void afterExecution() { - // DO NOTHING - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java index 185a582cde436e7..b0c4b0b6c0e5c8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -22,28 +22,21 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; -import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.qe.AutoCloseConnectContext; -import org.apache.doris.qe.QueryState; -import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; import org.apache.commons.text.StringSubstitutor; import java.security.SecureRandom; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.StringJoiner; import java.util.stream.Collectors; /** @@ -51,29 +44,6 @@ */ public class OlapAnalysisTask extends BaseAnalysisTask { - // TODO Currently, NDV is computed for the full table; in fact, - // NDV should only be computed for the relevant partition. - private static final String ANALYZE_COLUMN_SQL_TEMPLATE = INSERT_COL_STATISTICS - + " (SELECT NDV(`${colName}`) AS ndv " - + " FROM `${dbName}`.`${tblName}`) t2"; - - private static final String COLLECT_PARTITION_STATS_SQL_TEMPLATE = - " SELECT " - + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, " - + "${catalogId} AS catalog_id, " - + "${dbId} AS db_id, " - + "${tblId} AS tbl_id, " - + "${idxId} AS idx_id, " - + "'${colId}' AS col_id, " - + "${partId} AS part_id, " - + "COUNT(1) AS row_count, " - + "NDV(`${colName}`) AS ndv, " - + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, " - + "MIN(`${colName}`) AS min, " - + "MAX(`${colName}`) AS max, " - + "${dataSizeFunction} AS data_size, " - + "NOW() FROM `${dbName}`.`${tblName}` PARTITION ${partitionName}"; - private static final String SAMPLE_COLUMN_SQL_TEMPLATE = "SELECT " + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, " + "${catalogId} AS catalog_id, " @@ -92,9 +62,6 @@ public class OlapAnalysisTask extends BaseAnalysisTask { + "FROM `${dbName}`.`${tblName}`" + "${tablets}"; - // cache stats for each partition, it would be inserted into column_statistics in a batch. 
- private final List> buf = new ArrayList<>(); - @VisibleForTesting public OlapAnalysisTask() { } @@ -148,45 +115,7 @@ protected void doSample() throws Exception { stmtExecutor = new StmtExecutor(r.connectContext, stringSubstitutor.replace(SAMPLE_COLUMN_SQL_TEMPLATE)); // Scalar query only return one row ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0)); - OlapTable olapTable = (OlapTable) tbl; - Collection partitions = olapTable.getPartitions(); - int partitionCount = partitions.size(); - List values = partitions.stream().map(p -> String.format( - "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())", - StatisticsUtil.quote(StatisticsUtil.constructId(tbl.getId(), -1, col.getName(), p.getId())), - InternalCatalog.INTERNAL_CATALOG_ID, - db.getId(), - tbl.getId(), - -1, - StatisticsUtil.quote(col.getName()), - p.getId(), - colStatsData.count / partitionCount, - colStatsData.ndv / partitionCount, - colStatsData.nullCount / partitionCount, - StatisticsUtil.quote(colStatsData.minLit), - StatisticsUtil.quote(colStatsData.maxLit), - colStatsData.dataSizeInBytes / partitionCount)).collect(Collectors.toList()); - values.add(String.format( - "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())", - StatisticsUtil.quote(StatisticsUtil.constructId(tbl.getId(), -1, col.getName())), - InternalCatalog.INTERNAL_CATALOG_ID, - db.getId(), - tbl.getId(), - -1, - StatisticsUtil.quote(col.getName()), - "NULL", - colStatsData.count, - colStatsData.ndv, - colStatsData.nullCount, - StatisticsUtil.quote(colStatsData.minLit), - StatisticsUtil.quote(colStatsData.maxLit), - colStatsData.dataSizeInBytes)); - String insertSQL = "INSERT INTO " - + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME - + " VALUES " - + String.join(",", values); - stmtExecutor = new StmtExecutor(r.connectContext, insertSQL); - executeWithExceptionOnFail(stmtExecutor); + job.appendBuf(this, Collections.singletonList(colStatsData)); } } @@ -198,6 +127,7 @@ protected void doSample() throws Exception { protected void doFull() throws Exception { Set partitionNames = info.colToPartitions.get(info.colName); if (partitionNames.isEmpty()) { + job.appendBuf(this, Collections.emptyList()); return; } Map params = new HashMap<>(); @@ -212,68 +142,14 @@ protected void doFull() throws Exception { params.put("dbName", db.getFullName()); params.put("colName", String.valueOf(info.colName)); params.put("tblName", String.valueOf(tbl.getName())); - List partitionAnalysisSQLs = new ArrayList<>(); - try { - tbl.readLock(); - - for (String partitionName : partitionNames) { - Partition part = tbl.getPartition(partitionName); - if (part == null) { - continue; - } - params.put("partId", String.valueOf(tbl.getPartition(partitionName).getId())); - // Avoid error when get the default partition - params.put("partitionName", "`" + partitionName + "`"); - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - partitionAnalysisSQLs.add(stringSubstitutor.replace(COLLECT_PARTITION_STATS_SQL_TEMPLATE)); - } - } finally { - tbl.readUnlock(); - } - execSQLs(partitionAnalysisSQLs, params); + execSQL(params); } @VisibleForTesting - public void execSQLs(List partitionAnalysisSQLs, Map params) throws Exception { - long startTime = System.currentTimeMillis(); - LOG.debug("analyze task {} start at {}", info.toString(), new Date()); - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(info.jobType.equals(JobType.SYSTEM))) { - List> sqlGroups = Lists.partition(partitionAnalysisSQLs, 
StatisticConstants.UNION_ALL_LIMIT); - for (List group : sqlGroups) { - if (killed) { - return; - } - StringJoiner partitionCollectSQL = new StringJoiner("UNION ALL"); - group.forEach(partitionCollectSQL::add); - stmtExecutor = new StmtExecutor(r.connectContext, partitionCollectSQL.toString()); - buf.add(stmtExecutor.executeInternalQuery() - .stream().map(ColStatsData::new).collect(Collectors.toList())); - QueryState queryState = r.connectContext.getState(); - if (queryState.getStateType().equals(MysqlStateType.ERR)) { - throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s", - catalog.getName(), db.getFullName(), info.colName, partitionCollectSQL, - queryState.getErrorMessage())); - } - } - for (List colStatsDataList : buf) { - StringBuilder batchInsertSQL = - new StringBuilder("INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME - + " VALUES "); - StringJoiner sj = new StringJoiner(","); - colStatsDataList.forEach(c -> sj.add(c.toSQL(true))); - batchInsertSQL.append(sj.toString()); - stmtExecutor = new StmtExecutor(r.connectContext, batchInsertSQL.toString()); - executeWithExceptionOnFail(stmtExecutor); - } - params.put("type", col.getType().toString()); - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE); - stmtExecutor = new StmtExecutor(r.connectContext, sql); - executeWithExceptionOnFail(stmtExecutor); - } finally { - LOG.debug("analyze task {} end. cost {}ms", info, - System.currentTimeMillis() - startTime); - } + public void execSQL(Map params) throws Exception { + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String collectColStats = stringSubstitutor.replace(COLLECT_COL_STATISTICS); + runQuery(collectColStats); } // Get sample tablets id and scale up scaleFactor diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java index e6b8297d0c0b013..f008c8fe3015ff1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java @@ -78,12 +78,20 @@ public class StatisticConstants { public static final int LOAD_RETRY_TIMES = 3; - // union more relation than 512 may cause StackOverFlowException in the future. 
- public static final int UNION_ALL_LIMIT = 512; - public static final String FULL_AUTO_ANALYZE_START_TIME = "00:00:00"; public static final String FULL_AUTO_ANALYZE_END_TIME = "23:59:59"; + public static final int INSERT_MERGE_ITEM_COUNT = 200; + + public static final long HUGE_TABLE_DEFAULT_SAMPLE_ROWS = 4194304; + public static final long HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES = 5L * 1024 * 1024 * 1024; + + public static final long HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS = TimeUnit.HOURS.toMillis(12); + + public static final int TABLE_STATS_HEALTH_THRESHOLD = 60; + + public static final int ANALYZE_TIMEOUT_IN_SEC = 43200; + static { SYSTEM_DBS.add(SystemInfoService.DEFAULT_CLUSTER + ClusterNamespace.CLUSTER_DELIMITER + FeConstants.INTERNAL_DB_NAME); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 325065d6e261b33..32cf5cfb24b26e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -113,7 +113,7 @@ protected boolean skip(TableIf table) { if (!(table instanceof OlapTable || table instanceof ExternalTable)) { return true; } - if (table.getDataSize(true) < Config.huge_table_lower_bound_size_in_bytes) { + if (table.getDataSize(true) < StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) { return false; } TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId()); @@ -121,12 +121,13 @@ protected boolean skip(TableIf table) { if (tableStats == null) { return false; } - return System.currentTimeMillis() - tableStats.updatedTime < Config.huge_table_auto_analyze_interval_in_millis; + return System.currentTimeMillis() + - tableStats.updatedTime < StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis(); } protected void createAnalyzeJobForTbl(DatabaseIf db, List analysisInfos, TableIf table) { - AnalysisMethod analysisMethod = table.getDataSize(true) > Config.huge_table_lower_bound_size_in_bytes + AnalysisMethod analysisMethod = table.getDataSize(true) > StatisticsUtil.getHugeTableLowerBoundSizeInBytes() ? 
AnalysisMethod.SAMPLE : AnalysisMethod.FULL; AnalysisInfo jobInfo = new AnalysisInfoBuilder() .setJobId(Env.getCurrentEnv().getNextId()) @@ -141,7 +142,7 @@ protected void createAnalyzeJobForTbl(DatabaseIf db, .setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS) .setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL) .setAnalysisMethod(analysisMethod) - .setSampleRows(Config.huge_table_default_sample_rows) + .setSampleRows(StatisticsUtil.getHugeTableSampleRows()) .setScheduleType(ScheduleType.AUTOMATIC) .setState(AnalysisState.PENDING) .setTaskIds(new ArrayList<>()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java index c2f1db6bc4a64cc..638db5539876111 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java @@ -73,14 +73,15 @@ protected void createSystemAnalysisJob(AnalysisInfo jobInfo) return; } - Map analysisTaskInfos = new HashMap<>(); + Map analysisTasks = new HashMap<>(); AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); - analysisManager.createTaskForEachColumns(jobInfo, analysisTaskInfos, false); + analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); + Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values()); if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) { - analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, false); + analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTasks, false); } - Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTaskInfos); - analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask); + Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTasks); + analysisTasks.values().forEach(analysisTaskExecutor::submitTask); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java deleted file mode 100644 index f34ad0f1221de7f..000000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsPeriodCollector.java +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.statistics; - -import org.apache.doris.catalog.Env; -import org.apache.doris.common.Config; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -public class StatisticsPeriodCollector extends StatisticsCollector { - private static final Logger LOG = LogManager.getLogger(StatisticsPeriodCollector.class); - - public StatisticsPeriodCollector() { - super("Automatic Analyzer", - TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2, - new AnalysisTaskExecutor(Config.period_analyze_simultaneously_running_task_num)); - } - - @Override - protected void collect() { - try { - AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); - List jobInfos = analysisManager.findPeriodicJobs(); - for (AnalysisInfo jobInfo : jobInfos) { - createSystemAnalysisJob(jobInfo); - } - } catch (Exception e) { - LOG.warn("Failed to periodically analyze the statistics." + e); - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java index 3f9b2641b752240..7cd8817a1a487b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java @@ -19,6 +19,8 @@ import org.apache.doris.statistics.util.StatisticsUtil; +import com.google.common.annotations.VisibleForTesting; + import java.util.StringJoiner; public class StatsId { @@ -34,6 +36,17 @@ public class StatsId { // nullable public final String partId; + @VisibleForTesting + public StatsId() { + this.id = null; + this.catalogId = -1; + this.dbId = -1; + this.tblId = -1; + this.idxId = -1; + this.colId = null; + this.partId = null; + } + public StatsId(ResultRow row) { this.id = row.get(0); this.catalogId = Long.parseLong(row.get(1)); @@ -52,7 +65,7 @@ public String toSQL() { sj.add(String.valueOf(tblId)); sj.add(String.valueOf(idxId)); sj.add(StatisticsUtil.quote(colId)); - sj.add(StatisticsUtil.quote(partId)); + sj.add(partId); return sj.toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index cc0fb334a39ef98..931f22d7b023f06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -177,12 +177,14 @@ public static AutoCloseConnectContext buildConnectContext(boolean limitScan) { sessionVariable.enablePageCache = false; sessionVariable.parallelExecInstanceNum = Config.statistics_sql_parallel_exec_instance_num; sessionVariable.parallelPipelineTaskNum = Config.statistics_sql_parallel_exec_instance_num; - sessionVariable.setEnableNereidsPlanner(false); + sessionVariable.setEnableNereidsPlanner(true); + sessionVariable.setEnablePipelineEngine(false); sessionVariable.enableProfile = false; sessionVariable.enableScanRunSerial = limitScan; - sessionVariable.queryTimeoutS = Config.analyze_task_timeout_in_hours * 60 * 60; - sessionVariable.insertTimeoutS = Config.analyze_task_timeout_in_hours * 60 * 60; + sessionVariable.queryTimeoutS = StatisticsUtil.getAnalyzeTimeout(); + sessionVariable.insertTimeoutS = StatisticsUtil.getAnalyzeTimeout(); sessionVariable.enableFileCache = false; + sessionVariable.forbidUnknownColStats = false; connectContext.setEnv(Env.getCurrentEnv()); 
connectContext.setDatabase(FeConstants.INTERNAL_DB_NAME); connectContext.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser()); @@ -807,7 +809,7 @@ public static boolean isExternalTable(long catalogId, long dbId, long tblId) { public static boolean inAnalyzeTime(LocalTime now) { try { - Pair range = findRangeFromGlobalSessionVar(); + Pair range = findConfigFromGlobalSessionVar(); if (range == null) { return false; } @@ -824,16 +826,16 @@ public static boolean inAnalyzeTime(LocalTime now) { } } - private static Pair findRangeFromGlobalSessionVar() { + private static Pair findConfigFromGlobalSessionVar() { try { String startTime = - findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_START_TIME) + findConfigFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_START_TIME) .fullAutoAnalyzeStartTime; // For compatibility if (StringUtils.isEmpty(startTime)) { startTime = StatisticConstants.FULL_AUTO_ANALYZE_START_TIME; } - String endTime = findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_END_TIME) + String endTime = findConfigFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_END_TIME) .fullAutoAnalyzeEndTime; if (StringUtils.isEmpty(startTime)) { endTime = StatisticConstants.FULL_AUTO_ANALYZE_END_TIME; @@ -845,7 +847,7 @@ private static Pair findRangeFromGlobalSessionVar() { } } - private static SessionVariable findRangeFromGlobalSessionVar(String varName) throws Exception { + protected static SessionVariable findConfigFromGlobalSessionVar(String varName) throws Exception { SessionVariable sessionVariable = VariableMgr.newSessionVariable(); VariableExpr variableExpr = new VariableExpr(varName, SetType.GLOBAL); VariableMgr.getValue(sessionVariable, variableExpr); @@ -854,10 +856,71 @@ private static SessionVariable findRangeFromGlobalSessionVar(String varName) thr public static boolean enableAutoAnalyze() { try { - return findRangeFromGlobalSessionVar(SessionVariable.ENABLE_FULL_AUTO_ANALYZE).enableFullAutoAnalyze; + return findConfigFromGlobalSessionVar(SessionVariable.ENABLE_FULL_AUTO_ANALYZE).enableFullAutoAnalyze; } catch (Exception e) { LOG.warn("Fail to get value of enable auto analyze, return false by default", e); } return false; } + + public static int getInsertMergeCount() { + try { + return findConfigFromGlobalSessionVar(SessionVariable.STATS_INSERT_MERGE_ITEM_COUNT) + .statsInsertMergeItemCount; + } catch (Exception e) { + LOG.warn("Failed to get value of insert_merge_item_count, return default", e); + } + return StatisticConstants.INSERT_MERGE_ITEM_COUNT; + } + + public static long getHugeTableSampleRows() { + try { + return findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_DEFAULT_SAMPLE_ROWS) + .hugeTableDefaultSampleRows; + } catch (Exception e) { + LOG.warn("Failed to get value of huge_table_default_sample_rows, return default", e); + } + return StatisticConstants.HUGE_TABLE_DEFAULT_SAMPLE_ROWS; + } + + public static long getHugeTableLowerBoundSizeInBytes() { + try { + return findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES) + .hugeTableLowerBoundSizeInBytes; + } catch (Exception e) { + LOG.warn("Failed to get value of huge_table_lower_bound_size_in_bytes, return default", e); + } + return StatisticConstants.HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES; + } + + public static long getHugeTableAutoAnalyzeIntervalInMillis() { + try { + return findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS) + .hugeTableAutoAnalyzeIntervalInMillis; + } catch (Exception e) { + 
LOG.warn("Failed to get value of huge_table_auto_analyze_interval_in_millis, return default", e); + } + return StatisticConstants.HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS; + } + + public static long getTableStatsHealthThreshold() { + try { + return findConfigFromGlobalSessionVar(SessionVariable.TABLE_STATS_HEALTH_THRESHOLD) + .tableStatsHealthThreshold; + } catch (Exception e) { + LOG.warn("Failed to get value of table_stats_health_threshold, return default", e); + } + return StatisticConstants.TABLE_STATS_HEALTH_THRESHOLD; + } + + public static int getAnalyzeTimeout() { + try { + return findConfigFromGlobalSessionVar(SessionVariable.ANALYZE_TIMEOUT) + .analyzeTimeoutS; + } catch (Exception e) { + LOG.warn("Failed to get value of table_stats_health_threshold, return default", e); + } + return StatisticConstants.ANALYZE_TIMEOUT_IN_SEC; + } + } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java index f01485f642fac0f..d4dedd17123807b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java @@ -17,25 +17,10 @@ package org.apache.doris.statistics; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.InternalSchemaInitializer; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.common.FeConstants; -import org.apache.doris.datasource.InternalCatalog; -import org.apache.doris.qe.AutoCloseConnectContext; -import org.apache.doris.qe.ConnectContext; +import org.apache.doris.catalog.Env; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; -import org.apache.doris.statistics.AnalysisInfo.AnalysisMode; -import org.apache.doris.statistics.AnalysisInfo.AnalysisType; -import org.apache.doris.statistics.AnalysisInfo.JobType; -import org.apache.doris.statistics.util.DBObjects; import org.apache.doris.statistics.util.StatisticsUtil; -import org.apache.doris.utframe.TestWithFeService; -import com.google.common.collect.Maps; import mockit.Expectations; import mockit.Mock; import mockit.MockUp; @@ -44,136 +29,196 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class AnalysisJobTest extends TestWithFeService { - - @Override - protected void runBeforeAll() throws Exception { - try { - InternalSchemaInitializer.createDB(); - createDatabase("analysis_job_test"); - connectContext.setDatabase("default_cluster:analysis_job_test"); - createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n" - + "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n" - + "PROPERTIES(\n" + " \"replication_num\"=\"1\"\n" - + ");"); - } catch (Exception e) { - throw new RuntimeException(e); - } - FeConstants.runningUnitTest = true; - } +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicInteger; + +public class AnalysisJobTest { + // make user task has been set corresponding job @Test - public void testCreateAnalysisJob() throws Exception { + public void initTest(@Mocked AnalysisInfo jobInfo, @Mocked OlapAnalysisTask task) { + AnalysisJob analysisJob = new AnalysisJob(jobInfo, Arrays.asList(task)); + 
Assertions.assertSame(task.job, analysisJob);
+    }
 
-    @Test
+    public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo, @Mocked OlapAnalysisTask olapAnalysisTask) {
+        AtomicInteger writeBufInvokeTimes = new AtomicInteger();
+        new MockUp() {
+            @Mock
+            protected void writeBuf() {
+                writeBufInvokeTimes.incrementAndGet();
+            }
 
             @Mock
-            public AutoCloseConnectContext buildConnectContext() {
-                return new AutoCloseConnectContext(connectContext);
+            public void updateTaskState(AnalysisState state, String msg) {
             }
 
             @Mock
-            public void execUpdate(String sql) throws Exception {
+            public void deregisterJob() {
             }
         };
+        AnalysisJob job = new AnalysisJob(analysisInfo, Arrays.asList(olapAnalysisTask));
+        job.queryingTask = new HashSet<>();
+        job.queryingTask.add(olapAnalysisTask);
+        job.queryFinished = new HashSet<>();
+        job.buf = new ArrayList<>();
+        job.totalTaskCount = 20;
+
+        // not all tasks have finished and the cache limit isn't exceeded, so it shouldn't write
+        job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
+        Assertions.assertEquals(0, writeBufInvokeTimes.get());
+    }
 
-        new MockUp() {
+    @Test
+    public void testAppendBufTest2(@Mocked AnalysisInfo analysisInfo, @Mocked OlapAnalysisTask olapAnalysisTask) {
+        AtomicInteger writeBufInvokeTimes = new AtomicInteger();
+        AtomicInteger deregisterTimes = new AtomicInteger();
+
+        new MockUp() {
             @Mock
-            public List executeInternalQuery() {
-                return Collections.emptyList();
+            protected void writeBuf() {
+                writeBufInvokeTimes.incrementAndGet();
             }
-        };
-        new MockUp() {
+
+            @Mock
+            public void updateTaskState(AnalysisState state, String msg) {
+            }
 
             @Mock
-            public ConnectContext get() {
-                return connectContext;
+            public void deregisterJob() {
+                deregisterTimes.getAndIncrement();
             }
         };
-        String sql = "ANALYZE TABLE t1";
-        Assertions.assertNotNull(getSqlStmtExecutor(sql));
+        AnalysisJob job = new AnalysisJob(analysisInfo, Arrays.asList(olapAnalysisTask));
+        job.queryingTask = new HashSet<>();
+        job.queryingTask.add(olapAnalysisTask);
+        job.queryFinished = new HashSet<>();
+        job.buf = new ArrayList<>();
+        job.totalTaskCount = 1;
+
+        job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
+        // all tasks have finished, so it should write and deregister this job
+        Assertions.assertEquals(1, writeBufInvokeTimes.get());
+        Assertions.assertEquals(1, deregisterTimes.get());
     }
 
     @Test
-    public void testJobExecution(@Mocked StmtExecutor stmtExecutor, @Mocked InternalCatalog catalog, @Mocked
-            Database database,
-            @Mocked OlapTable olapTable)
-            throws Exception {
-        new MockUp() {
+    public void testAppendBufTest3(@Mocked AnalysisInfo analysisInfo, @Mocked OlapAnalysisTask olapAnalysisTask) {
+        AtomicInteger writeBufInvokeTimes = new AtomicInteger();
+        new MockUp() {
             @Mock
-            public Column getColumn(String name) {
-                return new Column("col1", PrimitiveType.INT);
+            protected void writeBuf() {
+                writeBufInvokeTimes.incrementAndGet();
             }
-        };
-
-        new MockUp() {
+
             @Mock
-            public ConnectContext buildConnectContext() {
-                return connectContext;
+            public void updateTaskState(AnalysisState state, String msg) {
             }
 
             @Mock
-            public void execUpdate(String sql) throws Exception {
+            public void deregisterJob() {
             }
+        };
+        AnalysisJob job = new AnalysisJob(analysisInfo, Arrays.asList(olapAnalysisTask));
+        job.queryingTask = new HashSet<>();
+        job.queryingTask.add(olapAnalysisTask);
+        job.queryFinished = new HashSet<>();
+        job.buf = new ArrayList<>();
+        ColStatsData colStatsData = new ColStatsData();
+        for (int i = 0; i < StatisticsUtil.getInsertMergeCount(); i++) {
+            job.buf.add(colStatsData);
+        }
+        job.totalTaskCount = 100;
+
+        job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
+        // cache limit exceeded, so it should write the buffered stats
+        Assertions.assertEquals(1, writeBufInvokeTimes.get());
+    }
 
+    @Test
+    public void testUpdateTaskState(
+            @Mocked AnalysisInfo info,
+            @Mocked OlapAnalysisTask task1,
+            @Mocked OlapAnalysisTask task2) {
+        AtomicInteger updateTaskStatusInvokeTimes = new AtomicInteger();
+        new MockUp() {
             @Mock
-            public DBObjects convertIdToObjects(long catalogId, long dbId, long tblId) {
-                return new DBObjects(catalog, database, olapTable);
+            public void updateTaskStatus(AnalysisInfo info, AnalysisState taskState, String message, long time) {
+                updateTaskStatusInvokeTimes.getAndIncrement();
             }
         };
-        new MockUp() {
-
+        AnalysisManager analysisManager = new AnalysisManager();
+        new MockUp() {
             @Mock
-            public void syncLoadColStats(long tableId, long idxId, String colName) {
+            public AnalysisManager getAnalysisManager() {
+                return analysisManager;
             }
         };
-        new MockUp() {
+        AnalysisJob job = new AnalysisJob(info, Collections.singletonList(task1));
+        job.queryFinished = new HashSet<>();
+        job.queryFinished.add(task2);
+        job.updateTaskState(AnalysisState.FAILED, "");
+        Assertions.assertEquals(2, updateTaskStatusInvokeTimes.get());
+    }
 
+    @Test
+    public void testWriteBuf1(@Mocked AnalysisInfo info,
+            @Mocked OlapAnalysisTask task1, @Mocked OlapAnalysisTask task2) {
+        AnalysisJob job = new AnalysisJob(info, Collections.singletonList(task1));
+        job.queryFinished = new HashSet<>();
+        job.queryFinished.add(task2);
+        new MockUp() {
             @Mock
-            public void execute() throws Exception {
-
+            public void updateTaskState(AnalysisState state, String msg) {
             }
 
             @Mock
-            public List executeInternalQuery() {
-                return new ArrayList<>();
-            }
-        };
+            protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
 
-        new MockUp() {
+            }
 
             @Mock
-            public void execSQLs(List partitionAnalysisSQLs, Map params) throws Exception {}
+            protected void syncLoadStats() {
+            }
         };
-        HashMap> colToPartitions = Maps.newHashMap();
-        colToPartitions.put("col1", Collections.singleton("t1"));
-        AnalysisInfo analysisJobInfo = new AnalysisInfoBuilder().setJobId(0).setTaskId(0)
-                .setCatalogId(0)
-                .setDBId(0)
-                .setTblId(0)
-                .setColName("col1").setJobType(JobType.MANUAL)
-                .setAnalysisMode(AnalysisMode.FULL)
-                .setAnalysisMethod(AnalysisMethod.FULL)
-                .setAnalysisType(AnalysisType.FUNDAMENTALS)
-                .setColToPartitions(colToPartitions)
-                .setState(AnalysisState.RUNNING)
-                .build();
-        new OlapAnalysisTask(analysisJobInfo).doExecute();
         new Expectations() {
             {
-                stmtExecutor.execute();
+                job.syncLoadStats();
                 times = 1;
             }
         };
+        job.writeBuf();
+
+        Assertions.assertEquals(0, job.queryFinished.size());
+    }
+
+    @Test
+    public void testWriteBuf2(@Mocked AnalysisInfo info,
+            @Mocked OlapAnalysisTask task1, @Mocked OlapAnalysisTask task2) {
+        new MockUp() {
+            @Mock
+            public void updateTaskState(AnalysisState state, String msg) {
+            }
+
+            @Mock
+            protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
+                throw new RuntimeException();
+            }
+
+            @Mock
+            protected void syncLoadStats() {
+            }
+        };
+        AnalysisJob job = new AnalysisJob(info, Collections.singletonList(task1));
+        job.buf.add(new ColStatsData());
+        job.queryFinished = new HashSet<>();
+        job.queryFinished.add(task2);
+        job.writeBuf();
+        Assertions.assertEquals(1, job.queryFinished.size());
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java index c995710da44d004..6372ce97d6eae3e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.statistics.AnalysisInfo.AnalysisType; import org.apache.doris.statistics.AnalysisInfo.JobType; @@ -340,7 +341,7 @@ public List getBaseSchema() { }; OlapTable olapTable = new OlapTable(); TableStatsMeta stats1 = new TableStatsMeta(0, 50, new AnalysisInfoBuilder().setColName("col1").build()); - stats1.updatedRows.addAndGet(30); + stats1.updatedRows.addAndGet(50); Assertions.assertTrue(olapTable.needReAnalyzeTable(stats1)); TableStatsMeta stats2 = new TableStatsMeta(0, 190, new AnalysisInfoBuilder().setColName("col1").build()); @@ -349,4 +350,38 @@ public List getBaseSchema() { } + @Test + public void testRecordLimit1() { + Config.analyze_record_limit = 2; + AnalysisManager analysisManager = new AnalysisManager(); + analysisManager.replayCreateAnalysisJob(new AnalysisInfoBuilder().setJobId(1).build()); + analysisManager.replayCreateAnalysisJob(new AnalysisInfoBuilder().setJobId(2).build()); + analysisManager.replayCreateAnalysisJob(new AnalysisInfoBuilder().setJobId(3).build()); + Assertions.assertEquals(2, analysisManager.analysisJobInfoMap.size()); + Assertions.assertTrue(analysisManager.analysisJobInfoMap.containsKey(2L)); + Assertions.assertTrue(analysisManager.analysisJobInfoMap.containsKey(3L)); + } + + @Test + public void testRecordLimit2() { + Config.analyze_record_limit = 2; + AnalysisManager analysisManager = new AnalysisManager(); + analysisManager.replayCreateAnalysisTask(new AnalysisInfoBuilder().setTaskId(1).build()); + analysisManager.replayCreateAnalysisTask(new AnalysisInfoBuilder().setTaskId(2).build()); + analysisManager.replayCreateAnalysisTask(new AnalysisInfoBuilder().setTaskId(3).build()); + Assertions.assertEquals(2, analysisManager.analysisTaskInfoMap.size()); + Assertions.assertTrue(analysisManager.analysisTaskInfoMap.containsKey(2L)); + Assertions.assertTrue(analysisManager.analysisTaskInfoMap.containsKey(3L)); + } + + @Test + public void testRecordLimit3() { + Config.analyze_record_limit = 2; + AnalysisManager analysisManager = new AnalysisManager(); + analysisManager.autoJobs.offer(new AnalysisInfoBuilder().setJobId(1).build()); + analysisManager.autoJobs.offer(new AnalysisInfoBuilder().setJobId(2).build()); + analysisManager.autoJobs.offer(new AnalysisInfoBuilder().setJobId(3).build()); + Assertions.assertEquals(2, analysisManager.autoJobs.size()); + } + } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java index 19d7798041a1140..8cfcfeabd28be0e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java @@ -37,6 +37,7 @@ import mockit.Mock; import mockit.MockUp; import mockit.Mocked; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.Collections; @@ -45,6 +46,7 @@ import java.util.Map; import java.util.Set; import 
java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; public class AnalysisTaskExecutorTest extends TestWithFeService { @@ -82,6 +84,15 @@ public Column getColumn(String name) { return new Column("col1", PrimitiveType.INT); } }; + final AtomicBoolean cancelled = new AtomicBoolean(); + new MockUp() { + + @Mock + public boolean cancel(String msg) { + cancelled.set(true); + return true; + } + }; AnalysisInfo analysisJobInfo = new AnalysisInfoBuilder().setJobId(0).setTaskId(0) .setCatalogId(0) .setDBId(0) @@ -98,7 +109,10 @@ public Column getColumn(String name) { AnalysisTaskWrapper analysisTaskWrapper = new AnalysisTaskWrapper(analysisTaskExecutor, analysisJob); Deencapsulation.setField(analysisTaskWrapper, "startTime", 5); b.put(analysisTaskWrapper); - analysisTaskExecutor.start(); + analysisTaskExecutor.tryToCancel(); + Assertions.assertTrue(cancelled.get()); + Assertions.assertTrue(b.isEmpty()); + } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java new file mode 100644 index 000000000000000..268540885dac1d0 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.InternalSchemaInitializer; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.FeConstants; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.qe.AutoCloseConnectContext; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; +import org.apache.doris.statistics.AnalysisInfo.AnalysisMode; +import org.apache.doris.statistics.AnalysisInfo.AnalysisType; +import org.apache.doris.statistics.AnalysisInfo.JobType; +import org.apache.doris.statistics.util.DBObjects; +import org.apache.doris.statistics.util.StatisticsUtil; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Maps; +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class AnalyzeTest extends TestWithFeService { + + @Override + protected void runBeforeAll() throws Exception { + try { + InternalSchemaInitializer.createDB(); + createDatabase("analysis_job_test"); + connectContext.setDatabase("default_cluster:analysis_job_test"); + createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n" + + "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n" + + "PROPERTIES(\n" + " \"replication_num\"=\"1\"\n" + + ");"); + } catch (Exception e) { + throw new RuntimeException(e); + } + FeConstants.runningUnitTest = true; + } + + @Test + public void testCreateAnalysisJob() throws Exception { + + new MockUp() { + + @Mock + public AutoCloseConnectContext buildConnectContext() { + return new AutoCloseConnectContext(connectContext); + } + + @Mock + public void execUpdate(String sql) throws Exception { + } + }; + + new MockUp() { + @Mock + public List executeInternalQuery() { + return Collections.emptyList(); + } + }; + + new MockUp() { + + @Mock + public ConnectContext get() { + return connectContext; + } + }; + String sql = "ANALYZE TABLE t1"; + Assertions.assertNotNull(getSqlStmtExecutor(sql)); + } + + @Test + public void testJobExecution(@Mocked StmtExecutor stmtExecutor, @Mocked InternalCatalog catalog, @Mocked + Database database, + @Mocked OlapTable olapTable) + throws Exception { + new MockUp() { + + @Mock + public Column getColumn(String name) { + return new Column("col1", PrimitiveType.INT); + } + }; + + new MockUp() { + + @Mock + public ConnectContext buildConnectContext() { + return connectContext; + } + + @Mock + public void execUpdate(String sql) throws Exception { + } + + @Mock + public DBObjects convertIdToObjects(long catalogId, long dbId, long tblId) { + return new DBObjects(catalog, database, olapTable); + } + }; + new MockUp() { + + @Mock + public void syncLoadColStats(long tableId, long idxId, String colName) { + } + }; + new MockUp() { + + @Mock + public void execute() throws Exception { + + } + + @Mock + public List executeInternalQuery() { + return new ArrayList<>(); + } + }; + + new MockUp() { + + @Mock + public void execSQLs(List partitionAnalysisSQLs, Map params) throws Exception {} + }; + + new MockUp() { + + @Mock + protected void 
runQuery(String sql) {} + }; + HashMap> colToPartitions = Maps.newHashMap(); + colToPartitions.put("col1", Collections.singleton("t1")); + AnalysisInfo analysisJobInfo = new AnalysisInfoBuilder().setJobId(0).setTaskId(0) + .setCatalogId(0) + .setDBId(0) + .setTblId(0) + .setColName("col1").setJobType(JobType.MANUAL) + .setAnalysisMode(AnalysisMode.FULL) + .setAnalysisMethod(AnalysisMethod.FULL) + .setAnalysisType(AnalysisType.FUNDAMENTALS) + .setColToPartitions(colToPartitions) + .setState(AnalysisState.RUNNING) + .build(); + new OlapAnalysisTask(analysisJobInfo).doExecute(); + new Expectations() { + { + stmtExecutor.execute(); + times = 1; + } + }; + } + +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java index f5b98a47ce02ad3..95ed5023e3652b1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Type; import org.apache.doris.catalog.external.HMSExternalDatabase; import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.ha.FrontendNodeType; @@ -31,6 +32,9 @@ import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest; import org.apache.doris.utframe.TestWithFeService; +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.collect.Lists; import com.google.gson.JsonArray; import com.google.gson.JsonElement; @@ -40,9 +44,11 @@ import mockit.Mock; import mockit.MockUp; import mockit.Mocked; +import org.checkerframework.checker.nullness.qual.NonNull; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -50,6 +56,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; public class CacheTest extends TestWithFeService { @@ -350,4 +357,29 @@ private void sendStats(Frontend frontend, } }; } + + @Test + public void testEvict() { + ThreadPoolExecutor threadPool + = ThreadPoolManager.newDaemonFixedThreadPool( + 1, Integer.MAX_VALUE, "STATS_FETCH", true); + AsyncLoadingCache columnStatisticsCache = + Caffeine.newBuilder() + .maximumSize(1) + .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL)) + .executor(threadPool) + .buildAsync(new AsyncCacheLoader() { + @Override + public @NonNull CompletableFuture asyncLoad(@NonNull Integer integer, + @NonNull Executor executor) { + return CompletableFuture.supplyAsync(() -> { + return integer; + }, threadPool); + } + }); + columnStatisticsCache.get(1); + columnStatisticsCache.get(2); + Assertions.assertTrue(columnStatisticsCache.synchronous().asMap().containsKey(2)); + Assertions.assertEquals(1, columnStatisticsCache.synchronous().asMap().size()); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java index d618a5fa5380db2..f2b9f84f0d0e7db 100644 --- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java @@ -19,47 +19,36 @@ import org.apache.doris.analysis.TableSample; import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.Config; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; +import org.apache.doris.statistics.AnalysisInfo.JobType; +import org.apache.doris.statistics.util.StatisticsUtil; -import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; import mockit.Mocked; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; public class OlapAnalysisTaskTest { + // test manual @Test - public void testAutoSample(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf databaseIf, @Mocked TableIf tableIf) { - new Expectations() { - { - tableIf.getDataSize(true); - result = 60_0000_0000L; - } - }; + public void testSample1(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf databaseIf, @Mocked TableIf tableIf) { AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder() .setAnalysisMethod(AnalysisMethod.FULL); + analysisInfoBuilder.setJobType(JobType.MANUAL); OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask(); olapAnalysisTask.info = analysisInfoBuilder.build(); olapAnalysisTask.tbl = tableIf; - Config.enable_auto_sample = true; TableSample tableSample = olapAnalysisTask.getTableSample(); - Assertions.assertEquals(4194304, tableSample.getSampleValue()); - Assertions.assertFalse(tableSample.isPercent()); - - new Expectations() { - { - tableIf.getDataSize(true); - result = 1_0000_0000L; - } - }; - tableSample = olapAnalysisTask.getTableSample(); Assertions.assertNull(tableSample); analysisInfoBuilder.setSampleRows(10); + analysisInfoBuilder.setJobType(JobType.MANUAL); analysisInfoBuilder.setAnalysisMethod(AnalysisMethod.SAMPLE); olapAnalysisTask.info = analysisInfoBuilder.build(); tableSample = olapAnalysisTask.getTableSample(); @@ -67,4 +56,49 @@ public void testAutoSample(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf datab Assertions.assertFalse(tableSample.isPercent()); } + // test auto big table + @Test + public void testSample2(@Mocked OlapTable tbl) { + new MockUp() { + + @Mock + public long getDataSize(boolean singleReplica) { + return 1000_0000_0000L; + } + }; + + AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder() + .setAnalysisMethod(AnalysisMethod.FULL); + analysisInfoBuilder.setJobType(JobType.SYSTEM); + OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask(); + olapAnalysisTask.info = analysisInfoBuilder.build(); + olapAnalysisTask.tbl = tbl; + TableSample tableSample = olapAnalysisTask.getTableSample(); + Assertions.assertNotNull(tableSample); + Assertions.assertEquals(StatisticsUtil.getHugeTableSampleRows(), tableSample.getSampleValue()); + + } + + // test auto small table + @Test + public void testSample3(@Mocked OlapTable tbl) { + new MockUp() { + + @Mock + public long getDataSize(boolean singleReplica) { + return 1000; + } + }; + + AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder() + .setAnalysisMethod(AnalysisMethod.FULL); + analysisInfoBuilder.setJobType(JobType.SYSTEM); + OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask(); + olapAnalysisTask.info = analysisInfoBuilder.build(); + olapAnalysisTask.tbl = tbl; + TableSample tableSample = 
olapAnalysisTask.getTableSample(); + Assertions.assertNull(tableSample); + + } + } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java new file mode 100644 index 000000000000000..d441ce5b09db98c --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java @@ -0,0 +1,289 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.Type; +import org.apache.doris.catalog.View; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeConstants; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; +import org.apache.doris.statistics.AnalysisInfo.AnalysisType; +import org.apache.doris.statistics.AnalysisInfo.JobType; +import org.apache.doris.statistics.util.StatisticsUtil; +import org.apache.doris.system.SystemInfoService; + +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import org.apache.hadoop.util.Lists; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +public class StatisticsAutoCollectorTest { + + @Test + public void testAnalyzeAll(@Injectable AnalysisInfo analysisInfo) { + new MockUp() { + @Mock + public Collection getAllDbs() { + Database db1 = new Database(1, SystemInfoService.DEFAULT_CLUSTER + + ClusterNamespace.CLUSTER_DELIMITER + FeConstants.INTERNAL_DB_NAME); + Database db2 = new Database(2, "anyDB"); + List databaseIfs = new ArrayList<>(); + databaseIfs.add(db1); + databaseIfs.add(db2); + return databaseIfs; + } + }; + new MockUp() { + @Mock + public List constructAnalysisInfo(DatabaseIf db) { + return Arrays.asList(analysisInfo, analysisInfo); + } + + int count = 0; + + @Mock + public AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) { + return count++ == 0 ? 
null : jobInfo; + } + + @Mock + public void createSystemAnalysisJob(AnalysisInfo jobInfo) + throws DdlException { + + } + }; + + StatisticsAutoCollector saa = new StatisticsAutoCollector(); + saa.runAfterCatalogReady(); + new Expectations() { + { + try { + saa.createSystemAnalysisJob((AnalysisInfo) any); + times = 1; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + } + + @Test + public void testConstructAnalysisInfo( + @Injectable OlapTable o2, @Injectable View v) { + new MockUp() { + @Mock + public List getTables() { + List
tableIfs = new ArrayList<>(); + tableIfs.add(o2); + tableIfs.add(v); + return tableIfs; + } + + @Mock + public String getFullName() { + return "anyDb"; + } + }; + + new MockUp() { + @Mock + public String getName() { + return "anytable"; + } + + @Mock + public List getBaseSchema() { + List columns = new ArrayList<>(); + columns.add(new Column("c1", PrimitiveType.INT)); + columns.add(new Column("c2", PrimitiveType.HLL)); + return columns; + } + }; + StatisticsAutoCollector saa = new StatisticsAutoCollector(); + List analysisInfos = + saa.constructAnalysisInfo(new Database(1, "anydb")); + Assertions.assertEquals(1, analysisInfos.size()); + Assertions.assertEquals("c1", analysisInfos.get(0).colName.split(",")[0]); + } + + @Test + public void testGetReAnalyzeRequiredPart0() { + + TableIf tableIf = new OlapTable(); + + new MockUp() { + @Mock + protected Map> findReAnalyzeNeededPartitions() { + Set partitionNames = new HashSet<>(); + partitionNames.add("p1"); + partitionNames.add("p2"); + Map> map = new HashMap<>(); + map.put("col1", partitionNames); + return map; + } + + @Mock + public long getRowCount() { + return 100; + } + + @Mock + public List getBaseSchema() { + return Lists.newArrayList(new Column("col1", Type.INT), new Column("col2", Type.INT)); + } + }; + + new MockUp() { + @Mock + public TableIf findTable(long catalogName, long dbName, long tblName) { + return tableIf; + } + }; + AnalysisInfo analysisInfo = new AnalysisInfoBuilder().setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType( + AnalysisType.FUNDAMENTALS).setColName("col1").setJobType(JobType.SYSTEM).build(); + new MockUp() { + + int count = 0; + + TableStatsMeta[] tableStatsArr = + new TableStatsMeta[] {new TableStatsMeta(0, 0, analysisInfo), + new TableStatsMeta(0, 0, analysisInfo), null}; + + { + tableStatsArr[0].updatedRows.addAndGet(100); + tableStatsArr[1].updatedRows.addAndGet(0); + } + + @Mock + public TableStatsMeta findTableStatsStatus(long tblId) { + return tableStatsArr[count++]; + } + }; + + new MockUp() { + @Mock + public AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table, + Set needRunPartitions) { + return new AnalysisInfoBuilder().build(); + } + }; + StatisticsAutoCollector statisticsAutoCollector = new StatisticsAutoCollector(); + AnalysisInfo analysisInfo2 = new AnalysisInfoBuilder() + .setCatalogId(0) + .setDBId(0) + .setTblId(0).build(); + Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2)); + // uncomment it when updatedRows gets ready + // Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2)); + Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2)); + } + + @Test + public void testLoop() { + AtomicBoolean timeChecked = new AtomicBoolean(); + AtomicBoolean switchChecked = new AtomicBoolean(); + new MockUp() { + + @Mock + public boolean inAnalyzeTime(LocalTime now) { + timeChecked.set(true); + return true; + } + + @Mock + public boolean enableAutoAnalyze() { + switchChecked.set(true); + return true; + } + }; + StatisticsAutoCollector autoCollector = new StatisticsAutoCollector(); + autoCollector.collect(); + Assertions.assertTrue(timeChecked.get() && switchChecked.get()); + + } + + @Test + public void checkAvailableThread() { + StatisticsAutoCollector autoCollector = new StatisticsAutoCollector(); + Assertions.assertEquals(Config.full_auto_analyze_simultaneously_running_task_num, + autoCollector.analysisTaskExecutor.executors.getMaximumPoolSize()); + } + + @Test + public void 
testSkip(@Mocked OlapTable olapTable, @Mocked TableStatsMeta stats, @Mocked TableIf anyOtherTable) {
+        new MockUp() {
+
+            @Mock
+            public long getDataSize(boolean singleReplica) {
+                return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5 + 1000000000;
+            }
+        };
+
+        new MockUp() {
+
+            @Mock
+            public TableStatsMeta findTableStatsStatus(long tblId) {
+                return stats;
+            }
+        };
+        // A very huge table has been updated recently, so we should skip it this time
+        stats.updatedTime = System.currentTimeMillis() - 1000;
+        StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
+        Assertions.assertTrue(autoCollector.skip(olapTable));
+        // The last update of this huge table was a long time ago, so we shouldn't skip it this time
+        stats.updatedTime = System.currentTimeMillis()
+                - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() - 10000;
+        Assertions.assertFalse(autoCollector.skip(olapTable));
+        new MockUp() {
+
+            @Mock
+            public TableStatsMeta findTableStatsStatus(long tblId) {
+                return null;
+            }
+        };
+        // can't find the table stats meta, which means this table has never been analyzed, so we shouldn't skip it
+        Assertions.assertFalse(autoCollector.skip(olapTable));
+        // this is neither an olap table nor an external table, so we should skip it this time
+        Assertions.assertTrue(autoCollector.skip(anyOtherTable));
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
index c0d4a656d755658..c0c790c9c25dff7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java
@@ -19,9 +19,15 @@
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.qe.SessionVariable;
 
-import org.junit.Test;
+import mockit.Mock;
+import mockit.MockUp;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
 
 public class StatisticsUtilTest {
     @Test
@@ -67,4 +73,42 @@ public void testConvertToDouble() {
             Assertions.fail();
         }
     }
+
+    @Test
+    public void testInAnalyzeTime1() {
+        new MockUp() {
+
+            @Mock
+            protected SessionVariable findConfigFromGlobalSessionVar(String varName) throws Exception {
+                SessionVariable sessionVariable = new SessionVariable();
+                sessionVariable.fullAutoAnalyzeStartTime = "00:00:00";
+                sessionVariable.fullAutoAnalyzeEndTime = "02:00:00";
+                return sessionVariable;
+            }
+        };
+        DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
+        String now = "01:00:00";
+        Assertions.assertTrue(StatisticsUtil.inAnalyzeTime(LocalTime.parse(now, timeFormatter)));
+        now = "13:00:00";
+        Assertions.assertFalse(StatisticsUtil.inAnalyzeTime(LocalTime.parse(now, timeFormatter)));
+    }
+
+    @Test
+    public void testInAnalyzeTime2() {
+        new MockUp() {
+
+            @Mock
+            protected SessionVariable findConfigFromGlobalSessionVar(String varName) throws Exception {
+                SessionVariable sessionVariable = new SessionVariable();
+                sessionVariable.fullAutoAnalyzeStartTime = "00:00:00";
+                sessionVariable.fullAutoAnalyzeEndTime = "23:00:00";
+                return sessionVariable;
+            }
+        };
+        DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
+        String now = "15:00:00";
+        Assertions.assertTrue(StatisticsUtil.inAnalyzeTime(LocalTime.parse(now, timeFormatter)));
+        now = "23:30:00";
+
Assertions.assertFalse(StatisticsUtil.inAnalyzeTime(LocalTime.parse(now, timeFormatter))); + } } diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy index 4e4c4a08425944f..92692559193341f 100644 --- a/regression-test/suites/statistics/analyze_stats.groovy +++ b/regression-test/suites/statistics/analyze_stats.groovy @@ -57,7 +57,11 @@ suite("test_analyze") { `analyzetestlimitedk8` double null comment "", `analyzetestlimitedk9` float null comment "", `analyzetestlimitedk12` string null comment "", - `analyzetestlimitedk13` largeint(40) null comment "" + `analyzetestlimitedk13` largeint(40) null comment "", + `analyzetestlimitedk14` ARRAY NULL COMMENT "", + `analyzetestlimitedk15` Map NULL COMMENT "", + `analyzetestlimitedk16` STRUCT NULL, + `analyzetestlimitedk17` JSON NULL ) engine=olap DUPLICATE KEY(`analyzetestlimitedk3`) DISTRIBUTED BY HASH(`analyzetestlimitedk3`) BUCKETS 5 properties("replication_num" = "1") @@ -67,26 +71,39 @@ suite("test_analyze") { INSERT INTO `${tbl}` VALUES (-2103297891,1,101,15248,4761818404925265645,939926.283, 'UTmCFKMbprf0zSVOIlBJRNOl3JcNBdOsnCDt','2022-09-28','2022-10-28 01:56:56','tVvGDSrN6kyn', -954349107.187117,-40.46286,'g1ZP9nqVgaGKya3kPERdBofTWJQ4TIJEz972Xvw4hfPpTpWwlmondiLVTCyld7rSBlSWrE7NJRB0pvPGEFQKOx1s3', - '-1559301292834325905'), + '-1559301292834325905', NULL, NULL, NULL, NULL), (-2094982029,0,-81,-14746,-2618177187906633064,121889.100,NULL,'2023-05-01','2022-11-25 00:24:12', '36jVI0phYfhFucAOEASbh4OdvUYcI7QZFgQSveNyfGcRRUtQG9HGN1UcCmUH',-82250254.174239,NULL, - 'bTUHnMC4v7dI8U3TK0z4wZHdytjfHQfF1xKdYAVwPVNMT4fT4F92hj8ENQXmCkWtfp','6971810221218612372'), + 'bTUHnMC4v7dI8U3TK0z4wZHdytjfHQfF1xKdYAVwPVNMT4fT4F92hj8ENQXmCkWtfp','6971810221218612372', NULL, NULL, NULL, NULL), (-1840301109,1,NULL,NULL,7805768460922079440,546556.220,'wC7Pif9SJrg9b0wicGfPz2ezEmEKotmN6AMI',NULL, '2023-05-20 18:13:14','NM5SLu62SGeuD',-1555800813.9748349,-11122.953, - 'NH97wIjXk7dspvvfUUKe41ZetUnDmqLxGg8UYXwOwK3Jlu7dxO2GE9UJjyKW0NBxqUk1DY','-5004534044262380098'), + 'NH97wIjXk7dspvvfUUKe41ZetUnDmqLxGg8UYXwOwK3Jlu7dxO2GE9UJjyKW0NBxqUk1DY','-5004534044262380098', NULL, NULL, NULL, NULL), (-1819679967,0,10,NULL,-5772413527188525359,-532045.626,'kqMe4VYEZAmajLthCLRkl8StDQHKrDWz91AQ','2022-06-30', '2023-02-22 15:30:38','wAbeF3p84j5pFJTInQuKZOezFbsy8HIjmuUF',-1766437367.4377379,1791.4128, - '6OWmBD04UeKt1xI2XnR8t1kPG7qEYrf4J8RkA8UMs4HF33Yl','-8433424551792664598'), + '6OWmBD04UeKt1xI2XnR8t1kPG7qEYrf4J8RkA8UMs4HF33Yl','-8433424551792664598', NULL, NULL, NULL, NULL), (-1490846276,0,NULL,7744,6074522476276146996,594200.976,NULL,'2022-11-27','2023-03-11 21:28:44', 'yr8AuJLr2ud7DIwlt06cC7711UOsKslcDyySuqqfQE5X7Vjic6azHOrM6W',-715849856.288922,3762.217, - '4UpWZJ0Twrefw0Tm0AxFS38V5','7406302706201801560'),(-1465848366,1,72,29170,-5585523608136628843,-34210.874, + '4UpWZJ0Twrefw0Tm0AxFS38V5','7406302706201801560', NULL, NULL, NULL, NULL),(-1465848366,1,72,29170,-5585523608136628843,-34210.874, 'rMGygAWU91Wa3b5A7l1wheo6EF0o6zhw4YeE','2022-09-20','2023-06-11 18:17:16','B6m9S9O2amsa4SXrEKK0ivJ2x9m1u8av', - 862085772.298349,-22304.209,'1','-3399178642401166400'),(-394034614,1,65,5393,-200651968801088119,NULL, + 862085772.298349,-22304.209,'1','-3399178642401166400', NULL, NULL, NULL, NULL),(-394034614,1,65,5393,-200651968801088119,NULL, '9MapWX9pn8zes9Gey1lhRsH3ATyQPIysjQYi','2023-05-11','2022-07-02 02:56:53','z5VWbuKr6HiK7yC7MRIoQGrb98VUS', - 
-        1877828963.091433,-1204.1926,'fSDQqT38rkrJEi6fwc90rivgQcRPaW5V1aEmZpdSvUm','8882970420609470903'),
+        1877828963.091433,-1204.1926,'fSDQqT38rkrJEi6fwc90rivgQcRPaW5V1aEmZpdSvUm','8882970420609470903', NULL, NULL, NULL, NULL),
         (-287465855,0,-10,-32484,-5161845307234178602,748718.592,'n64TXbG25DQL5aw5oo9o9cowSjHCXry9HkId','2023-01-02',
         '2022-11-17 14:58:52','d523m4PwLdHZtPTqSoOBo5IGivCKe4A1Sc8SKCILFxgzYLe0',NULL,27979.855,
-        'ps7qwcZjBjkGfcXYMw5HQMwnElzoHqinwk8vhQCbVoGBgfotc4oSkpD3tP34h4h0tTogDMwFu60iJm1bofUzyUQofTeRwZk8','4692206687866847780')
+        'ps7qwcZjBjkGfcXYMw5HQMwnElzoHqinwk8vhQCbVoGBgfotc4oSkpD3tP34h4h0tTogDMwFu60iJm1bofUzyUQofTeRwZk8','4692206687866847780', NULL, NULL, NULL, NULL)
+    """
+
+    sql """
+    SET enable_nereids_planner=true;
+
+    """
+
+    sql """
+    SET forbid_unknown_col_stats=false;
+    """
+
+    sql """
+    SELECT * FROM ${tbl}
     """
 
     sql """
@@ -97,10 +114,6 @@ suite("test_analyze") {
         ANALYZE DATABASE ${db} WITH SYNC
     """
-    sql """
-    SET enable_nereids_planner=true;
-
-    """
 
     sql """
     SET enable_fallback_to_original_planner=false;
     """
@@ -152,19 +165,19 @@
         exception = e
     }
 
-    a_result_1 = sql """
+    def a_result_1 = sql """
         ANALYZE DATABASE ${db} WITH SYNC WITH SAMPLE PERCENT 10
     """
 
-    a_result_2 = sql """
+    def a_result_2 = sql """
         ANALYZE DATABASE ${db} WITH SYNC WITH SAMPLE PERCENT 5
     """
 
-    a_result_3 = sql """
+    def a_result_3 = sql """
         ANALYZE DATABASE ${db} WITH SAMPLE PERCENT 5
     """
 
-    show_result = sql """
+    def show_result = sql """
         SHOW ANALYZE
     """
 
@@ -891,8 +904,24 @@ PARTITION `p599` VALUES IN (599)
     }
     assert expected_col_stats(id_col_stats, 600, 1)
-    assert expected_col_stats(id_col_stats, 599, 7)
+    assert (int) Double.parseDouble(id_col_stats[0][2]) < 700 &&
+            (int) Double.parseDouble(id_col_stats[0][2]) > 500
+    assert expected_col_stats(id_col_stats, 0, 3)
+    assert expected_col_stats(id_col_stats, 2400, 4)
+    assert expected_col_stats(id_col_stats, 4, 5)
     assert expected_col_stats(id_col_stats, 0, 6)
+    assert expected_col_stats(id_col_stats, 599, 7)
+
+    def update_time = id_col_stats[0][8]
+
+    sql """ANALYZE TABLE test_600_partition_table_analyze WITH SYNC"""
+
+    // Data hasn't changed, so the update time shouldn't change: this table doesn't need to be analyzed again
+    def id_col_stats_2 = sql """
+        SHOW COLUMN CACHED STATS test_600_partition_table_analyze(id);
+    """
+
+    assert update_time == id_col_stats_2[0][8]
 
     sql """DROP TABLE IF EXISTS increment_analyze_test"""
 
     sql """
@@ -1151,4 +1180,39 @@ PARTITION `p599` VALUES IN (599)
         return (r[0][7]).equals(expected_value)
     }
     expected_max(max, "测试")
+
+    show_result = sql """
+        SHOW ANALYZE ${tbl}
+    """
+
+    def tbl_name_as_expected = { r, name ->
+        for (int i = 0; i < r.size(); i++) {
+            if (r[i][3] != name) {
+                return false
+            }
+        }
+        return true
+    }
+
+    assert show_result[0][9] == "FINISHED"
+    assert tbl_name_as_expected(show_result, "${tbl}")
+
+    show_result = sql """
+        SHOW ANALYZE ${tbl} WHERE STATE = "FINISHED"
+    """
+
+    assert show_result.size() > 0
+
+    def all_finished = { r ->
+        for (int i = 0; i < r.size(); i++) {
+            if (r[i][9] != "FINISHED") {
+                return false
+            }
+        }
+        return true
+    }
+
+    assert all_finished(show_result)
+
+
 }
diff --git a/regression-test/suites/statistics/test_agg_complex_type.groovy b/regression-test/suites/statistics/test_agg_complex_type.groovy
new file mode 100644
index 000000000000000..55af87f35bd6321
--- /dev/null
+++ b/regression-test/suites/statistics/test_agg_complex_type.groovy
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_analyze_with_agg_complex_type") {
+    sql """drop table if exists test_agg_complex_type;"""
+
+    sql """create table test_agg_complex_type (
+        datekey int,
+        device_id bitmap BITMAP_UNION NULL,
+        hll_test hll hll_union,
+        qs QUANTILE_STATE QUANTILE_UNION
+    )
+    aggregate key (datekey)
+    distributed by hash(datekey) buckets 1
+    properties(
+        "replication_num" = "1"
+    );"""
+
+    sql """insert into test_agg_complex_type values (1, to_bitmap(1), hll_hash("11"), TO_QUANTILE_STATE("11", 1.0));"""
+
+    sql """insert into test_agg_complex_type values (2, to_bitmap(1), hll_hash("12"), TO_QUANTILE_STATE("11", 1.0));"""
+
+    sql """ANALYZE TABLE test_agg_complex_type WITH SYNC"""
+
+    def show_result = sql """SHOW COLUMN CACHED STATS test_agg_complex_type"""
+
+    // Only the key column (datekey) should have cached stats; the agg-state columns are not analyzable
+    assert show_result.size() == 1
+
+    def expected_col_stats = { r, expected_value, idx ->
+        return (int) Double.parseDouble(r[0][idx]) == expected_value
+    }
+
+    // datekey: count=2, null_count=0, data_size=8, avg_size=4, min=1, max=2
+    assert expected_col_stats(show_result, 2, 1)
+    assert expected_col_stats(show_result, 0, 3)
+    assert expected_col_stats(show_result, 8, 4)
+    assert expected_col_stats(show_result, 4, 5)
+    assert expected_col_stats(show_result, 1, 6)
+    assert expected_col_stats(show_result, 2, 7)
+}
\ No newline at end of file
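
Note on the positional assertions used by these suites: the indexes passed to expected_col_stats reflect a result-row layout for SHOW COLUMN CACHED STATS that is inferred from the assertions themselves (count, NDV, null count, sizes, min/max, update time), not from a documented contract. A minimal Groovy sketch of that assumed layout, with col_stat as a hypothetical helper:

    // Assumed row layout, inferred from analyze_stats.groovy and test_agg_complex_type.groovy:
    // [1]=row count, [2]=NDV (checked loosely above, since sampling makes it approximate),
    // [3]=null count, [4]=data size, [5]=avg size per row, [6]=min, [7]=max, [8]=updated time
    def col_stat = { List<List<Object>> rows, int idx -> Double.parseDouble(rows[0][idx] as String) }
    // e.g. for the 600-partition table above: data_size / row count == avg size (2400 / 600 == 4)
    assert col_stat(id_col_stats, 4) / col_stat(id_col_stats, 1) == col_stat(id_col_stats, 5)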