diff --git a/scripts/ci/deploy/databend-query-share.sh b/scripts/ci/deploy/databend-query-share.sh index ac48d82c0d6b9..5407efbdc98f1 100755 --- a/scripts/ci/deploy/databend-query-share.sh +++ b/scripts/ci/deploy/databend-query-share.sh @@ -16,7 +16,7 @@ echo "current directory:" pwd echo "build share endpoint" -ls +ls mv share-endpoint ../tmp-share-endpoint cd ../tmp-share-endpoint cargo build --release @@ -40,4 +40,4 @@ python3 scripts/ci/wait_tcp.py --timeout 30 --port 19191 echo "Start query consumer for sharding data" nohup target/${BUILD_PROFILE}/databend-query -c scripts/ci/deploy/config/databend-query-node-share-consumer.toml & -python3 scripts/ci/wait_tcp.py --timeout 30 --port 23307 \ No newline at end of file +python3 scripts/ci/wait_tcp.py --timeout 30 --port 23307 diff --git a/src/query/service/src/table_functions/table_function_factory.rs b/src/query/service/src/table_functions/table_function_factory.rs index 3ac23fbd9edb3..0f2720cf9b5ef 100644 --- a/src/query/service/src/table_functions/table_function_factory.rs +++ b/src/query/service/src/table_functions/table_function_factory.rs @@ -19,6 +19,7 @@ use databend_common_catalog::table_args::TableArgs; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_types::MetaId; +use databend_common_storages_fuse::table_functions::ClusteringStatisticsTable; use databend_common_storages_fuse::table_functions::FuseAmendTable; use databend_common_storages_fuse::table_functions::FuseColumnTable; use databend_common_storages_fuse::table_functions::FuseEncodingTable; @@ -160,6 +161,10 @@ impl TableFunctionFactory { "clustering_information".to_string(), (next_id(), Arc::new(ClusteringInformationTable::create)), ); + creators.insert( + "clustering_statistics".to_string(), + (next_id(), Arc::new(ClusteringStatisticsTable::create)), + ); creators.insert( "stream_status".to_string(), diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs index f5412fb38b693..c589c4417faab 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs @@ -38,6 +38,7 @@ use databend_storages_common_table_meta::meta::CompactSegmentInfo; use databend_storages_common_table_meta::meta::Statistics; use databend_storages_common_table_meta::meta::TableSnapshot; use indexmap::IndexSet; +use log::debug; use log::warn; use minitrace::full_name; use minitrace::future::FutureExt; @@ -253,6 +254,9 @@ impl ReclusterMutator { .block_thresholds .check_for_recluster(total_rows as usize, total_bytes as usize) { + debug!( + "recluster: the statistics of blocks are too small, just merge them into one block" + ); let block_metas = block_metas .into_iter() .map(|meta| (None, meta)) @@ -356,6 +360,7 @@ impl ReclusterMutator { &mut self, compact_segments: Vec<(SegmentLocation, Arc)>, ) -> Result { + debug!("recluster: generate compact tasks"); let settings = self.ctx.get_settings(); let num_block_limit = settings.get_compact_max_block_selection()? as usize; let num_segment_limit = compact_segments.len(); @@ -423,6 +428,10 @@ impl ReclusterMutator { total_bytes: usize, level: i32, ) -> ReclusterTask { + debug!( + "recluster: generate recluster task, the selected block metas: {:?}, level: {}", + block_metas, level + ); let (stats, parts) = FuseTable::to_partitions(Some(&self.schema), block_metas, column_nodes, None, None); self.recluster_blocks_count += block_metas.len() as u64; @@ -444,7 +453,7 @@ impl ReclusterMutator { let mut indices = IndexSet::new(); let mut points_map: HashMap, (Vec, Vec)> = HashMap::new(); let mut unclustered_sg = IndexSet::new(); - for (i, (_, compact_segment)) in compact_segments.iter().enumerate() { + for (i, (loc, compact_segment)) in compact_segments.iter().enumerate() { let mut level = -1; let clustered = compact_segment .summary @@ -455,6 +464,10 @@ impl ReclusterMutator { v.cluster_key_id == self.cluster_key_id }); if !clustered { + debug!( + "recluster: segment '{}' is unclustered, need to be compacted", + loc.location.0 + ); unclustered_sg.insert(i); continue; } @@ -598,6 +611,7 @@ impl ReclusterMutator { // round the float to 4 decimal places. let average_depth = (10000.0 * sum_depth as f64 / block_depths.len() as f64).round() / 10000.0; + debug!("recluster: average_depth: {}", average_depth); // find the max point, gather the indices. if average_depth > depth_threshold { diff --git a/src/query/storages/fuse/src/table_functions/clustering_information/clustering_information_table.rs b/src/query/storages/fuse/src/table_functions/clustering_information/clustering_information_table.rs index 92f35e2f1f6a0..efe7376d2b230 100644 --- a/src/query/storages/fuse/src/table_functions/clustering_information/clustering_information_table.rs +++ b/src/query/storages/fuse/src/table_functions/clustering_information/clustering_information_table.rs @@ -40,7 +40,7 @@ use crate::table_functions::TableFunction; use crate::FuseTable; use crate::Table; -const FUSE_FUNC_CLUSTERING: &str = "clustering_information"; +const FUSE_FUNC_CLUSTERING_INFO: &str = "clustering_information"; pub struct ClusteringInformationTable { table_info: TableInfo, @@ -57,9 +57,9 @@ impl ClusteringInformationTable { table_args: TableArgs, ) -> Result> { let (arg_database_name, arg_table_name, arg_cluster_key) = - parse_db_tb_opt_args(&table_args, FUSE_FUNC_CLUSTERING)?; + parse_db_tb_opt_args(&table_args, FUSE_FUNC_CLUSTERING_INFO)?; - let engine = FUSE_FUNC_CLUSTERING.to_owned(); + let engine = FUSE_FUNC_CLUSTERING_INFO.to_owned(); let table_info = TableInfo { ident: TableIdent::new(table_id, 0), diff --git a/src/query/storages/fuse/src/table_functions/clustering_statistics/clustering_stat.rs b/src/query/storages/fuse/src/table_functions/clustering_statistics/clustering_stat.rs new file mode 100644 index 0000000000000..e5b7d914926c6 --- /dev/null +++ b/src/query/storages/fuse/src/table_functions/clustering_statistics/clustering_stat.rs @@ -0,0 +1,198 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_catalog::table::Table; +use databend_common_exception::Result; +use databend_common_expression::types::string::StringColumnBuilder; +use databend_common_expression::types::DataType; +use databend_common_expression::types::Int32Type; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::StringType; +use databend_common_expression::BlockEntry; +use databend_common_expression::Column; +use databend_common_expression::DataBlock; +use databend_common_expression::FromData; +use databend_common_expression::Scalar; +use databend_common_expression::TableDataType; +use databend_common_expression::TableField; +use databend_common_expression::TableSchema; +use databend_common_expression::TableSchemaRefExt; +use databend_common_expression::Value; +use databend_storages_common_table_meta::meta::SegmentInfo; +use databend_storages_common_table_meta::meta::TableSnapshot; + +use crate::io::SegmentsIO; +use crate::sessions::TableContext; +use crate::FuseTable; + +pub struct ClusteringStatistics<'a> { + pub ctx: Arc, + pub table: &'a FuseTable, + pub limit: Option, + pub cluster_key_id: u32, +} + +impl<'a> ClusteringStatistics<'a> { + pub fn new( + ctx: Arc, + table: &'a FuseTable, + limit: Option, + cluster_key_id: u32, + ) -> Self { + Self { + ctx, + table, + limit, + cluster_key_id, + } + } + + #[async_backtrace::framed] + pub async fn get_blocks(&self) -> Result { + let tbl = self.table; + let maybe_snapshot = tbl.read_table_snapshot().await?; + if let Some(snapshot) = maybe_snapshot { + return self.to_block(snapshot).await; + } + + Ok(DataBlock::empty_with_schema(Arc::new( + Self::schema().into(), + ))) + } + + #[async_backtrace::framed] + async fn to_block(&self, snapshot: Arc) -> Result { + let limit = self.limit.unwrap_or(usize::MAX); + let len = std::cmp::min(snapshot.summary.block_count as usize, limit); + + let mut segment_name = Vec::with_capacity(len); + let mut block_name = StringColumnBuilder::with_capacity(len, len); + let mut max = Vec::with_capacity(len); + let mut min = Vec::with_capacity(len); + let mut level = Vec::with_capacity(len); + let mut pages = Vec::with_capacity(len); + + let segments_io = SegmentsIO::create( + self.ctx.clone(), + self.table.operator.clone(), + self.table.schema(), + ); + + let mut row_num = 0; + let mut end_flag = false; + let chunk_size = + std::cmp::min(self.ctx.get_settings().get_max_threads()? as usize * 4, len).max(1); + + let format_vec = |v: &[Scalar]| -> String { + format!( + "[{}]", + v.iter() + .map(|item| format!("{}", item)) + .collect::>() + .join(", ") + ) + }; + 'FOR: for chunk in snapshot.segments.chunks(chunk_size) { + let segments = segments_io + .read_segments::(chunk, true) + .await?; + for (i, segment) in segments.into_iter().enumerate() { + let segment = segment?; + segment_name + .extend(std::iter::repeat(chunk[i].0.clone()).take(segment.blocks.len())); + + for block in segment.blocks.iter() { + let block = block.as_ref(); + block_name.put_str(&block.location.0); + block_name.commit_row(); + + let cluster_stats = block.cluster_stats.as_ref(); + let clustered = block + .cluster_stats + .as_ref() + .map_or(false, |v| v.cluster_key_id == self.cluster_key_id); + if clustered { + // Safe to unwrap + let cluster_stats = cluster_stats.unwrap(); + min.push(Some(format_vec(cluster_stats.min()))); + max.push(Some(format_vec(cluster_stats.max()))); + level.push(Some(cluster_stats.level)); + pages.push(cluster_stats.pages.as_ref().map(|v| format_vec(v))); + } else { + min.push(None); + max.push(None); + level.push(None); + pages.push(None); + } + + row_num += 1; + if row_num >= limit { + end_flag = true; + break; + } + } + + if end_flag { + break 'FOR; + } + } + } + + Ok(DataBlock::new( + vec![ + BlockEntry::new( + DataType::String, + Value::Column(StringType::from_data(segment_name)), + ), + BlockEntry::new( + DataType::String, + Value::Column(Column::String(block_name.build())), + ), + BlockEntry::new( + DataType::String.wrap_nullable(), + Value::Column(StringType::from_opt_data(min)), + ), + BlockEntry::new( + DataType::String.wrap_nullable(), + Value::Column(StringType::from_opt_data(max)), + ), + BlockEntry::new( + DataType::Number(NumberDataType::Int32).wrap_nullable(), + Value::Column(Int32Type::from_opt_data(level)), + ), + BlockEntry::new( + DataType::String.wrap_nullable(), + Value::Column(StringType::from_opt_data(pages)), + ), + ], + row_num, + )) + } + + pub fn schema() -> Arc { + TableSchemaRefExt::create(vec![ + TableField::new("segment_name", TableDataType::String), + TableField::new("block_name", TableDataType::String), + TableField::new("min", TableDataType::String.wrap_nullable()), + TableField::new("max", TableDataType::String.wrap_nullable()), + TableField::new( + "level", + TableDataType::Number(NumberDataType::Int32).wrap_nullable(), + ), + TableField::new("pages", TableDataType::String.wrap_nullable()), + ]) + } +} diff --git a/src/query/storages/fuse/src/table_functions/clustering_statistics/clustering_stat_table.rs b/src/query/storages/fuse/src/table_functions/clustering_statistics/clustering_stat_table.rs new file mode 100644 index 0000000000000..d0331eda6d04d --- /dev/null +++ b/src/query/storages/fuse/src/table_functions/clustering_statistics/clustering_stat_table.rs @@ -0,0 +1,211 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::sync::Arc; + +use databend_common_catalog::catalog_kind::CATALOG_DEFAULT; +use databend_common_catalog::plan::DataSourcePlan; +use databend_common_catalog::plan::PartStatistics; +use databend_common_catalog::plan::Partitions; +use databend_common_catalog::plan::PushDownInfo; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_meta_app::schema::TableIdent; +use databend_common_meta_app::schema::TableInfo; +use databend_common_meta_app::schema::TableMeta; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_core::Pipeline; +use databend_common_pipeline_sources::AsyncSource; +use databend_common_pipeline_sources::AsyncSourcer; + +use crate::sessions::TableContext; +use crate::table_functions::parse_db_tb_args; +use crate::table_functions::string_literal; +use crate::table_functions::ClusteringStatistics; +use crate::table_functions::TableArgs; +use crate::table_functions::TableFunction; +use crate::FuseTable; +use crate::Table; + +const FUSE_FUNC_CLUSTERING_STAT: &str = "clustering_statistics"; + +pub struct ClusteringStatisticsTable { + table_info: TableInfo, + arg_database_name: String, + arg_table_name: String, +} + +impl ClusteringStatisticsTable { + pub fn create( + database_name: &str, + table_func_name: &str, + table_id: u64, + table_args: TableArgs, + ) -> Result> { + let (arg_database_name, arg_table_name) = + parse_db_tb_args(&table_args, FUSE_FUNC_CLUSTERING_STAT)?; + + let engine = FUSE_FUNC_CLUSTERING_STAT.to_owned(); + + let table_info = TableInfo { + ident: TableIdent::new(table_id, 0), + desc: format!("'{}'.'{}'", database_name, table_func_name), + name: table_func_name.to_string(), + meta: TableMeta { + schema: ClusteringStatistics::schema(), + engine, + ..Default::default() + }, + ..Default::default() + }; + + Ok(Arc::new(ClusteringStatisticsTable { + table_info, + arg_database_name, + arg_table_name, + })) + } +} + +#[async_trait::async_trait] +impl Table for ClusteringStatisticsTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_table_info(&self) -> &TableInfo { + &self.table_info + } + + #[async_backtrace::framed] + async fn read_partitions( + &self, + _ctx: Arc, + _push_downs: Option, + _dry_run: bool, + ) -> Result<(PartStatistics, Partitions)> { + Ok((PartStatistics::default(), Partitions::default())) + } + + fn table_args(&self) -> Option { + let args = vec![ + string_literal(self.arg_database_name.as_str()), + string_literal(self.arg_table_name.as_str()), + ]; + Some(TableArgs::new_positioned(args)) + } + + fn read_data( + &self, + ctx: Arc, + plan: &DataSourcePlan, + pipeline: &mut Pipeline, + _put_cache: bool, + ) -> Result<()> { + pipeline.add_source( + |output| { + ClusteringStatisticsSource::create( + ctx.clone(), + output, + self.arg_database_name.to_owned(), + self.arg_table_name.to_owned(), + plan.push_downs.as_ref().and_then(|x| x.limit), + ) + }, + 1, + )?; + + Ok(()) + } +} + +struct ClusteringStatisticsSource { + finish: bool, + ctx: Arc, + arg_database_name: String, + arg_table_name: String, + limit: Option, +} + +impl ClusteringStatisticsSource { + pub fn create( + ctx: Arc, + output: Arc, + arg_database_name: String, + arg_table_name: String, + limit: Option, + ) -> Result { + AsyncSourcer::create(ctx.clone(), output, ClusteringStatisticsSource { + ctx, + finish: false, + arg_table_name, + arg_database_name, + limit, + }) + } +} + +#[async_trait::async_trait] +impl AsyncSource for ClusteringStatisticsSource { + const NAME: &'static str = "fuse_block"; + + #[async_trait::unboxed_simple] + #[async_backtrace::framed] + async fn generate(&mut self) -> Result> { + if self.finish { + return Ok(None); + } + + self.finish = true; + let tenant_id = self.ctx.get_tenant(); + let tbl = self + .ctx + .get_catalog(CATALOG_DEFAULT) + .await? + .get_table( + &tenant_id, + self.arg_database_name.as_str(), + self.arg_table_name.as_str(), + ) + .await?; + let tbl = FuseTable::try_from_table(tbl.as_ref())?; + + let Some(cluster_key_id) = tbl.cluster_key_id() else { + return Err(ErrorCode::UnclusteredTable(format!( + "Unclustered table '{}.{}'", + self.arg_database_name, self.arg_table_name, + ))); + }; + + Ok(Some( + ClusteringStatistics::new(self.ctx.clone(), tbl, self.limit, cluster_key_id) + .get_blocks() + .await?, + )) + } +} + +impl TableFunction for ClusteringStatisticsTable { + fn function_name(&self) -> &str { + self.name() + } + + fn as_table<'a>(self: Arc) -> Arc + where Self: 'a { + self + } +} diff --git a/src/query/storages/fuse/src/table_functions/clustering_statistics/mod.rs b/src/query/storages/fuse/src/table_functions/clustering_statistics/mod.rs new file mode 100644 index 0000000000000..5209e4ab30b46 --- /dev/null +++ b/src/query/storages/fuse/src/table_functions/clustering_statistics/mod.rs @@ -0,0 +1,19 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod clustering_stat; +mod clustering_stat_table; + +pub use clustering_stat::ClusteringStatistics; +pub use clustering_stat_table::ClusteringStatisticsTable; diff --git a/src/query/storages/fuse/src/table_functions/mod.rs b/src/query/storages/fuse/src/table_functions/mod.rs index 367fce05328ae..ca0b5fd3c7295 100644 --- a/src/query/storages/fuse/src/table_functions/mod.rs +++ b/src/query/storages/fuse/src/table_functions/mod.rs @@ -12,23 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod cache_admin; mod clustering_information; +mod clustering_statistics; +mod function_template; +mod fuse_amend; mod fuse_blocks; mod fuse_columns; mod fuse_encodings; mod fuse_segments; mod fuse_snapshots; - -mod cache_admin; - -mod function_template; -mod fuse_amend; mod fuse_statistics; mod table_args; pub use cache_admin::SetCacheCapacity; pub use clustering_information::ClusteringInformation; pub use clustering_information::ClusteringInformationTable; +pub use clustering_statistics::ClusteringStatistics; +pub use clustering_statistics::ClusteringStatisticsTable; use databend_common_catalog::table_args::TableArgs; use databend_common_catalog::table_function::TableFunction; pub use function_template::SimpleTableFunc; diff --git a/tests/cloud_control_server/simple_server.py b/tests/cloud_control_server/simple_server.py index e693d8a14fcba..f446fc2c50360 100644 --- a/tests/cloud_control_server/simple_server.py +++ b/tests/cloud_control_server/simple_server.py @@ -44,9 +44,9 @@ def load_data_from_json(): notification_history_data = json.load(f) notification_history = notification_pb2.NotificationHistory() json_format.ParseDict(notification_history_data, notification_history) - NOTIFICATION_HISTORY_DB[ - notification_history.name - ] = notification_history + NOTIFICATION_HISTORY_DB[notification_history.name] = ( + notification_history + ) def create_task_request_to_task(id, create_task_request): diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0014_func_clustering_information_function.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0014_func_clustering_information_function.test index 5d92d83a8ee09..8220f2b5a48a1 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0014_func_clustering_information_function.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0014_func_clustering_information_function.test @@ -22,6 +22,13 @@ select * from t09_0014 order by b, a 1 3 4 4 +query TTI +select min, max, level from clustering_statistics('default','t09_0014') order by min +---- +[1, 1] [3, 0] 0 +[1, 2] [3, 1] 0 +[4, 4] [4, 4] 0 + query TIIFFT select * exclude(timestamp) from clustering_information('default','t09_0014') ---- @@ -38,6 +45,9 @@ ALTER TABLE t09_0014 DROP CLUSTER KEY statement error 1118 select * from clustering_information('default','t09_0014') +statement error 1118 +select * from clustering_statistics('default','t09_0014') + query TIIFFT select * exclude(timestamp) from clustering_information('default','t09_0014', '(b, a)') ---- diff --git a/tests/suites/0_stateless/18_rbac/18_0006_mysql_auth.py b/tests/suites/0_stateless/18_rbac/18_0006_mysql_auth.py index 92101ff86fe7f..e4c2ce10041fd 100755 --- a/tests/suites/0_stateless/18_rbac/18_0006_mysql_auth.py +++ b/tests/suites/0_stateless/18_rbac/18_0006_mysql_auth.py @@ -167,4 +167,3 @@ cursor.execute("select 123;") except mysql.connector.errors.OperationalError: print("u4 is timeout") - diff --git a/tests/suites/1_stateful/02_query/02_0008_profile.py b/tests/suites/1_stateful/02_query/02_0008_profile.py index 0762c11a55bf9..26fd6b23ca07d 100755 --- a/tests/suites/1_stateful/02_query/02_0008_profile.py +++ b/tests/suites/1_stateful/02_query/02_0008_profile.py @@ -36,7 +36,7 @@ def judge(profile): cpu_time += item["statistics"][0] print(memory_usage) - print(cpu_time>0) + print(cpu_time > 0) print(error_count)