From 06755f64b101b8267c14b4c3c85b78973be7aacb Mon Sep 17 00:00:00 2001 From: Jibing Li Date: Mon, 27 May 2024 12:20:28 +0800 Subject: [PATCH] Support partition stats cache --- .../doris/analysis/ShowColumnStatsStmt.java | 29 +++ .../org/apache/doris/qe/ShowExecutor.java | 47 ++++- .../doris/service/FrontendServiceImpl.java | 3 +- .../doris/statistics/AnalysisManager.java | 2 +- .../doris/statistics/ColumnStatistic.java | 5 +- .../ColumnStatisticsCacheLoader.java | 7 +- .../doris/statistics/PartitionStatistic.java | 165 ++++++++++++++++++ .../statistics/PartitionStatisticBuilder.java | 160 +++++++++++++++++ .../PartitionStatisticCacheKey.java | 82 +++++++++ .../PartitionStatisticCacheLoader.java | 80 +++++++++ .../doris/statistics/StatisticsCache.java | 37 +++- .../statistics/StatisticsRepository.java | 18 ++ .../doris/statistics/util/StatisticsUtil.java | 11 +- .../test_show_partition_stats.groovy | 30 ++++ 14 files changed, 658 insertions(+), 18 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatistic.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatisticBuilder.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatisticCacheKey.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatisticCacheLoader.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java index 6c54175784f9514..4ef8e74f6fa2dd3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java @@ -36,6 +36,8 @@ import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.statistics.ColStatsMeta; import org.apache.doris.statistics.ColumnStatistic; +import org.apache.doris.statistics.PartitionStatistic; +import org.apache.doris.statistics.PartitionStatisticCacheKey; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.TableStatsMeta; @@ -44,6 +46,7 @@ import com.google.common.collect.Sets; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -225,6 +228,32 @@ public ShowResultSet constructPartitionResultSet(List resultRows, Tab return new ShowResultSet(getMetaData(), result); } + public ShowResultSet constructPartitionCachedStats( + Map resultMap, TableIf tableIf) { + List> result = Lists.newArrayList(); + for (Map.Entry entry : resultMap.entrySet()) { + PartitionStatisticCacheKey key = entry.getKey(); + PartitionStatistic value = entry.getValue(); + List row = Lists.newArrayList(); + row.add(key.colName); // column_name + row.add(key.partId); // partition_name + long indexId = key.idxId; + String indexName = indexId == -1 ? tableIf.getName() : ((OlapTable) tableIf).getIndexNameById(indexId); + row.add(indexName); // index_name. + row.add(String.valueOf(value.count)); // count + row.add(String.valueOf(value.ndv.estimateCardinality())); // ndv + row.add(String.valueOf(value.numNulls)); // num_null + row.add(String.valueOf(value.minValue)); // min + row.add(String.valueOf(value.maxValue)); // max + row.add(String.valueOf(value.dataSize)); // data_size + row.add(value.updatedTime); // updated_time + row.add("N/A"); // update_rows + row.add("N/A"); // trigger. Manual or System + result.add(row); + } + return new ShowResultSet(getMetaData(), result); + } + public PartitionNames getPartitionNames() { return partitionNames; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index c767c45f4b35b87..8ad8b9339cb2d1d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -218,6 +218,8 @@ import org.apache.doris.statistics.AutoAnalysisPendingJob; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.Histogram; +import org.apache.doris.statistics.PartitionStatistic; +import org.apache.doris.statistics.PartitionStatisticCacheKey; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.StatisticsRepository; import org.apache.doris.statistics.TableStatsMeta; @@ -2636,9 +2638,14 @@ private void handleShowColumnStats() throws AnalysisException { List partNames = partitionNames.getPartitionNames() == null ? new ArrayList<>(tableIf.getPartitionNames()) : partitionNames.getPartitionNames(); - List partitionColumnStats = - StatisticsRepository.queryColumnStatisticsByPartitions(tableIf, columnNames, partNames); - resultSet = showColumnStatsStmt.constructPartitionResultSet(partitionColumnStats, tableIf); + if (showCache) { + resultSet = showColumnStatsStmt.constructPartitionCachedStats( + getCachedPartitionStats(columnNames, partNames, tableIf), tableIf); + } else { + List partitionColumnStats = + StatisticsRepository.queryColumnStatisticsByPartitions(tableIf, columnNames, partNames); + resultSet = showColumnStatsStmt.constructPartitionResultSet(partitionColumnStats, tableIf); + } } else { if (isAllColumns && !showCache) { getStatsForAllColumns(columnStatistics, tableIf); @@ -2704,6 +2711,40 @@ private void getStatsForSpecifiedColumns(List, ColumnS } } + private Map getCachedPartitionStats( + Set columnNames, List partitionNames, TableIf tableIf) { + Map ret = new HashMap<>(); + long catalogId = tableIf.getDatabase().getCatalog().getId(); + long dbId = tableIf.getDatabase().getId(); + long tableId = tableIf.getId(); + for (String colName : columnNames) { + // Olap base index use -1 as index id. + List indexIds = Lists.newArrayList(); + if (tableIf instanceof OlapTable) { + indexIds = ((OlapTable) tableIf).getMvColumnIndexIds(colName); + } else { + indexIds.add(-1L); + } + for (long indexId : indexIds) { + String indexName = tableIf.getName(); + if (tableIf instanceof OlapTable) { + OlapTable olapTable = (OlapTable) tableIf; + indexName = olapTable.getIndexNameById(indexId == -1 ? olapTable.getBaseIndexId() : indexId); + } + if (indexName == null) { + continue; + } + for (String partName : partitionNames) { + PartitionStatistic partitionStatistics = Env.getCurrentEnv().getStatisticsCache() + .getPartitionStatistics(catalogId, dbId, tableId, indexId, partName, colName); + ret.put(new PartitionStatisticCacheKey(catalogId, dbId, tableId, indexId, partName, colName), + partitionStatistics); + } + } + } + return ret; + } + public void handleShowColumnHist() { // TODO: support histogram in the future. ShowColumnHistStmt showColumnHistStmt = (ShowColumnHistStmt) stmt; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index ca3c43470bed118..7fa2cdcbe42aba1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3178,7 +3178,8 @@ public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) throws ColStatsData data = GsonUtils.GSON.fromJson(request.colStatsData, ColStatsData.class); ColumnStatistic c = data.toColumnStatistic(); if (c == ColumnStatistic.UNKNOWN) { - Env.getCurrentEnv().getStatisticsCache().invalidate(k.catalogId, k.dbId, k.tableId, k.idxId, k.colName); + Env.getCurrentEnv().getStatisticsCache().invalidate(k.catalogId, k.dbId, k.tableId, + k.idxId, null, k.colName); } else { Env.getCurrentEnv().getStatisticsCache().updateColStatsCache( k.catalogId, k.dbId, k.tableId, k.idxId, k.colName, c); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index eb10fb09c7b8e71..9ca8edafe882f48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -703,7 +703,7 @@ public void invalidateLocalStats(long catalogId, long dbId, long tableId, } } tableStats.removeColumn(indexName, column); - statisticsCache.invalidate(catalogId, dbId, tableId, indexId, column); + statisticsCache.invalidate(catalogId, dbId, tableId, indexId, null, column); } } tableStats.updatedTime = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java index d51242ed0791e7c..f0aa3233fef3ea5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java @@ -30,9 +30,7 @@ import org.apache.logging.log4j.Logger; import org.json.JSONObject; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Set; public class ColumnStatistic { @@ -110,7 +108,6 @@ public ColumnStatistic(double count, double ndv, ColumnStatistic original, doubl } public static ColumnStatistic fromResultRow(List resultRows) { - Map partitionIdToColStats = new HashMap<>(); ColumnStatistic columnStatistic = null; try { for (ResultRow resultRow : resultRows) { @@ -118,7 +115,7 @@ public static ColumnStatistic fromResultRow(List resultRows) { if (partId == null) { columnStatistic = fromResultRow(resultRow); } else { - partitionIdToColStats.put(partId, fromResultRow(resultRow)); + LOG.warn("Column statistics table shouldn't contain partition stats. [{}]", resultRow); } } } catch (Throwable t) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java index 056ed7bcee5c7c6..fba1a4d7b720613 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java @@ -44,9 +44,8 @@ protected Optional doLoad(StatisticsCacheKey key) { columnStatistic = table.getColumnStatistic(key.colName); } catch (Exception e) { if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Exception to get column statistics by metadata." - + "[Catalog:{}, DB:{}, Table:{}]", - key.catalogId, key.dbId, key.tableId), e); + LOG.debug("Exception to get column statistics by metadata. [Catalog:{}, DB:{}, Table:{}]", + key.catalogId, key.dbId, key.tableId, e); } } } @@ -69,7 +68,7 @@ protected Optional doLoad(StatisticsCacheKey key) { } private Optional loadFromStatsTable(StatisticsCacheKey key) { - List columnResults = null; + List columnResults; try { columnResults = StatisticsRepository.loadColStats( key.catalogId, key.dbId, key.tableId, key.idxId, key.colName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatistic.java new file mode 100644 index 000000000000000..e4c35723f8406eb --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatistic.java @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.io.Hll; +import org.apache.doris.statistics.util.Hll128; +import org.apache.doris.statistics.util.StatisticsUtil; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.util.Base64; +import java.util.List; + +public class PartitionStatistic { + + private static final Logger LOG = LogManager.getLogger(PartitionStatistic.class); + + public static PartitionStatistic UNKNOWN = new PartitionStatisticBuilder().setAvgSizeByte(1).setNdv(new Hll128()) + .setNumNulls(1).setCount(1).setMaxValue(Double.POSITIVE_INFINITY).setMinValue(Double.NEGATIVE_INFINITY) + .setIsUnknown(true).setUpdatedTime("") + .build(); + + public static PartitionStatistic ZERO = new PartitionStatisticBuilder().setAvgSizeByte(0).setNdv(new Hll128()) + .setNumNulls(0).setCount(0).setMaxValue(Double.NaN).setMinValue(Double.NaN) + .build(); + + public final double count; + public final Hll128 ndv; + public final double numNulls; + public final double dataSize; + public final double avgSizeByte; + public final double minValue; + public final double maxValue; + public final boolean isUnKnown; + public final LiteralExpr minExpr; + public final LiteralExpr maxExpr; + public final String updatedTime; + + public PartitionStatistic(double count, Hll128 ndv, double avgSizeByte, + double numNulls, double dataSize, double minValue, double maxValue, + LiteralExpr minExpr, LiteralExpr maxExpr, boolean isUnKnown, + String updatedTime) { + this.count = count; + this.ndv = ndv; + this.avgSizeByte = avgSizeByte; + this.numNulls = numNulls; + this.dataSize = dataSize; + this.minValue = minValue; + this.maxValue = maxValue; + this.minExpr = minExpr; + this.maxExpr = maxExpr; + this.isUnKnown = isUnKnown; + this.updatedTime = updatedTime; + } + + public static PartitionStatistic fromResultRow(List resultRows) { + if (resultRows.size() > 1) { + for (ResultRow resultRow : resultRows) { + LOG.warn("Partition stats has more than one row. [{}]", resultRow); + } + } + PartitionStatistic partitionStatistic = null; + try { + for (ResultRow resultRow : resultRows) { + partitionStatistic = fromResultRow(resultRow); + } + } catch (Throwable t) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to deserialize column stats", t); + } + return PartitionStatistic.UNKNOWN; + } + if (partitionStatistic == null) { + return PartitionStatistic.UNKNOWN; + } + return partitionStatistic; + } + + public static PartitionStatistic fromResultRow(ResultRow row) { + // row : [catalog_id, db_id, tbl_id, idx_id, col_id, count, ndv, null_count, min, max, data_size, update_time] + try { + long catalogId = Long.parseLong(row.get(0)); + long dbID = Long.parseLong(row.get(1)); + long tblId = Long.parseLong(row.get(2)); + long idxId = Long.parseLong(row.get(3)); + String colName = row.get(4); + Column col = StatisticsUtil.findColumn(catalogId, dbID, tblId, idxId, colName); + if (col == null) { + LOG.info("Failed to deserialize column statistics, ctlId: {} dbId: {}, " + + "tblId: {} column: {} not exists", catalogId, dbID, tblId, colName); + return PartitionStatistic.UNKNOWN; + } + + PartitionStatisticBuilder partitionStatisticBuilder = new PartitionStatisticBuilder(); + double count = Double.parseDouble(row.get(5)); + partitionStatisticBuilder.setCount(count); + String ndv = row.get(6); + Base64.Decoder decoder = Base64.getDecoder(); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(decoder.decode(ndv))); + Hll hll = new Hll(); + if (!hll.deserialize(dis)) { + LOG.warn("Failed to deserialize ndv. {}", ndv); + return null; + } + partitionStatisticBuilder.setNdv(Hll128.fromHll(hll)); + String nullCount = row.getWithDefault(7, "0"); + partitionStatisticBuilder.setNumNulls(Double.parseDouble(nullCount)); + partitionStatisticBuilder.setDataSize(Double + .parseDouble(row.getWithDefault(10, "0"))); + partitionStatisticBuilder.setAvgSizeByte(partitionStatisticBuilder.getCount() == 0 + ? 0 : partitionStatisticBuilder.getDataSize() + / partitionStatisticBuilder.getCount()); + String min = row.get(8); + String max = row.get(9); + if (min != null && !min.equalsIgnoreCase("NULL")) { + try { + partitionStatisticBuilder.setMinValue(StatisticsUtil.convertToDouble(col.getType(), min)); + partitionStatisticBuilder.setMinExpr(StatisticsUtil.readableValue(col.getType(), min)); + } catch (AnalysisException e) { + LOG.warn("Failed to deserialize column {} min value {}.", col, min, e); + partitionStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY); + } + } else { + partitionStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY); + } + if (max != null && !max.equalsIgnoreCase("NULL")) { + try { + partitionStatisticBuilder.setMaxValue(StatisticsUtil.convertToDouble(col.getType(), max)); + partitionStatisticBuilder.setMaxExpr(StatisticsUtil.readableValue(col.getType(), max)); + } catch (AnalysisException e) { + LOG.warn("Failed to deserialize column {} max value {}.", col, max, e); + partitionStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY); + } + } else { + partitionStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY); + } + partitionStatisticBuilder.setUpdatedTime(row.get(11)); + return partitionStatisticBuilder.build(); + } catch (Exception e) { + LOG.warn("Failed to deserialize column statistics. Row [{}]", row, e); + return PartitionStatistic.UNKNOWN; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatisticBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatisticBuilder.java new file mode 100644 index 000000000000000..95e5f8c06693c78 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatisticBuilder.java @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.statistics.util.Hll128; + +public class PartitionStatisticBuilder { + private double count; + private Hll128 ndv; + private double avgSizeByte; + private double numNulls; + private double dataSize; + private double minValue = Double.NEGATIVE_INFINITY; + private double maxValue = Double.POSITIVE_INFINITY; + private LiteralExpr minExpr; + private LiteralExpr maxExpr; + + private boolean isUnknown; + + private String updatedTime; + + public PartitionStatisticBuilder() { + } + + public PartitionStatisticBuilder(PartitionStatistic statistic) { + this.count = statistic.count; + this.ndv = statistic.ndv; + this.avgSizeByte = statistic.avgSizeByte; + this.numNulls = statistic.numNulls; + this.dataSize = statistic.dataSize; + this.minValue = statistic.minValue; + this.maxValue = statistic.maxValue; + this.minExpr = statistic.minExpr; + this.maxExpr = statistic.maxExpr; + this.isUnknown = statistic.isUnKnown; + this.updatedTime = statistic.updatedTime; + } + + public PartitionStatisticBuilder setCount(double count) { + this.count = count; + return this; + } + + public PartitionStatisticBuilder setNdv(Hll128 ndv) { + this.ndv = ndv; + return this; + } + + public PartitionStatisticBuilder setAvgSizeByte(double avgSizeByte) { + this.avgSizeByte = avgSizeByte; + return this; + } + + public PartitionStatisticBuilder setNumNulls(double numNulls) { + this.numNulls = numNulls; + return this; + } + + public PartitionStatisticBuilder setDataSize(double dataSize) { + this.dataSize = dataSize; + return this; + } + + public PartitionStatisticBuilder setMinValue(double minValue) { + this.minValue = minValue; + return this; + } + + public PartitionStatisticBuilder setMaxValue(double maxValue) { + this.maxValue = maxValue; + return this; + } + + public PartitionStatisticBuilder setMinExpr(LiteralExpr minExpr) { + this.minExpr = minExpr; + return this; + } + + public PartitionStatisticBuilder setMaxExpr(LiteralExpr maxExpr) { + this.maxExpr = maxExpr; + return this; + } + + public PartitionStatisticBuilder setIsUnknown(boolean isUnknown) { + this.isUnknown = isUnknown; + return this; + } + + public PartitionStatisticBuilder setUpdatedTime(String updatedTime) { + this.updatedTime = updatedTime; + return this; + } + + public double getCount() { + return count; + } + + public Hll128 getNdv() { + return ndv; + } + + public double getAvgSizeByte() { + return avgSizeByte; + } + + public double getNumNulls() { + return numNulls; + } + + public double getDataSize() { + return dataSize; + } + + public double getMinValue() { + return minValue; + } + + public double getMaxValue() { + return maxValue; + } + + public LiteralExpr getMinExpr() { + return minExpr; + } + + public LiteralExpr getMaxExpr() { + return maxExpr; + } + + public boolean isUnknown() { + return isUnknown; + } + + public String getUpdatedTime() { + return updatedTime; + } + + public PartitionStatistic build() { + dataSize = dataSize > 0 ? dataSize : Math.max((count - numNulls + 1) * avgSizeByte, 0); + return new PartitionStatistic(count, ndv, avgSizeByte, numNulls, + dataSize, minValue, maxValue, minExpr, maxExpr, + isUnknown, updatedTime); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatisticCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatisticCacheKey.java new file mode 100644 index 000000000000000..3e9099779a21bde --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatisticCacheKey.java @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import com.google.gson.annotations.SerializedName; + +import java.util.Objects; +import java.util.StringJoiner; + +public class PartitionStatisticCacheKey { + + @SerializedName("catalogId") + public final long catalogId; + @SerializedName("dbId") + public final long dbId; + @SerializedName("tableId") + public final long tableId; + @SerializedName("idxId") + public final long idxId; + @SerializedName("partId") + public final String partId; + @SerializedName("colName") + public final String colName; + + private static final String DELIMITER = "-"; + + public PartitionStatisticCacheKey(long catalogId, long dbId, long tableId, long idxId, + String partId, String colName) { + this.catalogId = catalogId; + this.dbId = dbId; + this.tableId = tableId; + this.idxId = idxId; + this.partId = partId; + this.colName = colName; + } + + @Override + public int hashCode() { + return Objects.hash(catalogId, dbId, tableId, idxId, colName); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + PartitionStatisticCacheKey k = (PartitionStatisticCacheKey) obj; + return this.catalogId == k.catalogId && this.dbId == k.dbId && this.tableId == k.tableId + && this.idxId == k.idxId && this.partId.equals(k.partId) && this.colName.equals(k.colName); + } + + @Override + public String toString() { + StringJoiner sj = new StringJoiner(DELIMITER); + sj.add("PartitionColumnStats"); + sj.add(String.valueOf(catalogId)); + sj.add(String.valueOf(dbId)); + sj.add(String.valueOf(tableId)); + sj.add(String.valueOf(idxId)); + sj.add(partId); + sj.add(colName); + return sj.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatisticCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatisticCacheLoader.java new file mode 100644 index 000000000000000..872815c775c662a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStatisticCacheLoader.java @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.qe.InternalQueryExecutionException; +import org.apache.doris.statistics.util.StatisticsUtil; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Optional; + +public class PartitionStatisticCacheLoader extends + BasicAsyncCacheLoader> { + + private static final Logger LOG = LogManager.getLogger(PartitionStatisticCacheLoader.class); + + @Override + protected Optional doLoad(PartitionStatisticCacheKey key) { + Optional partitionStatistic = Optional.empty(); + try { + partitionStatistic = loadFromStatsTable(key); + } catch (Throwable t) { + LOG.warn("Failed to load stats for column [Catalog:{}, DB:{}, Table:{}, Part:{}, Column:{}]," + + "Reason: {}", key.catalogId, key.dbId, key.tableId, key.partId, key.colName, t.getMessage()); + if (LOG.isDebugEnabled()) { + LOG.debug(t); + } + } + if (partitionStatistic.isPresent()) { + // For non-empty table, return UNKNOWN if we can't collect ndv value. + // Because inaccurate ndv is very misleading. + PartitionStatistic stats = partitionStatistic.get(); + if (stats.count > 0 && stats.ndv.estimateCardinality() == 0 && stats.count != stats.numNulls) { + partitionStatistic = Optional.of(PartitionStatistic.UNKNOWN); + } + } + return partitionStatistic; + } + + private Optional loadFromStatsTable(PartitionStatisticCacheKey key) { + List partitionResults; + try { + partitionResults = StatisticsRepository.loadPartitionStats( + key.catalogId, key.dbId, key.tableId, key.idxId, key.partId, key.colName); + } catch (InternalQueryExecutionException e) { + LOG.info("Failed to load stats for table {} column {}. Reason:{}", + key.tableId, key.colName, e.getMessage()); + return Optional.empty(); + } + PartitionStatistic partitionStatistic; + try { + partitionStatistic = StatisticsUtil.deserializeToPartitionStatistics(partitionResults); + } catch (Exception e) { + LOG.warn("Exception to deserialize partition statistics", e); + return Optional.empty(); + } + if (partitionStatistic == null) { + return Optional.empty(); + } else { + return Optional.of(partitionStatistic); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java index 8a96a0218ce61c0..b28a421d421e1ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java @@ -60,6 +60,7 @@ public class StatisticsCache { private final ColumnStatisticsCacheLoader columnStatisticsCacheLoader = new ColumnStatisticsCacheLoader(); private final HistogramCacheLoader histogramCacheLoader = new HistogramCacheLoader(); + private final PartitionStatisticCacheLoader partitionCacheLoader = new PartitionStatisticCacheLoader(); private final AsyncLoadingCache> columnStatisticsCache = Caffeine.newBuilder() @@ -75,6 +76,13 @@ public class StatisticsCache { .executor(threadPool) .buildAsync(histogramCacheLoader); + private final AsyncLoadingCache> partitionStatisticCache = + Caffeine.newBuilder() + .maximumSize(Config.stats_cache_size) + .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL)) + .executor(threadPool) + .buildAsync(partitionCacheLoader); + public ColumnStatistic getColumnStatistics(long catalogId, long dbId, long tblId, long idxId, String colName) { ConnectContext ctx = ConnectContext.get(); if (ctx != null && ctx.getSessionVariable().internalSession) { @@ -92,6 +100,24 @@ public ColumnStatistic getColumnStatistics(long catalogId, long dbId, long tblId return ColumnStatistic.UNKNOWN; } + public PartitionStatistic getPartitionStatistics(long catalogId, long dbId, long tblId, long idxId, + String partId, String colName) { + ConnectContext ctx = ConnectContext.get(); + if (ctx != null && ctx.getSessionVariable().internalSession) { + return PartitionStatistic.UNKNOWN; + } + PartitionStatisticCacheKey k = new PartitionStatisticCacheKey(catalogId, dbId, tblId, idxId, partId, colName); + try { + CompletableFuture> f = partitionStatisticCache.get(k); + if (f.isDone()) { + return f.get().orElse(PartitionStatistic.UNKNOWN); + } + } catch (Exception e) { + LOG.warn("Unexpected exception while returning ColumnStatistic", e); + } + return PartitionStatistic.UNKNOWN; + } + public Histogram getHistogram(long ctlId, long dbId, long tblId, String colName) { return getHistogram(ctlId, dbId, tblId, -1, colName).orElse(null); } @@ -113,8 +139,13 @@ private Optional getHistogram(long ctlId, long dbId, long tblId, long return Optional.empty(); } - public void invalidate(long ctlId, long dbId, long tblId, long idxId, String colName) { - columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(ctlId, dbId, tblId, idxId, colName)); + public void invalidate(long ctlId, long dbId, long tblId, long idxId, String partId, String colName) { + if (partId == null) { + columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(ctlId, dbId, tblId, idxId, colName)); + } else { + partitionStatisticCache.synchronous().invalidate( + new PartitionStatisticCacheKey(ctlId, dbId, tblId, idxId, partId, colName)); + } } public void updateColStatsCache(long ctlId, long dbId, long tblId, long idxId, String colName, @@ -192,7 +223,7 @@ public void syncColStats(ColStatsData data) { statsId.idxId, statsId.colId); ColumnStatistic columnStatistic = data.toColumnStatistic(); if (columnStatistic == ColumnStatistic.UNKNOWN) { - invalidate(k.catalogId, k.dbId, k.tableId, k.idxId, k.colName); + invalidate(k.catalogId, k.dbId, k.tableId, k.idxId, null, k.colName); } else { putCache(k, columnStatistic); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java index b6ea5bf3947dc26..80a04b95f56848f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java @@ -67,6 +67,12 @@ public class StatisticsRepository { + FULL_QUALIFIED_COLUMN_STATISTICS_NAME + " WHERE `id` = '${id}' AND `catalog_id` = '${catalogId}' AND `db_id` = '${dbId}'"; + private static final String FETCH_PARTITION_STATISTIC_TEMPLATE = "SELECT catalog_id, db_id, tbl_id, idx_id, " + + "col_id, count, hll_to_base64(ndv) as ndv, null_count, min, max, data_size_in_bytes, update_time FROM " + + FULL_QUALIFIED_PARTITION_STATISTICS_NAME + + " WHERE `catalog_id` = '${catalogId}' AND `db_id` = '${dbId}' AND `tbl_id` = ${tableId}" + + " AND `idx_id` = ${indexId} AND `part_id` = '${partId}' AND `col_id` = '${columnId}'"; + private static final String FETCH_PARTITIONS_STATISTIC_TEMPLATE = "SELECT col_id, part_id, idx_id, count, " + "hll_cardinality(ndv) as ndv, null_count, min, max, data_size_in_bytes, update_time FROM " + FULL_QUALIFIED_PARTITION_STATISTICS_NAME @@ -409,6 +415,18 @@ public static List loadColStats(long ctlId, long dbId, long tableId, .replace(FETCH_COLUMN_STATISTIC_TEMPLATE)); } + public static List loadPartitionStats(long ctlId, long dbId, long tableId, long idxId, + String partName, String colName) { + Map params = new HashMap<>(); + generateCtlDbIdParams(ctlId, dbId, params); + params.put("tableId", String.valueOf(tableId)); + params.put("indexId", String.valueOf(idxId)); + params.put("partId", partName); + params.put("columnId", colName); + return StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(FETCH_PARTITION_STATISTIC_TEMPLATE)); + } + public static List loadPartStats(Collection keys) { String inPredicate = "CONCAT(tbl_id, '-', idx_id, '-', col_id) in (%s)"; StringJoiner sj = new StringJoiner(","); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 75d6165c35b50ef..b3f121a356a5980 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -72,6 +72,7 @@ import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.ColumnStatisticBuilder; import org.apache.doris.statistics.Histogram; +import org.apache.doris.statistics.PartitionStatistic; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.statistics.TableStatsMeta; @@ -185,11 +186,17 @@ public static ColumnStatistic deserializeToColumnStatistics(List resu return ColumnStatistic.fromResultRow(resultBatches); } - public static List deserializeToHistogramStatistics(List resultBatches) - throws Exception { + public static List deserializeToHistogramStatistics(List resultBatches) { return resultBatches.stream().map(Histogram::fromResultRow).collect(Collectors.toList()); } + public static PartitionStatistic deserializeToPartitionStatistics(List resultBatches) { + if (CollectionUtils.isEmpty(resultBatches)) { + return null; + } + return PartitionStatistic.fromResultRow(resultBatches); + } + public static AutoCloseConnectContext buildConnectContext() { return buildConnectContext(false, false); } diff --git a/regression-test/suites/statistics/test_show_partition_stats.groovy b/regression-test/suites/statistics/test_show_partition_stats.groovy index 45253d43a6cd750..01748f1658e6bc0 100644 --- a/regression-test/suites/statistics/test_show_partition_stats.groovy +++ b/regression-test/suites/statistics/test_show_partition_stats.groovy @@ -87,6 +87,36 @@ suite("test_show_partition_stats") { def result = sql """show table stats part""" assertEquals(1, result.size()) + assertEquals("18", result[0][0]) + + sql """analyze table part with sync;""" + result = sql """show column cached stats part(id) partition(p1)""" + assertEquals(1, result.size()) + Thread.sleep(1000) + for (int i = 0; i < 10; i++) { + result = sql """show column cached stats part(id) partition(p1)""" + if (result[0][3] == "6.0") { + logger.info("cache is ready.") + assertEquals("id", result[0][0]) + assertEquals("p1", result[0][1]) + assertEquals("part", result[0][2]) + assertEquals("6.0", result[0][3]) + assertEquals("6", result[0][4]) + assertEquals("0.0", result[0][5]) + assertEquals("1.0", result[0][6]) + assertEquals("6.0", result[0][7]) + assertEquals("24.0", result[0][8]) + assertEquals("N/A", result[0][10]) + assertEquals("N/A", result[0][11]) + break; + } + logger.info("cache is not ready yet.") + Thread.sleep(1000) + } + result = sql """show column cached stats part partition(p1)""" + assertEquals(9, result.size()) + result = sql """show column cached stats part partition(*)""" + assertEquals(27, result.size()) sql """drop database if exists test_show_partition_stats""" }