feat(storage): add table function clustering_statistics (#16081)
* add table function clustering_statistics

* fix test
zhyass authored Jul 21, 2024
1 parent 24a33bc commit 9320b7a
Showing 12 changed files with 473 additions and 16 deletions.
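
In short: clustering_statistics is registered alongside clustering_information in the table-function factory (first Rust diff below) and is backed by the new ClusteringStatistics collector (the 198-line file at the end), which emits one row per block with the columns segment_name, block_name, min, max, level, and pages. Judging by the registration pattern, it is presumably queried like its sibling, e.g. SELECT * FROM clustering_statistics('db', 'tbl'); the ClusteringStatisticsTable wrapper referenced by the factory is not included in this excerpt.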
4 changes: 2 additions & 2 deletions scripts/ci/deploy/databend-query-share.sh
@@ -16,7 +16,7 @@ echo "current directory:"
pwd

echo "build share endpoint"
-ls
+ls
mv share-endpoint ../tmp-share-endpoint
cd ../tmp-share-endpoint
cargo build --release
@@ -40,4 +40,4 @@ python3 scripts/ci/wait_tcp.py --timeout 30 --port 19191

echo "Start query consumer for sharding data"
nohup target/${BUILD_PROFILE}/databend-query -c scripts/ci/deploy/config/databend-query-node-share-consumer.toml &
-python3 scripts/ci/wait_tcp.py --timeout 30 --port 23307
+python3 scripts/ci/wait_tcp.py --timeout 30 --port 23307

@@ -19,6 +19,7 @@ use databend_common_catalog::table_args::TableArgs;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_types::MetaId;
+use databend_common_storages_fuse::table_functions::ClusteringStatisticsTable;
use databend_common_storages_fuse::table_functions::FuseAmendTable;
use databend_common_storages_fuse::table_functions::FuseColumnTable;
use databend_common_storages_fuse::table_functions::FuseEncodingTable;
@@ -160,6 +161,10 @@ impl TableFunctionFactory {
"clustering_information".to_string(),
(next_id(), Arc::new(ClusteringInformationTable::create)),
);
+creators.insert(
+"clustering_statistics".to_string(),
+(next_id(), Arc::new(ClusteringStatisticsTable::create)),
+);

creators.insert(
"stream_status".to_string(),
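
The registration above follows a simple name-to-creator registry. A minimal, self-contained sketch of the pattern (illustrative types only; TableArgs and Creator here are stand-ins, not Databend's actual signatures):

    use std::collections::HashMap;
    use std::sync::Arc;

    // Illustrative stand-ins; Databend's real TableArgs and creators differ.
    type TableArgs = Vec<String>;
    type Creator = Arc<dyn Fn(TableArgs) -> String + Send + Sync>;

    struct TableFunctionFactory {
        next_id: u64,
        creators: HashMap<String, (u64, Creator)>,
    }

    impl TableFunctionFactory {
        fn register(&mut self, name: &str, creator: Creator) {
            let id = self.next_id;
            self.next_id += 1;
            // Each table function gets a stable id plus a creator called at plan time.
            self.creators.insert(name.to_string(), (id, creator));
        }

        fn create(&self, name: &str, args: TableArgs) -> Option<String> {
            self.creators.get(name).map(|(_, c)| c(args))
        }
    }

    fn main() {
        let mut factory = TableFunctionFactory { next_id: 0, creators: HashMap::new() };
        factory.register(
            "clustering_statistics",
            Arc::new(|args| format!("clustering_statistics({})", args.join(", "))),
        );
        assert!(factory
            .create("clustering_statistics", vec!["db".into(), "tbl".into()])
            .is_some());
    }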

@@ -38,6 +38,7 @@ use databend_storages_common_table_meta::meta::CompactSegmentInfo;
use databend_storages_common_table_meta::meta::Statistics;
use databend_storages_common_table_meta::meta::TableSnapshot;
use indexmap::IndexSet;
+use log::debug;
use log::warn;
use minitrace::full_name;
use minitrace::future::FutureExt;
@@ -253,6 +254,9 @@ impl ReclusterMutator {
.block_thresholds
.check_for_recluster(total_rows as usize, total_bytes as usize)
{
+debug!(
+"recluster: the statistics of blocks are too small, just merge them into one block"
+);
let block_metas = block_metas
.into_iter()
.map(|meta| (None, meta))
@@ -356,6 +360,7 @@
&mut self,
compact_segments: Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>,
) -> Result<bool> {
+debug!("recluster: generate compact tasks");
let settings = self.ctx.get_settings();
let num_block_limit = settings.get_compact_max_block_selection()? as usize;
let num_segment_limit = compact_segments.len();
@@ -423,6 +428,10 @@
total_bytes: usize,
level: i32,
) -> ReclusterTask {
+debug!(
+"recluster: generate recluster task, the selected block metas: {:?}, level: {}",
+block_metas, level
+);
let (stats, parts) =
FuseTable::to_partitions(Some(&self.schema), block_metas, column_nodes, None, None);
self.recluster_blocks_count += block_metas.len() as u64;
@@ -444,7 +453,7 @@
let mut indices = IndexSet::new();
let mut points_map: HashMap<Vec<Scalar>, (Vec<usize>, Vec<usize>)> = HashMap::new();
let mut unclustered_sg = IndexSet::new();
-for (i, (_, compact_segment)) in compact_segments.iter().enumerate() {
+for (i, (loc, compact_segment)) in compact_segments.iter().enumerate() {
let mut level = -1;
let clustered = compact_segment
.summary
@@ -455,6 +464,10 @@
v.cluster_key_id == self.cluster_key_id
});
if !clustered {
+debug!(
+"recluster: segment '{}' is unclustered, need to be compacted",
+loc.location.0
+);
unclustered_sg.insert(i);
continue;
}
@@ -598,6 +611,7 @@
// round the float to 4 decimal places.
let average_depth =
(10000.0 * sum_depth as f64 / block_depths.len() as f64).round() / 10000.0;
+debug!("recluster: average_depth: {}", average_depth);

// find the max point, gather the indices.
if average_depth > depth_threshold {
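
The new debug line logs the value computed just above it. The rounding keeps four decimal places before the comparison against depth_threshold; in isolation (a plain-Rust sketch, with sum_depth and num_blocks standing in for the values the surrounding code accumulates):

    // Round the average block depth to 4 decimal places, as above:
    // multiply by 10^4, round to nearest, divide back.
    fn average_depth(sum_depth: u64, num_blocks: usize) -> f64 {
        (10000.0 * sum_depth as f64 / num_blocks as f64).round() / 10000.0
    }

    fn main() {
        assert_eq!(average_depth(7, 3), 2.3333); // 7 / 3 = 2.333...
        // Only when this exceeds depth_threshold are blocks selected for recluster.
    }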

@@ -40,7 +40,7 @@ use crate::table_functions::TableFunction;
use crate::FuseTable;
use crate::Table;

-const FUSE_FUNC_CLUSTERING: &str = "clustering_information";
+const FUSE_FUNC_CLUSTERING_INFO: &str = "clustering_information";

pub struct ClusteringInformationTable {
table_info: TableInfo,
@@ -57,9 +57,9 @@ impl ClusteringInformationTable {
table_args: TableArgs,
) -> Result<Arc<dyn TableFunction>> {
let (arg_database_name, arg_table_name, arg_cluster_key) =
-parse_db_tb_opt_args(&table_args, FUSE_FUNC_CLUSTERING)?;
+parse_db_tb_opt_args(&table_args, FUSE_FUNC_CLUSTERING_INFO)?;

-let engine = FUSE_FUNC_CLUSTERING.to_owned();
+let engine = FUSE_FUNC_CLUSTERING_INFO.to_owned();

let table_info = TableInfo {
ident: TableIdent::new(table_id, 0),
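
(The constant rename from FUSE_FUNC_CLUSTERING to FUSE_FUNC_CLUSTERING_INFO looks like housekeeping: with clustering_statistics joining clustering_information, the more specific name avoids ambiguity between the two table functions.)
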
@@ -0,0 +1,198 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_catalog::table::Table;
use databend_common_exception::Result;
use databend_common_expression::types::string::StringColumnBuilder;
use databend_common_expression::types::DataType;
use databend_common_expression::types::Int32Type;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::types::StringType;
use databend_common_expression::BlockEntry;
use databend_common_expression::Column;
use databend_common_expression::DataBlock;
use databend_common_expression::FromData;
use databend_common_expression::Scalar;
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchema;
use databend_common_expression::TableSchemaRefExt;
use databend_common_expression::Value;
use databend_storages_common_table_meta::meta::SegmentInfo;
use databend_storages_common_table_meta::meta::TableSnapshot;

use crate::io::SegmentsIO;
use crate::sessions::TableContext;
use crate::FuseTable;

pub struct ClusteringStatistics<'a> {
pub ctx: Arc<dyn TableContext>,
pub table: &'a FuseTable,
pub limit: Option<usize>,
pub cluster_key_id: u32,
}

impl<'a> ClusteringStatistics<'a> {
pub fn new(
ctx: Arc<dyn TableContext>,
table: &'a FuseTable,
limit: Option<usize>,
cluster_key_id: u32,
) -> Self {
Self {
ctx,
table,
limit,
cluster_key_id,
}
}

#[async_backtrace::framed]
pub async fn get_blocks(&self) -> Result<DataBlock> {
let tbl = self.table;
let maybe_snapshot = tbl.read_table_snapshot().await?;
if let Some(snapshot) = maybe_snapshot {
return self.to_block(snapshot).await;
}

Ok(DataBlock::empty_with_schema(Arc::new(
Self::schema().into(),
)))
}

#[async_backtrace::framed]
async fn to_block(&self, snapshot: Arc<TableSnapshot>) -> Result<DataBlock> {
let limit = self.limit.unwrap_or(usize::MAX);
let len = std::cmp::min(snapshot.summary.block_count as usize, limit);

let mut segment_name = Vec::with_capacity(len);
let mut block_name = StringColumnBuilder::with_capacity(len, len);
let mut max = Vec::with_capacity(len);
let mut min = Vec::with_capacity(len);
let mut level = Vec::with_capacity(len);
let mut pages = Vec::with_capacity(len);

let segments_io = SegmentsIO::create(
self.ctx.clone(),
self.table.operator.clone(),
self.table.schema(),
);

let mut row_num = 0;
let mut end_flag = false;
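// Read segment metadata in batches: up to 4x max_threads locations at a
// time, capped by the number of rows still needed (len) and at least 1.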
let chunk_size =
std::cmp::min(self.ctx.get_settings().get_max_threads()? as usize * 4, len).max(1);

let format_vec = |v: &[Scalar]| -> String {
format!(
"[{}]",
v.iter()
.map(|item| format!("{}", item))
.collect::<Vec<_>>()
.join(", ")
)
};
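// Walk the snapshot's segments chunk by chunk, emitting one row per block
// until `limit` rows have been produced.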
'FOR: for chunk in snapshot.segments.chunks(chunk_size) {
let segments = segments_io
.read_segments::<SegmentInfo>(chunk, true)
.await?;
for (i, segment) in segments.into_iter().enumerate() {
let segment = segment?;
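// One output row per block, so repeat this segment's path once for every
// block it contains.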
segment_name
.extend(std::iter::repeat(chunk[i].0.clone()).take(segment.blocks.len()));

for block in segment.blocks.iter() {
let block = block.as_ref();
block_name.put_str(&block.location.0);
block_name.commit_row();

// Stats only count when they were produced under the table's current
// cluster key; otherwise this row's stats columns are NULL.
let cluster_stats = block.cluster_stats.as_ref();
let clustered =
cluster_stats.map_or(false, |v| v.cluster_key_id == self.cluster_key_id);
if clustered {
// Safe to unwrap
let cluster_stats = cluster_stats.unwrap();
min.push(Some(format_vec(cluster_stats.min())));
max.push(Some(format_vec(cluster_stats.max())));
level.push(Some(cluster_stats.level));
pages.push(cluster_stats.pages.as_ref().map(|v| format_vec(v)));
} else {
min.push(None);
max.push(None);
level.push(None);
pages.push(None);
}

row_num += 1;
if row_num >= limit {
end_flag = true;
break;
}
}

if end_flag {
break 'FOR;
}
}
}

Ok(DataBlock::new(
vec![
BlockEntry::new(
DataType::String,
Value::Column(StringType::from_data(segment_name)),
),
BlockEntry::new(
DataType::String,
Value::Column(Column::String(block_name.build())),
),
BlockEntry::new(
DataType::String.wrap_nullable(),
Value::Column(StringType::from_opt_data(min)),
),
BlockEntry::new(
DataType::String.wrap_nullable(),
Value::Column(StringType::from_opt_data(max)),
),
BlockEntry::new(
DataType::Number(NumberDataType::Int32).wrap_nullable(),
Value::Column(Int32Type::from_opt_data(level)),
),
BlockEntry::new(
DataType::String.wrap_nullable(),
Value::Column(StringType::from_opt_data(pages)),
),
],
row_num,
))
}

pub fn schema() -> Arc<TableSchema> {
TableSchemaRefExt::create(vec![
TableField::new("segment_name", TableDataType::String),
TableField::new("block_name", TableDataType::String),
TableField::new("min", TableDataType::String.wrap_nullable()),
TableField::new("max", TableDataType::String.wrap_nullable()),
TableField::new(
"level",
TableDataType::Number(NumberDataType::Int32).wrap_nullable(),
),
TableField::new("pages", TableDataType::String.wrap_nullable()),
])
}
}
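
The core of to_block is its traversal shape: read segment metadata in chunks, emit one row per block, and stop exactly at limit via the labeled break. The same pattern reduced to plain Rust, independent of Databend's types (a sketch):

    // Emit items chunk by chunk, stopping as soon as `limit` is reached,
    // mirroring the 'FOR labeled break in to_block above.
    fn collect_limited<T: Clone>(items: &[T], chunk_size: usize, limit: usize) -> Vec<T> {
        let mut out = Vec::new();
        'outer: for chunk in items.chunks(chunk_size.max(1)) {
            for item in chunk {
                out.push(item.clone());
                if out.len() >= limit {
                    break 'outer;
                }
            }
        }
        out
    }

    fn main() {
        let data: Vec<u32> = (0..10).collect();
        assert_eq!(collect_limited(&data, 4, 5), vec![0, 1, 2, 3, 4]);
    }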