From 3657ad2caaa02b4ccd60174456d9f87c83397b56 Mon Sep 17 00:00:00 2001 From: Jibing Li Date: Mon, 27 May 2024 12:20:28 +0800 Subject: [PATCH] Support partition stats cache --- .../doris/analysis/ShowColumnStatsStmt.java | 29 +++ .../org/apache/doris/qe/ShowExecutor.java | 47 ++++- .../doris/service/FrontendServiceImpl.java | 3 +- .../doris/statistics/AnalysisManager.java | 2 +- .../doris/statistics/ColumnStatistic.java | 5 +- .../ColumnStatisticsCacheLoader.java | 7 +- .../statistics/PartitionColumnStatistic.java | 168 ++++++++++++++++++ .../PartitionColumnStatisticBuilder.java | 158 ++++++++++++++++ .../PartitionColumnStatisticCacheKey.java | 82 +++++++++ .../PartitionColumnStatisticCacheLoader.java | 76 ++++++++ .../doris/statistics/StatisticsCache.java | 44 ++++- .../statistics/StatisticsRepository.java | 18 ++ .../doris/statistics/util/StatisticsUtil.java | 11 +- .../test_show_partition_stats.groovy | 30 ++++ 14 files changed, 660 insertions(+), 20 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatistic.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatisticBuilder.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatisticCacheKey.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatisticCacheLoader.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java index 6c54175784f951..0976e406d943d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java @@ -36,6 +36,8 @@ import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.statistics.ColStatsMeta; import org.apache.doris.statistics.ColumnStatistic; +import 
org.apache.doris.statistics.PartitionColumnStatistic; +import org.apache.doris.statistics.PartitionColumnStatisticCacheKey; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.TableStatsMeta; @@ -44,6 +46,7 @@ import com.google.common.collect.Sets; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -225,6 +228,32 @@ public ShowResultSet constructPartitionResultSet(List resultRows, Tab return new ShowResultSet(getMetaData(), result); } + public ShowResultSet constructPartitionCachedColumnStats( + Map resultMap, TableIf tableIf) { + List> result = Lists.newArrayList(); + for (Map.Entry entry : resultMap.entrySet()) { + PartitionColumnStatisticCacheKey key = entry.getKey(); + PartitionColumnStatistic value = entry.getValue(); + List row = Lists.newArrayList(); + row.add(key.colName); // column_name + row.add(key.partId); // partition_name + long indexId = key.idxId; + String indexName = indexId == -1 ? tableIf.getName() : ((OlapTable) tableIf).getIndexNameById(indexId); + row.add(indexName); // index_name. 
+ row.add(String.valueOf(value.count)); // count + row.add(String.valueOf(value.ndv.estimateCardinality())); // ndv + row.add(String.valueOf(value.numNulls)); // num_null + row.add(String.valueOf(value.minValue)); // min + row.add(String.valueOf(value.maxValue)); // max + row.add(String.valueOf(value.dataSize)); // data_size + row.add(value.updatedTime); // updated_time + row.add("N/A"); // update_rows + row.add("N/A"); // trigger + result.add(row); + } + return new ShowResultSet(getMetaData(), result); + } + public PartitionNames getPartitionNames() { return partitionNames; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index c767c45f4b35b8..21dd663024976a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -218,6 +218,8 @@ import org.apache.doris.statistics.AutoAnalysisPendingJob; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.Histogram; +import org.apache.doris.statistics.PartitionColumnStatistic; +import org.apache.doris.statistics.PartitionColumnStatisticCacheKey; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.StatisticsRepository; import org.apache.doris.statistics.TableStatsMeta; @@ -2636,9 +2638,14 @@ private void handleShowColumnStats() throws AnalysisException { List partNames = partitionNames.getPartitionNames() == null ? 
new ArrayList<>(tableIf.getPartitionNames()) : partitionNames.getPartitionNames(); - List partitionColumnStats = - StatisticsRepository.queryColumnStatisticsByPartitions(tableIf, columnNames, partNames); - resultSet = showColumnStatsStmt.constructPartitionResultSet(partitionColumnStats, tableIf); + if (showCache) { + resultSet = showColumnStatsStmt.constructPartitionCachedColumnStats( + getCachedPartitionColumnStats(columnNames, partNames, tableIf), tableIf); + } else { + List partitionColumnStats = + StatisticsRepository.queryColumnStatisticsByPartitions(tableIf, columnNames, partNames); + resultSet = showColumnStatsStmt.constructPartitionResultSet(partitionColumnStats, tableIf); + } } else { if (isAllColumns && !showCache) { getStatsForAllColumns(columnStatistics, tableIf); @@ -2704,6 +2711,40 @@ private void getStatsForSpecifiedColumns(List, ColumnS } } + private Map getCachedPartitionColumnStats( + Set columnNames, List partitionNames, TableIf tableIf) { + Map ret = new HashMap<>(); + long catalogId = tableIf.getDatabase().getCatalog().getId(); + long dbId = tableIf.getDatabase().getId(); + long tableId = tableIf.getId(); + for (String colName : columnNames) { + // Olap base index use -1 as index id. + List indexIds = Lists.newArrayList(); + if (tableIf instanceof OlapTable) { + indexIds = ((OlapTable) tableIf).getMvColumnIndexIds(colName); + } else { + indexIds.add(-1L); + } + for (long indexId : indexIds) { + String indexName = tableIf.getName(); + if (tableIf instanceof OlapTable) { + OlapTable olapTable = (OlapTable) tableIf; + indexName = olapTable.getIndexNameById(indexId == -1 ? 
olapTable.getBaseIndexId() : indexId); + } + if (indexName == null) { + continue; + } + for (String partName : partitionNames) { + PartitionColumnStatistic partitionStatistics = Env.getCurrentEnv().getStatisticsCache() + .getPartitionColumnStatistics(catalogId, dbId, tableId, indexId, partName, colName); + ret.put(new PartitionColumnStatisticCacheKey(catalogId, dbId, tableId, indexId, partName, colName), + partitionStatistics); + } + } + } + return ret; + } + public void handleShowColumnHist() { // TODO: support histogram in the future. ShowColumnHistStmt showColumnHistStmt = (ShowColumnHistStmt) stmt; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index ca3c43470bed11..7fa2cdcbe42aba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3178,7 +3178,8 @@ public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) throws ColStatsData data = GsonUtils.GSON.fromJson(request.colStatsData, ColStatsData.class); ColumnStatistic c = data.toColumnStatistic(); if (c == ColumnStatistic.UNKNOWN) { - Env.getCurrentEnv().getStatisticsCache().invalidate(k.catalogId, k.dbId, k.tableId, k.idxId, k.colName); + Env.getCurrentEnv().getStatisticsCache().invalidate(k.catalogId, k.dbId, k.tableId, + k.idxId, null, k.colName); } else { Env.getCurrentEnv().getStatisticsCache().updateColStatsCache( k.catalogId, k.dbId, k.tableId, k.idxId, k.colName, c); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index eb10fb09c7b8e7..9ca8edafe882f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -703,7 +703,7 @@ public 
void invalidateLocalStats(long catalogId, long dbId, long tableId, } } tableStats.removeColumn(indexName, column); - statisticsCache.invalidate(catalogId, dbId, tableId, indexId, column); + statisticsCache.invalidate(catalogId, dbId, tableId, indexId, null, column); } } tableStats.updatedTime = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java index d51242ed0791e7..f0aa3233fef3ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java @@ -30,9 +30,7 @@ import org.apache.logging.log4j.Logger; import org.json.JSONObject; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Set; public class ColumnStatistic { @@ -110,7 +108,6 @@ public ColumnStatistic(double count, double ndv, ColumnStatistic original, doubl } public static ColumnStatistic fromResultRow(List resultRows) { - Map partitionIdToColStats = new HashMap<>(); ColumnStatistic columnStatistic = null; try { for (ResultRow resultRow : resultRows) { @@ -118,7 +115,7 @@ public static ColumnStatistic fromResultRow(List resultRows) { if (partId == null) { columnStatistic = fromResultRow(resultRow); } else { - partitionIdToColStats.put(partId, fromResultRow(resultRow)); + LOG.warn("Column statistics table shouldn't contain partition stats. 
[{}]", resultRow); } } } catch (Throwable t) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java index 056ed7bcee5c7c..fba1a4d7b72061 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java @@ -44,9 +44,8 @@ protected Optional doLoad(StatisticsCacheKey key) { columnStatistic = table.getColumnStatistic(key.colName); } catch (Exception e) { if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Exception to get column statistics by metadata." - + "[Catalog:{}, DB:{}, Table:{}]", - key.catalogId, key.dbId, key.tableId), e); + LOG.debug("Exception to get column statistics by metadata. [Catalog:{}, DB:{}, Table:{}]", + key.catalogId, key.dbId, key.tableId, e); } } } @@ -69,7 +68,7 @@ protected Optional doLoad(StatisticsCacheKey key) { } private Optional loadFromStatsTable(StatisticsCacheKey key) { - List columnResults = null; + List columnResults; try { columnResults = StatisticsRepository.loadColStats( key.catalogId, key.dbId, key.tableId, key.idxId, key.colName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatistic.java new file mode 100644 index 00000000000000..3a440a9102afb2 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatistic.java @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.io.Hll;
+import org.apache.doris.statistics.util.Hll128;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.util.Base64;
+import java.util.List;
+import java.util.StringJoiner;
+
+public class PartitionColumnStatistic { // immutable column statistics of a single partition
+
+    private static final Logger LOG = LogManager.getLogger(PartitionColumnStatistic.class);
+
+    public static final PartitionColumnStatistic UNKNOWN = new PartitionColumnStatisticBuilder().setAvgSizeByte(1)
+            .setNdv(new Hll128()).setNumNulls(1).setCount(1).setMaxValue(Double.POSITIVE_INFINITY)
+            .setMinValue(Double.NEGATIVE_INFINITY)
+            .setIsUnknown(true).setUpdatedTime("")
+            .build(); // fallback returned by fromResultRow when stats are missing or cannot be parsed
+
+    public static final PartitionColumnStatistic ZERO = new PartitionColumnStatisticBuilder().setAvgSizeByte(0)
+            .setNdv(new Hll128()).setNumNulls(0).setCount(0).setMaxValue(Double.NaN).setMinValue(Double.NaN)
+            .build(); // all-zero stats; min/max are NaN and updatedTime is left unset (null)
+
+    public final double count;
+    public final Hll128 ndv;
+    public final double numNulls;
+    public final double dataSize;
+    public final double avgSizeByte;
+    public final double minValue;
+    public final double maxValue;
+    public final boolean isUnKnown;
+    public final LiteralExpr minExpr;
+    public final LiteralExpr maxExpr;
+    public final String updatedTime;
+
+    public PartitionColumnStatistic(double count, Hll128 ndv, double avgSizeByte,
+            double numNulls, double dataSize, double minValue, double maxValue,
+            LiteralExpr minExpr, LiteralExpr maxExpr, boolean isUnKnown,
+            String updatedTime) {
+        this.count = count;
+        this.ndv = ndv;
+        this.avgSizeByte = avgSizeByte;
+        this.numNulls = numNulls;
+        this.dataSize = dataSize;
+        this.minValue = minValue;
+        this.maxValue = maxValue;
+        this.minExpr = minExpr;
+        this.maxExpr = maxExpr;
+        this.isUnKnown = isUnKnown;
+        this.updatedTime = updatedTime;
+    }
+
+    public static PartitionColumnStatistic fromResultRow(List<ResultRow> resultRows) { // expects 0 or 1 rows
+        if (resultRows == null || resultRows.isEmpty()) {
+            return PartitionColumnStatistic.UNKNOWN;
+        }
+        // This should never happen. resultRows should be empty or contain only 1 result row.
+        if (resultRows.size() > 1) {
+            StringJoiner stringJoiner = new StringJoiner("][", "[", "]");
+            for (ResultRow row : resultRows) {
+                stringJoiner.add(row.toString());
+            }
+            LOG.warn("Partition stats has more than one row, please drop stats and analyze again. {}",
+                    stringJoiner.toString());
+            return PartitionColumnStatistic.UNKNOWN;
+        }
+        try {
+            return fromResultRow(resultRows.get(0));
+        } catch (Throwable t) { // never propagate a parse failure to the caller; degrade to UNKNOWN
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Failed to deserialize column stats", t);
+            }
+            return PartitionColumnStatistic.UNKNOWN;
+        }
+    }
+
+    public static PartitionColumnStatistic fromResultRow(ResultRow row) { // returns UNKNOWN on any failure
+        // row : [catalog_id, db_id, tbl_id, idx_id, col_id, count, ndv, null_count, min, max, data_size, update_time]
+        try {
+            long catalogId = Long.parseLong(row.get(0));
+            long dbID = Long.parseLong(row.get(1));
+            long tblId = Long.parseLong(row.get(2));
+            long idxId = Long.parseLong(row.get(3));
+            String colName = row.get(4);
+            Column col = StatisticsUtil.findColumn(catalogId, dbID, tblId, idxId, colName);
+            if (col == null) {
+                LOG.info("Failed to deserialize column statistics, ctlId: {} dbId: {}, "
+                        + "tblId: {} column: {} not exists", catalogId, dbID, tblId, colName);
+                return PartitionColumnStatistic.UNKNOWN;
+            }
+
+            PartitionColumnStatisticBuilder partitionStatisticBuilder = new PartitionColumnStatisticBuilder();
+            double count = Double.parseDouble(row.get(5));
+            partitionStatisticBuilder.setCount(count);
+            String ndv = row.get(6); // base64 text of the serialized HLL
+            Base64.Decoder decoder = Base64.getDecoder();
+            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(decoder.decode(ndv)));
+            Hll hll = new Hll();
+            if (!hll.deserialize(dis)) {
+                LOG.warn("Failed to deserialize ndv. [{}]", row);
+                return PartitionColumnStatistic.UNKNOWN;
+            }
+            partitionStatisticBuilder.setNdv(Hll128.fromHll(hll));
+            String nullCount = row.getWithDefault(7, "0");
+            partitionStatisticBuilder.setNumNulls(Double.parseDouble(nullCount));
+            partitionStatisticBuilder.setDataSize(Double
+                    .parseDouble(row.getWithDefault(10, "0")));
+            partitionStatisticBuilder.setAvgSizeByte(partitionStatisticBuilder.getCount() == 0
+                    ? 0 : partitionStatisticBuilder.getDataSize() // guard against count == 0
+                    / partitionStatisticBuilder.getCount());
+            String min = row.get(8);
+            String max = row.get(9);
+            if (!"NULL".equalsIgnoreCase(min)) {
+                try {
+                    partitionStatisticBuilder.setMinValue(StatisticsUtil.convertToDouble(col.getType(), min));
+                    partitionStatisticBuilder.setMinExpr(StatisticsUtil.readableValue(col.getType(), min));
+                } catch (AnalysisException e) {
+                    LOG.warn("Failed to deserialize column {} min value {}.", col, min, e);
+                    partitionStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY); // unparsable: unbounded
+                }
+            } else {
+                partitionStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY);
+            }
+            if (!"NULL".equalsIgnoreCase(max)) {
+                try {
+                    partitionStatisticBuilder.setMaxValue(StatisticsUtil.convertToDouble(col.getType(), max));
+                    partitionStatisticBuilder.setMaxExpr(StatisticsUtil.readableValue(col.getType(), max));
+                } catch (AnalysisException e) {
+                    LOG.warn("Failed to deserialize column {} max value {}.", col, max, e);
+                    partitionStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY); // unparsable: unbounded
+                }
+            } else {
+                partitionStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY);
+            }
+            partitionStatisticBuilder.setUpdatedTime(row.get(11));
+            return partitionStatisticBuilder.build();
+        } catch (Exception e) {
+            LOG.warn("Failed to deserialize column statistics. Row [{}]", row, e);
+            return PartitionColumnStatistic.UNKNOWN;
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatisticBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatisticBuilder.java
new file mode 100644
index 00000000000000..ecb0624d4e8b14
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatisticBuilder.java
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.statistics.util.Hll128;
+
+public class PartitionColumnStatisticBuilder { // fluent, mutable builder for PartitionColumnStatistic
+    private double count;
+    private Hll128 ndv;
+    private double avgSizeByte;
+    private double numNulls;
+    private double dataSize;
+    private double minValue = Double.NEGATIVE_INFINITY; // used when no minimum is set
+    private double maxValue = Double.POSITIVE_INFINITY; // used when no maximum is set
+    private LiteralExpr minExpr;
+    private LiteralExpr maxExpr;
+    private boolean isUnknown;
+    private String updatedTime;
+
+    public PartitionColumnStatisticBuilder() {
+    }
+
+    public PartitionColumnStatisticBuilder(PartitionColumnStatistic statistic) { // start from a copy of statistic
+        this.count = statistic.count;
+        this.ndv = statistic.ndv;
+        this.avgSizeByte = statistic.avgSizeByte;
+        this.numNulls = statistic.numNulls;
+        this.dataSize = statistic.dataSize;
+        this.minValue = statistic.minValue;
+        this.maxValue = statistic.maxValue;
+        this.minExpr = statistic.minExpr;
+        this.maxExpr = statistic.maxExpr;
+        this.isUnknown = statistic.isUnKnown;
+        this.updatedTime = statistic.updatedTime;
+    }
+
+    public PartitionColumnStatisticBuilder setCount(double count) {
+        this.count = count;
+        return this;
+    }
+
+    public PartitionColumnStatisticBuilder setNdv(Hll128 ndv) {
+        this.ndv = ndv;
+        return this;
+    }
+
+    public PartitionColumnStatisticBuilder setAvgSizeByte(double avgSizeByte) {
+        this.avgSizeByte = avgSizeByte;
+        return this;
+    }
+
+    public PartitionColumnStatisticBuilder setNumNulls(double numNulls) {
+        this.numNulls = numNulls;
+        return this;
+    }
+
+    public PartitionColumnStatisticBuilder setDataSize(double dataSize) {
+        this.dataSize = dataSize;
+        return this;
+    }
+
+    public PartitionColumnStatisticBuilder setMinValue(double minValue) {
+        this.minValue = minValue;
+        return this;
+    }
+
+    public PartitionColumnStatisticBuilder setMaxValue(double maxValue) {
+        this.maxValue = maxValue;
+        return this;
+    }
+
+    public PartitionColumnStatisticBuilder setMinExpr(LiteralExpr minExpr) {
+        this.minExpr = minExpr;
+        return this;
+    }
+
+    public PartitionColumnStatisticBuilder setMaxExpr(LiteralExpr maxExpr) {
+        this.maxExpr = maxExpr;
+        return this;
+    }
+
+    public PartitionColumnStatisticBuilder setIsUnknown(boolean isUnknown) {
+        this.isUnknown = isUnknown;
+        return this;
+    }
+
+    public PartitionColumnStatisticBuilder setUpdatedTime(String updatedTime) {
+        this.updatedTime = updatedTime;
+        return this;
+    }
+
+    public double getCount() {
+        return count;
+    }
+
+    public Hll128 getNdv() {
+        return ndv;
+    }
+
+    public double getAvgSizeByte() {
+        return avgSizeByte;
+    }
+
+    public double getNumNulls() {
+        return numNulls;
+    }
+
+    public double getDataSize() {
+        return dataSize;
+    }
+
+    public double getMinValue() {
+        return minValue;
+    }
+
+    public double getMaxValue() {
+        return maxValue;
+    }
+
+    public LiteralExpr getMinExpr() {
+        return minExpr;
+    }
+
+    public LiteralExpr getMaxExpr() {
+        return maxExpr;
+    }
+
+    public boolean isUnknown() {
+        return isUnknown;
+    }
+
+    public String getUpdatedTime() {
+        return updatedTime;
+    }
+
+    public PartitionColumnStatistic build() { // a non-positive dataSize is derived below and stored back
+        dataSize = dataSize > 0 ? dataSize : Math.max((count - numNulls + 1) * avgSizeByte, 0);
+        return new PartitionColumnStatistic(count, ndv, avgSizeByte, numNulls,
+                dataSize, minValue, maxValue, minExpr, maxExpr,
+                isUnknown, updatedTime);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatisticCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatisticCacheKey.java
new file mode 100644
index 00000000000000..8bacd61b218ec3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatisticCacheKey.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.util.Objects;
+import java.util.StringJoiner;
+
+public class PartitionColumnStatisticCacheKey { // immutable key of the partition column statistics cache
+
+    @SerializedName("catalogId")
+    public final long catalogId;
+    @SerializedName("dbId")
+    public final long dbId;
+    @SerializedName("tableId")
+    public final long tableId;
+    @SerializedName("idxId")
+    public final long idxId;
+    @SerializedName("partId")
+    public final String partId; // callers in this patch pass the partition name here
+    @SerializedName("colName")
+    public final String colName;
+
+    private static final String DELIMITER = "-";
+
+    public PartitionColumnStatisticCacheKey(long catalogId, long dbId, long tableId, long idxId,
+            String partId, String colName) {
+        this.catalogId = catalogId;
+        this.dbId = dbId;
+        this.tableId = tableId;
+        this.idxId = idxId;
+        this.partId = partId;
+        this.colName = colName;
+    }
+
+    @Override
+    public int hashCode() { // must cover every field compared by equals(), partId included
+        return Objects.hash(catalogId, dbId, tableId, idxId, partId, colName);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        PartitionColumnStatisticCacheKey k = (PartitionColumnStatisticCacheKey) obj; // Objects.equals is null-safe
+        return this.catalogId == k.catalogId && this.dbId == k.dbId && this.tableId == k.tableId
+                && this.idxId == k.idxId && Objects.equals(partId, k.partId) && Objects.equals(colName, k.colName);
+    }
+
+    @Override
+    public String toString() { // "PartitionColumnStats" plus the six fields, joined by DELIMITER
+        StringJoiner sj = new StringJoiner(DELIMITER);
+        sj.add("PartitionColumnStats");
+        sj.add(String.valueOf(catalogId));
+        sj.add(String.valueOf(dbId));
+        sj.add(String.valueOf(tableId));
+        sj.add(String.valueOf(idxId));
+        sj.add(partId);
+        sj.add(colName);
+        return sj.toString();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatisticCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatisticCacheLoader.java
new file mode 100644
index
00000000000000..478c90e2f11f0c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionColumnStatisticCacheLoader.java @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.qe.InternalQueryExecutionException; +import org.apache.doris.statistics.util.StatisticsUtil; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Optional; + +public class PartitionColumnStatisticCacheLoader extends + BasicAsyncCacheLoader> { + + private static final Logger LOG = LogManager.getLogger(PartitionColumnStatisticCacheLoader.class); + + @Override + protected Optional doLoad(PartitionColumnStatisticCacheKey key) { + Optional partitionStatistic = Optional.empty(); + try { + partitionStatistic = loadFromPartitionStatsTable(key); + } catch (Throwable t) { + LOG.warn("Failed to load stats for column [Catalog:{}, DB:{}, Table:{}, Part:{}, Column:{}]," + + "Reason: {}", key.catalogId, key.dbId, key.tableId, key.partId, key.colName, t.getMessage()); + if (LOG.isDebugEnabled()) { + LOG.debug(t); + } + } + if (partitionStatistic.isPresent()) { + // For non-empty 
table, return UNKNOWN if we can't collect ndv value. + // Because inaccurate ndv is very misleading. + PartitionColumnStatistic stats = partitionStatistic.get(); + if (stats.count > 0 && stats.ndv.estimateCardinality() == 0 && stats.count != stats.numNulls) { + partitionStatistic = Optional.of(PartitionColumnStatistic.UNKNOWN); + } + } + return partitionStatistic; + } + + private Optional loadFromPartitionStatsTable(PartitionColumnStatisticCacheKey key) { + List partitionResults; + try { + partitionResults = StatisticsRepository.loadPartitionColumnStats( + key.catalogId, key.dbId, key.tableId, key.idxId, key.partId, key.colName); + } catch (InternalQueryExecutionException e) { + LOG.info("Failed to load stats for table {} column {}. Reason:{}", + key.tableId, key.colName, e.getMessage()); + return Optional.empty(); + } + PartitionColumnStatistic partitionStatistic; + try { + partitionStatistic = StatisticsUtil.deserializeToPartitionStatistics(partitionResults); + } catch (Exception e) { + LOG.warn("Exception to deserialize partition statistics", e); + return Optional.empty(); + } + return Optional.ofNullable(partitionStatistic); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java index 8a96a0218ce61c..a824d0e99a8efc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java @@ -58,15 +58,17 @@ public class StatisticsCache { = ThreadPoolManager.newDaemonFixedThreadPool( 10, Integer.MAX_VALUE, "STATS_FETCH", true); - private final ColumnStatisticsCacheLoader columnStatisticsCacheLoader = new ColumnStatisticsCacheLoader(); + private final ColumnStatisticsCacheLoader columnStatisticCacheLoader = new ColumnStatisticsCacheLoader(); private final HistogramCacheLoader histogramCacheLoader = new HistogramCacheLoader(); + private final 
PartitionColumnStatisticCacheLoader partitionColumnStatisticCacheLoader + = new PartitionColumnStatisticCacheLoader(); private final AsyncLoadingCache> columnStatisticsCache = Caffeine.newBuilder() .maximumSize(Config.stats_cache_size) .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL)) .executor(threadPool) - .buildAsync(columnStatisticsCacheLoader); + .buildAsync(columnStatisticCacheLoader); private final AsyncLoadingCache> histogramCache = Caffeine.newBuilder() @@ -75,6 +77,14 @@ public class StatisticsCache { .executor(threadPool) .buildAsync(histogramCacheLoader); + private final AsyncLoadingCache> + partitionColumnStatisticCache = + Caffeine.newBuilder() + .maximumSize(Config.stats_cache_size) + .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL)) + .executor(threadPool) + .buildAsync(partitionColumnStatisticCacheLoader); + public ColumnStatistic getColumnStatistics(long catalogId, long dbId, long tblId, long idxId, String colName) { ConnectContext ctx = ConnectContext.get(); if (ctx != null && ctx.getSessionVariable().internalSession) { @@ -92,6 +102,25 @@ public ColumnStatistic getColumnStatistics(long catalogId, long dbId, long tblId return ColumnStatistic.UNKNOWN; } + public PartitionColumnStatistic getPartitionColumnStatistics(long catalogId, long dbId, long tblId, long idxId, + String partName, String colName) { + ConnectContext ctx = ConnectContext.get(); + if (ctx != null && ctx.getSessionVariable().internalSession) { + return PartitionColumnStatistic.UNKNOWN; + } + PartitionColumnStatisticCacheKey k = new PartitionColumnStatisticCacheKey( + catalogId, dbId, tblId, idxId, partName, colName); + try { + CompletableFuture> f = partitionColumnStatisticCache.get(k); + if (f.isDone()) { + return f.get().orElse(PartitionColumnStatistic.UNKNOWN); + } + } catch (Exception e) { + LOG.warn("Unexpected exception while returning ColumnStatistic", e); + } + return 
PartitionColumnStatistic.UNKNOWN; + } + public Histogram getHistogram(long ctlId, long dbId, long tblId, String colName) { return getHistogram(ctlId, dbId, tblId, -1, colName).orElse(null); } @@ -113,8 +142,13 @@ private Optional getHistogram(long ctlId, long dbId, long tblId, long return Optional.empty(); } - public void invalidate(long ctlId, long dbId, long tblId, long idxId, String colName) { - columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(ctlId, dbId, tblId, idxId, colName)); + public void invalidate(long ctlId, long dbId, long tblId, long idxId, String partId, String colName) { + if (partId == null) { + columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(ctlId, dbId, tblId, idxId, colName)); + } else { + partitionColumnStatisticCache.synchronous().invalidate( + new PartitionColumnStatisticCacheKey(ctlId, dbId, tblId, idxId, partId, colName)); + } } public void updateColStatsCache(long ctlId, long dbId, long tblId, long idxId, String colName, @@ -192,7 +226,7 @@ public void syncColStats(ColStatsData data) { statsId.idxId, statsId.colId); ColumnStatistic columnStatistic = data.toColumnStatistic(); if (columnStatistic == ColumnStatistic.UNKNOWN) { - invalidate(k.catalogId, k.dbId, k.tableId, k.idxId, k.colName); + invalidate(k.catalogId, k.dbId, k.tableId, k.idxId, null, k.colName); } else { putCache(k, columnStatistic); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java index b6ea5bf3947dc2..d0dc9b810e2148 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java @@ -67,6 +67,12 @@ public class StatisticsRepository { + FULL_QUALIFIED_COLUMN_STATISTICS_NAME + " WHERE `id` = '${id}' AND `catalog_id` = '${catalogId}' AND `db_id` = '${dbId}'"; + private static final String 
FETCH_PARTITION_STATISTIC_TEMPLATE = "SELECT `catalog_id`, `db_id`, `tbl_id`, `idx_id`," + + "`col_id`, `count`, hll_to_base64(`ndv`) as ndv, `null_count`, `min`, `max`, `data_size_in_bytes`, " + + "`update_time` FROM " + FULL_QUALIFIED_PARTITION_STATISTICS_NAME + + " WHERE `catalog_id` = '${catalogId}' AND `db_id` = '${dbId}' AND `tbl_id` = ${tableId}" + + " AND `idx_id` = '${indexId}' AND `part_id` = '${partId}' AND `col_id` = '${columnId}'"; + private static final String FETCH_PARTITIONS_STATISTIC_TEMPLATE = "SELECT col_id, part_id, idx_id, count, " + "hll_cardinality(ndv) as ndv, null_count, min, max, data_size_in_bytes, update_time FROM " + FULL_QUALIFIED_PARTITION_STATISTICS_NAME @@ -409,6 +415,18 @@ public static List loadColStats(long ctlId, long dbId, long tableId, .replace(FETCH_COLUMN_STATISTIC_TEMPLATE)); } + public static List loadPartitionColumnStats(long ctlId, long dbId, long tableId, long idxId, + String partName, String colName) { + Map params = new HashMap<>(); + generateCtlDbIdParams(ctlId, dbId, params); + params.put("tableId", String.valueOf(tableId)); + params.put("indexId", String.valueOf(idxId)); + params.put("partId", partName); + params.put("columnId", colName); + return StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(FETCH_PARTITION_STATISTIC_TEMPLATE)); + } + public static List loadPartStats(Collection keys) { String inPredicate = "CONCAT(tbl_id, '-', idx_id, '-', col_id) in (%s)"; StringJoiner sj = new StringJoiner(","); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 75d6165c35b50e..b908ecf2019bcb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -72,6 +72,7 @@ import org.apache.doris.statistics.ColumnStatistic; import 
org.apache.doris.statistics.ColumnStatisticBuilder; import org.apache.doris.statistics.Histogram; +import org.apache.doris.statistics.PartitionColumnStatistic; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.statistics.TableStatsMeta; @@ -185,11 +186,17 @@ public static ColumnStatistic deserializeToColumnStatistics(List resu return ColumnStatistic.fromResultRow(resultBatches); } - public static List deserializeToHistogramStatistics(List resultBatches) - throws Exception { + public static List deserializeToHistogramStatistics(List resultBatches) { return resultBatches.stream().map(Histogram::fromResultRow).collect(Collectors.toList()); } + public static PartitionColumnStatistic deserializeToPartitionStatistics(List resultBatches) { + if (CollectionUtils.isEmpty(resultBatches)) { + return null; + } + return PartitionColumnStatistic.fromResultRow(resultBatches); + } + public static AutoCloseConnectContext buildConnectContext() { return buildConnectContext(false, false); } diff --git a/regression-test/suites/statistics/test_show_partition_stats.groovy b/regression-test/suites/statistics/test_show_partition_stats.groovy index 45253d43a6cd75..01748f1658e6bc 100644 --- a/regression-test/suites/statistics/test_show_partition_stats.groovy +++ b/regression-test/suites/statistics/test_show_partition_stats.groovy @@ -87,6 +87,36 @@ suite("test_show_partition_stats") { def result = sql """show table stats part""" assertEquals(1, result.size()) + assertEquals("18", result[0][0]) + + sql """analyze table part with sync;""" + result = sql """show column cached stats part(id) partition(p1)""" + assertEquals(1, result.size()) + Thread.sleep(1000) + for (int i = 0; i < 10; i++) { + result = sql """show column cached stats part(id) partition(p1)""" + if (result[0][3] == "6.0") { + logger.info("cache is ready.") + assertEquals("id", result[0][0]) + assertEquals("p1", result[0][1]) + assertEquals("part", 
result[0][2]) + assertEquals("6.0", result[0][3]) + assertEquals("6", result[0][4]) + assertEquals("0.0", result[0][5]) + assertEquals("1.0", result[0][6]) + assertEquals("6.0", result[0][7]) + assertEquals("24.0", result[0][8]) + assertEquals("N/A", result[0][10]) + assertEquals("N/A", result[0][11]) + break; + } + logger.info("cache is not ready yet.") + Thread.sleep(1000) + } + result = sql """show column cached stats part partition(p1)""" + assertEquals(9, result.size()) + result = sql """show column cached stats part partition(*)""" + assertEquals(27, result.size()) sql """drop database if exists test_show_partition_stats""" }