From a59a3929dabf217730261757f02fb3ddd68ce2b6 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 25 Jan 2023 19:12:48 +0800 Subject: [PATCH 1/8] feat(query): add topn --- src/query/catalog/src/plan/pushdown.rs | 43 +++++++++++++ .../aggregator/aggregator_final_parallel.rs | 2 +- .../transforms/transform_convert_grouping.rs | 20 ++++-- src/query/storages/fuse/src/fuse_part.rs | 6 +- .../read/native_data_source_deserializer.rs | 1 + .../fuse/src/operations/read_partitions.rs | 61 ++++++++++++++++--- 6 files changed, 117 insertions(+), 16 deletions(-) diff --git a/src/query/catalog/src/plan/pushdown.rs b/src/query/catalog/src/plan/pushdown.rs index 3418ef137491..e3e867a71875 100644 --- a/src/query/catalog/src/plan/pushdown.rs +++ b/src/query/catalog/src/plan/pushdown.rs @@ -15,7 +15,9 @@ use std::fmt::Debug; use common_expression::RemoteExpr; +use common_expression::TableField; use common_expression::TableSchema; +use common_expression::types::DataType; use crate::plan::Projection; @@ -48,7 +50,48 @@ pub struct PushDownInfo { pub order_by: Vec<(RemoteExpr, bool, bool)>, } +/// TopK is a wrapper for topk push down items. +/// We only take the first column in order_by as the topk column. +#[derive(Debug, Clone)] +pub struct TopK { + pub limit: usize, + pub order_by: TableField, + pub asc: bool, + pub column_id: u32, +} + impl PushDownInfo { + pub fn top_k(&self, schema: &TableSchema, support: fn(&DataType) -> bool) -> Option { + if self.order_by.is_empty() && self.limit.is_some() { + let order = &self.order_by[0]; + if let RemoteExpr::::ColumnRef { id, .. } = &order.0 { + let field = schema.field_with_name(id).unwrap(); + let data_type: DataType = field.data_type().into(); + if support(&data_type) { + return None; + } + + let leaf_fields = schema.leaf_fields(); + let column_id = leaf_fields + .iter() + .position(|p| p == field) + .unwrap(); + + let top_k = TopK { + limit: self.limit.unwrap(), + order_by: field.clone(), + asc: order.1, + column_id: column_id as u32, + }; + Some(top_k) + } else { + None + } + } else { + None + } + } + pub fn prewhere_of_push_downs(push_downs: &Option) -> Option { if let Some(PushDownInfo { prewhere, .. 
}) = push_downs { prewhere.clone() diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs index 115f457811e6..ce0ba4f1c7b2 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs @@ -143,7 +143,7 @@ where Method: HashMethod + PolymorphicKeysHelper + Send + 'static params: Arc, hash_table: Method::HashTable, - reach_limit: bool, + pub(crate) reach_limit: bool, // used for deserialization only, so we can reuse it during the loop temp_place: Option, } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs b/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs index 2d9e1eb1947f..44523c4fa773 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs @@ -235,7 +235,7 @@ impl + Send + 'static> Proces port.finish(); *input = InputPortState::Finished; } - InputPortState::Active { port, bucket } if *bucket == self.working_bucket => { + InputPortState::Active { port, bucket } if *bucket <= self.working_bucket => { port.set_need_data(); if !port.has_data() { @@ -302,11 +302,7 @@ impl + Send + 'static> Proces }, }; } - InputPortState::Finished => { /* finished, do nothing */ } - InputPortState::Active { port, bucket } => { - port.set_need_data(); - self.min_bucket = std::cmp::min(*bucket, self.min_bucket); - } + _ => {} } } @@ -465,6 +461,7 @@ struct MergeBucketTransform + input_block: Option, output_blocks: Vec, + need_finish: bool, } impl + Send + 'static> @@ -483,6 +480,7 @@ impl + Send + 'static> params, input_block: None, output_blocks: vec![], + need_finish: false, }))) } } @@ -504,6 +502,7 @@ impl + Send + 'static> Proces self.input_block.take(); self.output_blocks.clear(); self.input.finish(); + self.output.finish(); return Ok(Event::Finished); } @@ -517,6 +516,13 @@ impl + Send + 'static> Proces return Ok(Event::NeedConsume); } + // need to finish if limit is reached + if self.need_finish { + self.input.finish(); + self.output.finish(); + return Ok(Event::Finished); + } + if self.input_block.is_some() { return Ok(Event::Sync); } @@ -553,6 +559,7 @@ impl + Send + 'static> Proces self.output_blocks .extend(bucket_merger.merge_blocks(blocks)?); + self.need_finish = bucket_merger.reach_limit; } false => { let mut bucket_merger = BucketAggregator::::create( @@ -562,6 +569,7 @@ impl + Send + 'static> Proces self.output_blocks .extend(bucket_merger.merge_blocks(blocks)?); + self.need_finish = bucket_merger.reach_limit; } }; } diff --git a/src/query/storages/fuse/src/fuse_part.rs b/src/query/storages/fuse/src/fuse_part.rs index e29411fca363..fb5f6daaee5b 100644 --- a/src/query/storages/fuse/src/fuse_part.rs +++ b/src/query/storages/fuse/src/fuse_part.rs @@ -24,6 +24,7 @@ use common_catalog::plan::PartInfo; use common_catalog::plan::PartInfoPtr; use common_exception::ErrorCode; use common_exception::Result; +use common_expression::Scalar; use storages_common_table_meta::meta::ColumnMeta; use storages_common_table_meta::meta::Compression; @@ -36,7 +37,8 @@ pub struct FusePartInfo { pub nums_rows: usize, pub columns_meta: HashMap, pub compression: Compression, - + + pub sort_min_max: Option<(Scalar, Scalar)>, 
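The `PushDownInfo::top_k` helper introduced in PATCH 1/8 (and corrected in PATCH 4/8, which flips the `order_by.is_empty()` and `support(&data_type)` checks and caps the limit at `MAX_TOPK_LIMIT`) derives a top-k description from an existing ORDER BY + LIMIT pair: take the first order-by column, require a type the range index can compare, and record the limit, sort direction and leaf-column id. Below is a minimal stand-alone sketch of that rule; the `DataType`, `TableField` and `PushDownInfo` definitions here are simplified stand-ins for illustration, not the real `common_expression` / `common_catalog` types.

```rust
// Stand-in types for illustration only; the real definitions carry much more information.
#[derive(Debug, Clone)]
enum DataType {
    Int64,
    Array,
}

#[derive(Debug, Clone)]
struct TableField {
    name: String,
    data_type: DataType,
}

#[derive(Debug)]
struct TopK {
    limit: usize,
    order_by: TableField,
    asc: bool,
    column_id: u32,
}

struct PushDownInfo {
    limit: Option<usize>,
    // (column name, ascending, nulls_first); only the first entry is considered.
    order_by: Vec<(String, bool, bool)>,
}

fn top_k(push_down: &PushDownInfo, schema: &[TableField], support: fn(&DataType) -> bool) -> Option<TopK> {
    // A top-k is only pushed down when both ORDER BY and LIMIT are present.
    if push_down.order_by.is_empty() || push_down.limit.is_none() {
        return None;
    }
    let (name, asc, _nulls_first) = &push_down.order_by[0];
    let (column_id, field) = schema.iter().enumerate().find(|(_, f)| &f.name == name)?;
    // Only types the range index knows how to compare are eligible.
    if !support(&field.data_type) {
        return None;
    }
    Some(TopK {
        limit: push_down.limit.unwrap(),
        order_by: field.clone(),
        asc: *asc,
        column_id: column_id as u32,
    })
}

fn main() {
    let schema = vec![
        TableField { name: "id".to_string(), data_type: DataType::Int64 },
        TableField { name: "tags".to_string(), data_type: DataType::Array },
    ];
    // SELECT ... ORDER BY id ASC LIMIT 10
    let push_down = PushDownInfo {
        limit: Some(10),
        order_by: vec![("id".to_string(), true, false)],
    };
    let top_k = top_k(&push_down, &schema, |t| !matches!(t, DataType::Array));
    println!("{top_k:?}"); // top-k on `id`, ascending, limit 10, column_id 0
}
```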
/// page range in the file pub range: Option>, } @@ -68,6 +70,7 @@ impl FusePartInfo { rows_count: u64, columns_meta: HashMap, compression: Compression, + sort_min_max: Option<(Scalar, Scalar)>, range: Option>, ) -> Arc> { Arc::new(Box::new(FusePartInfo { @@ -76,6 +79,7 @@ impl FusePartInfo { columns_meta, nums_rows: rows_count as usize, compression, + sort_min_max, range, })) } diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs index 5fc84ef2ec82..c316f908a6e2 100644 --- a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs @@ -232,6 +232,7 @@ impl Processor for NativeDeserializeDataTransform { if !chunk.1.has_next() { // No data anymore let _ = self.chunks.pop(); + let _ = self.parts.pop(); return Ok(()); } arrays.push((chunk.0, chunk.1.next_array()?)); diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 34955ff7a928..939b10b93d7d 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -23,13 +23,17 @@ use common_catalog::plan::Partitions; use common_catalog::plan::PartitionsShuffleKind; use common_catalog::plan::Projection; use common_catalog::plan::PushDownInfo; +use common_catalog::plan::TopK; use common_catalog::table::Table; use common_catalog::table_context::TableContext; use common_exception::Result; +use common_expression::RemoteExpr; use common_expression::TableSchemaRef; use common_meta_app::schema::TableInfo; use common_storage::ColumnNodes; use opendal::Operator; +use storages_common_index::Index; +use storages_common_index::RangeIndex; use storages_common_table_meta::meta::BlockMeta; use storages_common_table_meta::meta::Location; use tracing::debug; @@ -153,7 +157,7 @@ impl FuseTable { let partitions_scanned = block_metas.len(); - let (mut statistics, parts) = Self::to_partitions(block_metas, &column_nodes, push_downs); + let (mut statistics, parts) = self.to_partitions(block_metas, &column_nodes, push_downs); // Update planner statistics. 
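In the `to_partitions` hunks that follow, block metas are reordered by the statistics of the pushed-down top-k column before partitions are built (ascending queries sort by each block's `min`, descending queries by `max` reversed), and the partition list is switched to `PartitionsShuffleKind::Seq` so the most promising blocks are read first. A small sketch of that ordering; the `ColumnStat` and `BlockMeta` types here are simplified stand-ins for the real per-block statistics.

```rust
#[derive(Debug, Clone)]
struct ColumnStat {
    min: i64,
    max: i64,
}

#[derive(Debug)]
struct BlockMeta {
    location: &'static str,
    sort_col: ColumnStat,
}

/// Put the blocks most likely to contain top-k rows first, as `to_partitions`
/// does with the real per-block column statistics for the top-k column.
fn order_for_topk(blocks: &mut [BlockMeta], asc: bool) {
    blocks.sort_by(|a, b| {
        if asc {
            a.sort_col.min.cmp(&b.sort_col.min)
        } else {
            a.sort_col.max.cmp(&b.sort_col.max).reverse()
        }
    });
}

fn main() {
    let mut blocks = vec![
        BlockMeta { location: "b0", sort_col: ColumnStat { min: 40, max: 90 } },
        BlockMeta { location: "b1", sort_col: ColumnStat { min: 5, max: 30 } },
        BlockMeta { location: "b2", sort_col: ColumnStat { min: 10, max: 100 } },
    ];
    // ORDER BY c ASC LIMIT n: smallest minimums first.
    order_for_topk(&mut blocks, true);
    assert_eq!(blocks[0].location, "b1");
    // ORDER BY c DESC LIMIT n: largest maximums first.
    order_for_topk(&mut blocks, false);
    assert_eq!(blocks[0].location, "b2");
    println!("{blocks:?}");
}
```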
statistics.partitions_total = partitions_total; @@ -169,6 +173,7 @@ impl FuseTable { } pub fn to_partitions( + &self, block_metas: &[(Option>, Arc)], column_nodes: &ColumnNodes, push_down: Option, @@ -179,16 +184,39 @@ impl FuseTable { .and_then(|p| p.limit) .unwrap_or(usize::MAX); - let (mut statistics, partitions) = match &push_down { - None => Self::all_columns_partitions(block_metas, limit), + let top_k = push_down + .as_ref() + .map(|p| p.top_k(self.schema().as_ref(), RangeIndex::supported_type)) + .unwrap_or_default(); + + let mut block_metas = block_metas.to_vec(); + if let Some(top_k) = &top_k { + block_metas.sort_by(|a, b| { + let a = a.1.col_stats.get(&top_k.column_id).unwrap(); + let b = b.1.col_stats.get(&top_k.column_id).unwrap(); + + if top_k.asc { + a.min.cmp(&b.min) + } else { + a.max.cmp(&b.max).reverse() + } + }); + } + + let (mut statistics, mut partitions) = match &push_down { + None => Self::all_columns_partitions(&block_metas, top_k.clone(), limit), Some(extras) => match &extras.projection { - None => Self::all_columns_partitions(block_metas, limit), + None => Self::all_columns_partitions(&block_metas, top_k.clone(), limit), Some(projection) => { - Self::projection_partitions(block_metas, column_nodes, projection, limit) + Self::projection_partitions(&block_metas, column_nodes, projection, top_k.clone(), limit) } }, }; + if let Some(_) = &top_k { + partitions.kind = PartitionsShuffleKind::Seq; + } + statistics.is_exact = statistics.is_exact && Self::is_exact(&push_down); (statistics, partitions) } @@ -202,6 +230,7 @@ impl FuseTable { pub fn all_columns_partitions( block_metas: &[(Option>, Arc)], + top_k: Option, limit: usize, ) -> (PartStatistics, Partitions) { let mut statistics = PartStatistics::default_exact(); @@ -212,12 +241,11 @@ impl FuseTable { } let mut remaining = limit; - for (range, block_meta) in block_metas.iter() { let rows = block_meta.row_count as usize; partitions .partitions - .push(Self::all_columns_part(range.clone(), block_meta)); + .push(Self::all_columns_part(range.clone(), &top_k, block_meta)); statistics.read_rows += rows; statistics.read_bytes += block_meta.block_size as usize; @@ -239,6 +267,7 @@ impl FuseTable { block_metas: &[(Option>, Arc)], column_nodes: &ColumnNodes, projection: &Projection, + top_k: Option, limit: usize, ) -> (PartStatistics, Partitions) { let mut statistics = PartStatistics::default_exact(); @@ -255,6 +284,7 @@ impl FuseTable { block_meta, range.clone(), column_nodes, + top_k.clone(), projection, )); let rows = block_meta.row_count as usize; @@ -283,7 +313,7 @@ impl FuseTable { (statistics, partitions) } - pub fn all_columns_part(range: Option>, meta: &BlockMeta) -> PartInfoPtr { + pub fn all_columns_part(range: Option>, top_k: &Option, meta: &BlockMeta) -> PartInfoPtr { let mut columns_meta = HashMap::with_capacity(meta.col_metas.len()); for (idx, column_meta) in &meta.col_metas { @@ -293,12 +323,19 @@ impl FuseTable { let rows_count = meta.row_count; let location = meta.location.0.clone(); let format_version = meta.location.1; + + let sort_min_max = top_k.as_ref().map(|top_k| { + let stat = meta.col_stats.get(&top_k.column_id).unwrap(); + (stat.min.clone(), stat.max.clone()) + }); + FusePartInfo::create( location, format_version, rows_count, columns_meta, meta.compression(), + sort_min_max, range, ) } @@ -307,6 +344,7 @@ impl FuseTable { meta: &BlockMeta, range: Option>, column_nodes: &ColumnNodes, + top_k: Option, projection: &Projection, ) -> PartInfoPtr { let mut columns_meta = 
HashMap::with_capacity(projection.len()); @@ -324,6 +362,12 @@ impl FuseTable { let rows_count = meta.row_count; let location = meta.location.0.clone(); let format_version = meta.location.1; + + let sort_min_max = top_k.map(|top_k| { + let stat = meta.col_stats.get(&top_k.column_id).unwrap(); + (stat.min.clone(), stat.max.clone()) + }); + // TODO // row_count should be a hint value of LIMIT, // not the count the rows in this partition @@ -333,6 +377,7 @@ impl FuseTable { rows_count, columns_meta, meta.compression(), + sort_min_max, range, ) } From d49db52463dfea620ad3303cb5fef1dca0491c84 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 25 Jan 2023 19:43:27 +0800 Subject: [PATCH 2/8] feat(query): merging --- .../transforms/transform_convert_grouping.rs | 444 +++--------------- 1 file changed, 56 insertions(+), 388 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs b/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs index 44523c4fa773..0e67771494c7 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs @@ -1,413 +1,93 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
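PATCH 2/8 and 3/8 rewrite `TransformConvertGrouping`, the processor that turns single-level aggregate blocks into two-level ones by scattering rows into 256 buckets keyed on the top 8 bits of the group hash (`convert_to_two_level`). A minimal sketch of that bucketing; the toy hashes and the row-index representation are assumptions for illustration, since the real transform scatters whole `DataBlock`s via `DataBlock::scatter`.

```rust
/// Route each row to one of 256 buckets using the top 8 bits of its 64-bit group hash,
/// mirroring the scatter done by `convert_to_two_level`.
fn scatter_by_hash(hashes: &[u64]) -> Vec<Vec<usize>> {
    let mut buckets: Vec<Vec<usize>> = vec![Vec::new(); 256];
    for (row, hash) in hashes.iter().enumerate() {
        // Top 8 bits of the 64-bit hash (64 - 8 = 56).
        let bucket = (*hash >> 56) as usize;
        buckets[bucket].push(row);
    }
    buckets
}

fn main() {
    // Assumed toy hashes; the real ones come from the group-by hash method.
    let hashes: [u64; 3] = [0x0123_4567_89ab_cdef, 0xfedc_ba98_7654_3210, 0x0111_1111_1111_1111];
    let buckets = scatter_by_hash(&hashes);
    assert_eq!(buckets[0x01], vec![0, 2]); // rows 0 and 2 share top byte 0x01
    assert_eq!(buckets[0xfe], vec![1]);
    println!("non-empty buckets: {}", buckets.iter().filter(|b| !b.is_empty()).count());
}
```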
- -use std::any::Any; -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::sync::Arc; - -use common_exception::Result; -use common_expression::with_hash_method; -use common_expression::BlockMetaInfo; -use common_expression::BlockMetaInfoPtr; -use common_expression::DataBlock; -use common_expression::HashMethod; -use common_expression::HashMethodKind; -use common_pipeline_core::processors::port::InputPort; -use common_pipeline_core::processors::port::OutputPort; -use common_pipeline_core::processors::processor::Event; -use common_pipeline_core::processors::processor::ProcessorPtr; -use common_pipeline_core::processors::Processor; -use common_pipeline_core::Pipe; -use common_pipeline_core::Pipeline; -use serde::Deserialize; -use serde::Deserializer; -use serde::Serialize; -use serde::Serializer; - -use crate::pipelines::processors::transforms::aggregator::AggregateInfo; -use crate::pipelines::processors::transforms::aggregator::BucketAggregator; -use crate::pipelines::processors::transforms::group_by::KeysColumnIter; -use crate::pipelines::processors::transforms::group_by::PolymorphicKeysHelper; -use crate::pipelines::processors::AggregatorParams; - -static MAX_BUCKET_NUM: isize = 256; -// Overflow to object storage data block -static OVERFLOW_BUCKET_NUM: isize = -2; -// Single level data block -static SINGLE_LEVEL_BUCKET_NUM: isize = -1; - -/// -#[derive(Debug)] -struct ConvertGroupingMetaInfo { - #[allow(dead_code)] - pub bucket: isize, - pub blocks: Vec, -} - -impl Serialize for ConvertGroupingMetaInfo { - fn serialize(&self, _: S) -> Result - where S: Serializer { - unreachable!("ConvertGroupingMetaInfo does not support exchanging between multiple nodes") - } -} - -impl<'de> Deserialize<'de> for ConvertGroupingMetaInfo { - fn deserialize(_: D) -> Result - where D: Deserializer<'de> { - unreachable!("ConvertGroupingMetaInfo does not support exchanging between multiple nodes") - } -} - -impl ConvertGroupingMetaInfo { - pub fn create(bucket: isize, blocks: Vec) -> BlockMetaInfoPtr { - Box::new(ConvertGroupingMetaInfo { bucket, blocks }) - } -} - -#[typetag::serde(name = "convert_grouping")] -impl BlockMetaInfo for ConvertGroupingMetaInfo { - fn as_any(&self) -> &dyn Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn Any { - self - } - - fn clone_self(&self) -> Box { - unimplemented!("Unimplemented clone for ConvertGroupingMetaInfo") - } - - fn equals(&self, _: &Box) -> bool { - unimplemented!("Unimplemented equals for ConvertGroupingMetaInfo") - } -} - -enum InputPortState { - Active { port: Arc, bucket: isize }, - Finished, -} - -pub struct TransformConvertGrouping> { - output: Arc, - inputs: Vec, - - working_bucket: isize, - min_bucket: isize, - method: Method, - params: Arc, - buckets_blocks: HashMap>, -} - -impl> TransformConvertGrouping { - pub fn create( - method: Method, - params: Arc, - input_nums: usize, - ) -> Result { - let mut inputs = Vec::with_capacity(input_nums); - - for _index in 0..input_nums { - inputs.push(InputPortState::Active { - bucket: 0, - port: InputPort::create(), - }); + return Ok(Event::NeedData); } - Ok(TransformConvertGrouping { - method, - params, - inputs, - working_bucket: 0, - output: OutputPort::create(), - buckets_blocks: HashMap::new(), - min_bucket: MAX_BUCKET_NUM, - }) - } - - pub fn get_inputs(&self) -> Vec> { - let mut inputs = Vec::with_capacity(self.inputs.len()); - - for input in &self.inputs { - if let InputPortState::Active { port, .. 
} = input { - inputs.push(port.clone()); - } - } - - inputs - } - - pub fn get_output(&self) -> Arc { - self.output.clone() - } - - fn convert_to_two_level(&self, data_block: DataBlock) -> Result> { - let aggregate_function_len = self.params.aggregate_functions.len(); - let keys_column = data_block - .get_by_offset(aggregate_function_len) - .value - .as_column() - .unwrap(); - let keys_iter = self.method.keys_iter_from_column(keys_column)?; - - let mut indices = Vec::with_capacity(data_block.num_rows()); - - for key_item in keys_iter.iter() { - let hash = self.method.get_hash(key_item); - indices.push((hash as usize >> (64u32 - 8)) as u16); - } - - DataBlock::scatter(&data_block, &indices, 256) - } -} - -#[async_trait::async_trait] -impl + Send + 'static> Processor - for TransformConvertGrouping -{ - fn name(&self) -> String { - String::from("TransformConvertGrouping") - } - - fn as_any(&mut self) -> &mut dyn Any { - self - } - - fn event(&mut self) -> Result { - if self.working_bucket >= MAX_BUCKET_NUM || self.output.is_finished() { - self.output.finish(); - - for input in &self.inputs { - if let InputPortState::Active { port, .. } = input { - port.finish(); - } - } - - self.buckets_blocks.clear(); - return Ok(Event::Finished); + if !self.buckets_blocks.is_empty() && !self.unsplitted_blocks.is_empty() { + // Split data blocks if it's unsplitted. + return Ok(Event::Sync); } if !self.output.can_push() { - for input in &self.inputs { - if let InputPortState::Active { port, .. } = input { - port.set_not_need_data(); - } + for input_state in &self.inputs { + input_state.port.set_not_need_data(); } return Ok(Event::NeedConsume); } - if self.working_bucket == 1 { - if self.buckets_blocks.contains_key(&OVERFLOW_BUCKET_NUM) - || self.buckets_blocks.contains_key(&SINGLE_LEVEL_BUCKET_NUM) - { - return Ok(Event::Sync); - } + let pushed_data_block = self.try_push_data_block(); - if self.buckets_blocks.contains_key(&0) { - if let Some(bucket_blocks) = self.buckets_blocks.remove(&0) { - self.output.push_data(Ok(DataBlock::empty_with_meta( - ConvertGroupingMetaInfo::create(0, bucket_blocks), - ))); + loop { + // Try to pull the next data or until the port is closed + let mut all_inputs_is_finished = true; + let mut all_port_prepared_data = true; - return Ok(Event::NeedConsume); + for index in 0..self.inputs.len() { + if self.inputs[index].port.is_finished() { + continue; } - } - } - let mut all_port_prepared_data = true; - - for input in self.inputs.iter_mut() { - match input { - InputPortState::Active { port, .. } if port.is_finished() => { - port.finish(); - *input = InputPortState::Finished; - } - InputPortState::Active { port, bucket } if *bucket <= self.working_bucket => { - port.set_need_data(); - - if !port.has_data() { - all_port_prepared_data = false; - continue; - } - - let data_block = port.pull_data().unwrap()?; - let data_block_meta: Option<&AggregateInfo> = data_block - .get_meta() - .and_then(|meta| meta.as_any().downcast_ref::()); - - match data_block_meta { - // XXX: None | Some(info) if info.bucket == -1 is compile failure. 
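The core idea of the rewritten transform (completed in PATCH 3/8) is to collect partial-aggregate blocks from all upstream inputs into a `HashMap` keyed by bucket id and to hand a bucket downstream only once every input has progressed past it, so each final merger sees all partials for one bucket together. Below is a simplified, batch-style sketch of that regrouping; the real processor does this incrementally with `working_bucket` and also handles unsplitted single-level blocks, and the `TaggedBlock` type here is an assumption for illustration.

```rust
use std::collections::HashMap;

/// A partial-aggregate block tagged with its two-level bucket id.
#[derive(Debug)]
struct TaggedBlock {
    bucket: isize,
    payload: &'static str,
}

/// Regroup per-input streams of tagged blocks so that each bucket is emitted once,
/// with every input's contribution for that bucket gathered together.
fn regroup(inputs: Vec<Vec<TaggedBlock>>) -> Vec<(isize, Vec<TaggedBlock>)> {
    let mut buckets: HashMap<isize, Vec<TaggedBlock>> = HashMap::new();
    for input in inputs {
        for block in input {
            buckets.entry(block.bucket).or_default().push(block);
        }
    }
    let mut out: Vec<_> = buckets.into_iter().collect();
    out.sort_by_key(|(bucket, _)| *bucket);
    out
}

fn main() {
    let input_a = vec![
        TaggedBlock { bucket: 0, payload: "a0" },
        TaggedBlock { bucket: 1, payload: "a1" },
    ];
    let input_b = vec![
        TaggedBlock { bucket: 1, payload: "b1" },
        TaggedBlock { bucket: 0, payload: "b0" },
    ];
    for (bucket, blocks) in regroup(vec![input_a, input_b]) {
        // Each downstream merger receives every partial for one bucket at once.
        println!("bucket {bucket}: {blocks:?}");
    }
}
```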
- None => { - port.finish(); - *input = InputPortState::Finished; - match self.buckets_blocks.entry(SINGLE_LEVEL_BUCKET_NUM) { - Entry::Vacant(v) => { - v.insert(vec![data_block]); - } - Entry::Occupied(mut v) => { - v.get_mut().push(data_block); - } - }; - } - Some(info) if info.bucket == SINGLE_LEVEL_BUCKET_NUM => { - port.finish(); - *input = InputPortState::Finished; - match self.buckets_blocks.entry(SINGLE_LEVEL_BUCKET_NUM) { - Entry::Vacant(v) => { - v.insert(vec![data_block]); - } - Entry::Occupied(mut v) => { - v.get_mut().push(data_block); - } - }; - } - Some(info) => match info.overflow { - None => { - *bucket = info.bucket + 1; - self.min_bucket = std::cmp::min(info.bucket, self.min_bucket); - match self.buckets_blocks.entry(info.bucket) { - Entry::Vacant(v) => { - v.insert(vec![data_block]); - } - Entry::Occupied(mut v) => { - v.get_mut().push(data_block); - } - }; - } - Some(_) => { - // Skipping overflow block. - all_port_prepared_data = false; - match self.buckets_blocks.entry(OVERFLOW_BUCKET_NUM) { - Entry::Vacant(v) => { - v.insert(vec![data_block]); - } - Entry::Occupied(mut v) => { - v.get_mut().push(data_block); - } - }; - } - }, - }; + all_inputs_is_finished = false; + if self.inputs[index].bucket >= self.working_bucket { + continue; } - _ => {} - } - } - if all_port_prepared_data { - // current working bucket is process completed. - if self.working_bucket == 0 - && self.buckets_blocks.contains_key(&SINGLE_LEVEL_BUCKET_NUM) - { - // all single level data block - if self.buckets_blocks.len() == 1 { - self.working_bucket = 256; - - if let Some(bucket_blocks) = - self.buckets_blocks.remove(&SINGLE_LEVEL_BUCKET_NUM) - { - self.output.push_data(Ok(DataBlock::empty_with_meta( - ConvertGroupingMetaInfo::create(SINGLE_LEVEL_BUCKET_NUM, bucket_blocks), - ))); - } - - return Ok(Event::NeedConsume); + self.inputs[index].port.set_need_data(); + if !self.inputs[index].port.has_data() { + all_port_prepared_data = false; + continue; } - // need convert to two level data block - self.working_bucket += 1; - return Ok(Event::Sync); + self.inputs[index].bucket = + self.add_bucket(self.inputs[index].port.pull_data().unwrap()?); + debug_assert!(self.unsplitted_blocks.is_empty()); } - if self.min_bucket == MAX_BUCKET_NUM { - self.output.finish(); - - for input in &self.inputs { - if let InputPortState::Active { port, .. 
} = input { - port.finish(); - } - } - - return Ok(Event::Finished); + if all_inputs_is_finished { + self.all_inputs_is_finished = true; + break; } - if let Some(bucket_blocks) = self.buckets_blocks.remove(&self.min_bucket) { - self.output.push_data(Ok(DataBlock::empty_with_meta( - ConvertGroupingMetaInfo::create(self.min_bucket, bucket_blocks), - ))); + if !all_port_prepared_data { + return Ok(Event::NeedData); } - self.working_bucket = self.min_bucket + 1; - self.min_bucket = MAX_BUCKET_NUM; + self.working_bucket += 1; + } + + if pushed_data_block || self.try_push_data_block() { return Ok(Event::NeedConsume); } - Ok(Event::NeedData) + self.output.finish(); + Ok(Event::Finished) } fn process(&mut self) -> Result<()> { - if let Some(overflow_blocks) = self.buckets_blocks.get_mut(&OVERFLOW_BUCKET_NUM) { - match overflow_blocks.pop() { - None => { - self.buckets_blocks.remove(&OVERFLOW_BUCKET_NUM); - } - Some(data_block) => { - if let Some(meta) = data_block.get_meta() { - if let Some(meta) = meta.as_any().downcast_ref::() { - let overflow = meta.overflow.as_ref().unwrap(); - for (bucket_id, (_offset, _length)) in &overflow.bucket_info { - // DataBlock - // DataBlock::empty_with_meta() - - match self.buckets_blocks.entry(*bucket_id as isize) { - Entry::Vacant(v) => { - v.insert(vec![]); - } - Entry::Occupied(_v) => { - // v.get_mut().push() - } - }; - } - } - } - } + if let Some(data_block) = self.unsplitted_blocks.pop() { + let data_block_meta: Option<&AggregateInfo> = data_block + .get_meta() + .and_then(|meta| meta.as_any().downcast_ref::()); + + let data_blocks = match data_block_meta { + None => self.convert_to_two_level(data_block)?, + Some(meta) => match &meta.overflow { + None => self.convert_to_two_level(data_block)?, + Some(_overflow_info) => unimplemented!(), + }, }; - } - if let Some(single_level_blocks) = self.buckets_blocks.get_mut(&SINGLE_LEVEL_BUCKET_NUM) { - match single_level_blocks.pop() { - None => { - self.buckets_blocks.remove(&SINGLE_LEVEL_BUCKET_NUM); - } - Some(data_block) => { - let blocks = self.convert_to_two_level(data_block)?; - - for (bucket, block) in blocks.into_iter().enumerate() { - if !block.is_empty() { - match self.buckets_blocks.entry(bucket as isize) { - Entry::Vacant(v) => { - v.insert(vec![block]); - } - Entry::Occupied(mut v) => { - v.get_mut().push(block); - } - }; + for (bucket, block) in data_blocks.into_iter().enumerate() { + if !block.is_empty() { + match self.buckets_blocks.entry(bucket as isize) { + Entry::Vacant(v) => { + v.insert(vec![block]); + } + Entry::Occupied(mut v) => { + v.get_mut().push(block); } - } + }; } - }; + } } Ok(()) @@ -461,7 +141,6 @@ struct MergeBucketTransform + input_block: Option, output_blocks: Vec, - need_finish: bool, } impl + Send + 'static> @@ -480,7 +159,6 @@ impl + Send + 'static> params, input_block: None, output_blocks: vec![], - need_finish: false, }))) } } @@ -502,7 +180,6 @@ impl + Send + 'static> Proces self.input_block.take(); self.output_blocks.clear(); self.input.finish(); - self.output.finish(); return Ok(Event::Finished); } @@ -516,13 +193,6 @@ impl + Send + 'static> Proces return Ok(Event::NeedConsume); } - // need to finish if limit is reached - if self.need_finish { - self.input.finish(); - self.output.finish(); - return Ok(Event::Finished); - } - if self.input_block.is_some() { return Ok(Event::Sync); } @@ -559,7 +229,6 @@ impl + Send + 'static> Proces self.output_blocks .extend(bucket_merger.merge_blocks(blocks)?); - self.need_finish = bucket_merger.reach_limit; } false => { let mut 
bucket_merger = BucketAggregator::::create( @@ -569,7 +238,6 @@ impl + Send + 'static> Proces self.output_blocks .extend(bucket_merger.merge_blocks(blocks)?); - self.need_finish = bucket_merger.reach_limit; } }; } From 7f2352de7df638e367ddafc02daf01f93beb69ec Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 25 Jan 2023 19:44:21 +0800 Subject: [PATCH 3/8] feat(query): merging --- .../transforms/transform_convert_grouping.rs | 290 ++++++++++++++++++ 1 file changed, 290 insertions(+) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs b/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs index 0e67771494c7..248a7add7c4e 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_convert_grouping.rs @@ -1,3 +1,293 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::sync::Arc; + +use common_exception::Result; +use common_expression::with_hash_method; +use common_expression::BlockMetaInfo; +use common_expression::BlockMetaInfoPtr; +use common_expression::DataBlock; +use common_expression::HashMethod; +use common_expression::HashMethodKind; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_core::processors::Processor; +use common_pipeline_core::Pipe; +use common_pipeline_core::Pipeline; +use serde::Deserialize; +use serde::Deserializer; +use serde::Serialize; +use serde::Serializer; + +use crate::pipelines::processors::transforms::aggregator::AggregateInfo; +use crate::pipelines::processors::transforms::aggregator::BucketAggregator; +use crate::pipelines::processors::transforms::group_by::KeysColumnIter; +use crate::pipelines::processors::transforms::group_by::PolymorphicKeysHelper; +use crate::pipelines::processors::AggregatorParams; + +// Overflow to object storage data block +#[allow(dead_code)] +static OVERFLOW_BUCKET_NUM: isize = -2; +// Single level data block +static SINGLE_LEVEL_BUCKET_NUM: isize = -1; + +/// +#[derive(Debug)] +struct ConvertGroupingMetaInfo { + #[allow(dead_code)] + pub bucket: isize, + pub blocks: Vec, +} + +impl Serialize for ConvertGroupingMetaInfo { + fn serialize(&self, _: S) -> Result + where S: Serializer { + unreachable!("ConvertGroupingMetaInfo does not support exchanging between multiple nodes") + } +} + +impl<'de> Deserialize<'de> for ConvertGroupingMetaInfo { + fn deserialize(_: D) -> Result + where D: Deserializer<'de> { + unreachable!("ConvertGroupingMetaInfo does not support exchanging between multiple nodes") + } +} + +impl ConvertGroupingMetaInfo { + pub fn create(bucket: isize, blocks: Vec) -> 
BlockMetaInfoPtr { + Box::new(ConvertGroupingMetaInfo { bucket, blocks }) + } +} + +#[typetag::serde(name = "convert_grouping")] +impl BlockMetaInfo for ConvertGroupingMetaInfo { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } + + fn clone_self(&self) -> Box { + unimplemented!("Unimplemented clone for ConvertGroupingMetaInfo") + } + + fn equals(&self, _: &Box) -> bool { + unimplemented!("Unimplemented equals for ConvertGroupingMetaInfo") + } +} + +struct InputPortState { + port: Arc, + bucket: isize, +} + +pub struct TransformConvertGrouping> { + output: Arc, + inputs: Vec, + + method: Method, + working_bucket: isize, + pushing_bucket: isize, + all_inputs_is_finished: bool, + initialized_all_inputs: bool, + params: Arc, + buckets_blocks: HashMap>, + unsplitted_blocks: Vec, +} + +impl> TransformConvertGrouping { + pub fn create( + method: Method, + params: Arc, + input_nums: usize, + ) -> Result { + let mut inputs = Vec::with_capacity(input_nums); + + for _index in 0..input_nums { + inputs.push(InputPortState { + bucket: -1, + port: InputPort::create(), + }); + } + + Ok(TransformConvertGrouping { + method, + params, + inputs, + working_bucket: 0, + pushing_bucket: 0, + output: OutputPort::create(), + buckets_blocks: HashMap::new(), + all_inputs_is_finished: false, + unsplitted_blocks: vec![], + initialized_all_inputs: false, + }) + } + + pub fn get_inputs(&self) -> Vec> { + let mut inputs = Vec::with_capacity(self.inputs.len()); + + for input_state in &self.inputs { + inputs.push(input_state.port.clone()); + } + + inputs + } + + pub fn get_output(&self) -> Arc { + self.output.clone() + } + + fn initialize_all_inputs(&mut self) -> Result { + self.initialized_all_inputs = true; + + for index in 0..self.inputs.len() { + if self.inputs[index].port.is_finished() { + continue; + } + + // We pull the first unsplitted data block + if self.inputs[index].bucket > SINGLE_LEVEL_BUCKET_NUM { + continue; + } + + self.inputs[index].port.set_need_data(); + + if !self.inputs[index].port.has_data() { + self.initialized_all_inputs = false; + continue; + } + + self.inputs[index].bucket = + self.add_bucket(self.inputs[index].port.pull_data().unwrap()?); + } + + Ok(self.initialized_all_inputs) + } + + fn add_bucket(&mut self, data_block: DataBlock) -> isize { + let data_block_meta: Option<&AggregateInfo> = data_block + .get_meta() + .and_then(|meta| meta.as_any().downcast_ref::()); + + if let Some(info) = data_block_meta { + if info.overflow.is_none() && info.bucket > SINGLE_LEVEL_BUCKET_NUM { + let bucket = info.bucket; + match self.buckets_blocks.entry(bucket) { + Entry::Vacant(v) => { + v.insert(vec![data_block]); + } + Entry::Occupied(mut v) => { + v.get_mut().push(data_block); + } + }; + + return bucket; + } + } + + self.unsplitted_blocks.push(data_block); + SINGLE_LEVEL_BUCKET_NUM + } + + fn try_push_data_block(&mut self) -> bool { + match self.buckets_blocks.is_empty() { + true => self.try_push_single_level(), + false => self.try_push_two_level(), + } + } + + fn try_push_two_level(&mut self) -> bool { + while self.pushing_bucket < self.working_bucket { + if let Some(bucket_blocks) = self.buckets_blocks.remove(&self.pushing_bucket) { + let meta = ConvertGroupingMetaInfo::create(self.pushing_bucket, bucket_blocks); + self.output.push_data(Ok(DataBlock::empty_with_meta(meta))); + self.pushing_bucket += 1; + return true; + } + + self.pushing_bucket += 1; + } + + false + } + + fn try_push_single_level(&mut self) -> bool { + if 
self.unsplitted_blocks.is_empty() { + return false; + } + + let unsplitted_blocks = std::mem::take(&mut self.unsplitted_blocks); + let meta = ConvertGroupingMetaInfo::create(SINGLE_LEVEL_BUCKET_NUM, unsplitted_blocks); + self.output.push_data(Ok(DataBlock::empty_with_meta(meta))); + true + } + + fn convert_to_two_level(&self, data_block: DataBlock) -> Result> { + let aggregate_function_len = self.params.aggregate_functions.len(); + let keys_column = data_block + .get_by_offset(aggregate_function_len) + .value + .as_column() + .unwrap(); + let keys_iter = self.method.keys_iter_from_column(keys_column)?; + + let mut indices = Vec::with_capacity(data_block.num_rows()); + + for key_item in keys_iter.iter() { + let hash = self.method.get_hash(key_item); + indices.push((hash as usize >> (64u32 - 8)) as u16); + } + + DataBlock::scatter(&data_block, &indices, 256) + } +} + +#[async_trait::async_trait] +impl + Send + 'static> Processor + for TransformConvertGrouping +{ + fn name(&self) -> String { + String::from("TransformConvertGrouping") + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.all_inputs_is_finished || self.output.is_finished() { + self.output.finish(); + + for input_state in &self.inputs { + input_state.port.finish(); + } + + self.buckets_blocks.clear(); + return Ok(Event::Finished); + } + + // We pull the first unsplitted data block + if !self.initialized_all_inputs && !self.initialize_all_inputs()? { return Ok(Event::NeedData); } From 18eaac1d5bf1405c5b4e65d2f0dd3ed5c0d31477 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 26 Jan 2023 23:13:55 +0800 Subject: [PATCH 4/8] feat(query): add TopN in native storage format --- src/query/catalog/src/plan/pushdown.rs | 20 +- src/query/expression/src/kernels/filter.rs | 194 ++++++++------- src/query/expression/src/kernels/mod.rs | 3 + src/query/expression/src/kernels/topk.rs | 227 ++++++++++++++++++ src/query/expression/src/values.rs | 8 +- .../processors/transforms/hash_join/common.rs | 3 +- .../it/storages/fuse/operations/read_plan.rs | 4 +- .../planner/optimizer/heuristic/heuristic.rs | 4 + src/query/storages/fuse/src/fuse_part.rs | 4 +- .../src/io/read/block/block_reader_native.rs | 1 + .../storages/fuse/src/operations/delete.rs | 3 +- .../operations/mutation/mutation_source.rs | 12 +- .../read/native_data_source_deserializer.rs | 204 +++++++++++++--- .../fuse/src/operations/read_partitions.rs | 51 ++-- .../storages/fuse/src/pruning/fuse_pruner.rs | 4 +- .../hive/hive/src/hive_table_source.rs | 3 +- .../storages/parquet/src/parquet_source.rs | 3 +- 17 files changed, 578 insertions(+), 170 deletions(-) create mode 100644 src/query/expression/src/kernels/topk.rs diff --git a/src/query/catalog/src/plan/pushdown.rs b/src/query/catalog/src/plan/pushdown.rs index e3e867a71875..75d1bb5bea74 100644 --- a/src/query/catalog/src/plan/pushdown.rs +++ b/src/query/catalog/src/plan/pushdown.rs @@ -14,10 +14,10 @@ use std::fmt::Debug; +use common_expression::types::DataType; use common_expression::RemoteExpr; use common_expression::TableField; use common_expression::TableSchema; -use common_expression::types::DataType; use crate::plan::Projection; @@ -62,20 +62,24 @@ pub struct TopK { impl PushDownInfo { pub fn top_k(&self, schema: &TableSchema, support: fn(&DataType) -> bool) -> Option { - if self.order_by.is_empty() && self.limit.is_some() { + if !self.order_by.is_empty() && self.limit.is_some() { let order = &self.order_by[0]; + let limit = self.limit.unwrap(); + + const 
MAX_TOPK_LIMIT: usize = 1000; + if limit > MAX_TOPK_LIMIT { + return None; + } + if let RemoteExpr::::ColumnRef { id, .. } = &order.0 { let field = schema.field_with_name(id).unwrap(); let data_type: DataType = field.data_type().into(); - if support(&data_type) { + if !support(&data_type) { return None; } - + let leaf_fields = schema.leaf_fields(); - let column_id = leaf_fields - .iter() - .position(|p| p == field) - .unwrap(); + let column_id = leaf_fields.iter().position(|p| p == field).unwrap(); let top_k = TopK { limit: self.limit.unwrap(), diff --git a/src/query/expression/src/kernels/filter.rs b/src/query/expression/src/kernels/filter.rs index bdc208ef7d86..e250578ba8d6 100644 --- a/src/query/expression/src/kernels/filter.rs +++ b/src/query/expression/src/kernels/filter.rs @@ -20,6 +20,7 @@ use common_arrow::arrow::buffer::Buffer; use common_exception::ErrorCode; use common_exception::Result; +use crate::arrow::bitmap_into_mut; use crate::types::array::ArrayColumnBuilder; use crate::types::nullable::NullableColumn; use crate::types::number::NumberColumn; @@ -42,102 +43,22 @@ use crate::Scalar; use crate::TypeDeserializer; use crate::Value; -impl DataBlock { - // check if the predicate has any valid row - pub fn filter_exists(predicate: &Value) -> Result { - let predicate = Self::cast_to_nonull_boolean(predicate).ok_or_else(|| { - ErrorCode::BadDataValueType(format!( - "Filter predict column does not support type '{:?}'", - predicate - )) - })?; - match predicate { - Value::Scalar(s) => Ok(s), - Value::Column(bitmap) => Ok(bitmap.len() != bitmap.unset_bits()), - } - } +pub struct FilterHelpers; +impl DataBlock { pub fn filter(self, predicate: &Value) -> Result { if self.num_rows() == 0 { return Ok(self); } - let predicate = Self::cast_to_nonull_boolean(predicate).ok_or_else(|| { + let predicate = FilterHelpers::cast_to_nonull_boolean(predicate).ok_or_else(|| { ErrorCode::BadDataValueType(format!( "Filter predict column does not support type '{:?}'", predicate )) })?; - match predicate { - Value::Scalar(s) => { - if s { - Ok(self) - } else { - Ok(self.slice(0..0)) - } - } - Value::Column(bitmap) => Self::filter_with_bitmap(self, &bitmap), - } - } - - // Must be numeric, boolean, or string value type - pub fn cast_to_nonull_boolean(predicate: &Value) -> Option> { - match predicate { - Value::Scalar(s) => Self::cast_scalar_to_boolean(s).map(Value::Scalar), - Value::Column(c) => Self::cast_column_to_boolean(c).map(Value::Column), - } - } - - fn cast_scalar_to_boolean(s: &Scalar) -> Option { - match s { - Scalar::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num { - NumberScalar::SRC_TYPE(value) => Some(value != &SRC_TYPE::default()), - }), - Scalar::Boolean(value) => Some(*value), - Scalar::String(value) => Some(!value.is_empty()), - Scalar::Timestamp(value) => Some(*value != 0), - Scalar::Date(value) => Some(*value != 0), - Scalar::Null => Some(false), - _ => None, - } - } - - fn cast_column_to_boolean(c: &Column) -> Option { - match c { - Column::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num { - NumberColumn::SRC_TYPE(value) => Some(BooleanType::column_from_iter( - value.iter().map(|v| v != &SRC_TYPE::default()), - &[], - )), - }), - Column::Boolean(value) => Some(value.clone()), - Column::String(value) => Some(BooleanType::column_from_iter( - value.iter().map(|s| !s.is_empty()), - &[], - )), - Column::Timestamp(value) => Some(BooleanType::column_from_iter( - value.iter().map(|v| *v != 0), - &[], - )), - Column::Date(value) => 
Some(BooleanType::column_from_iter( - value.iter().map(|v| *v != 0), - &[], - )), - Column::Null { len } => Some(MutableBitmap::from_len_zeroed(*len).into()), - Column::Nullable(c) => { - let inner = Self::cast_column_to_boolean(&c.column)?; - Some((&inner) & (&c.validity)) - } - _ => None, - } - } - - pub fn try_as_const_bool(value: &Value) -> Result> { - match value { - Value::Scalar(v) => Ok(Some(*v)), - _ => Ok(None), - } + self.filter_boolean_value(predicate) } pub fn filter_with_bitmap(block: DataBlock, bitmap: &Bitmap) -> Result { @@ -169,6 +90,19 @@ impl DataBlock { } } } + + pub fn filter_boolean_value(self, filter: Value) -> Result { + match filter { + Value::Scalar(s) => { + if s { + Ok(self) + } else { + Ok(self.slice(0..0)) + } + } + Value::Column(bitmap) => Self::filter_with_bitmap(self, &bitmap), + } + } } impl Column { @@ -380,3 +314,95 @@ impl Column { new.into() } } + +impl FilterHelpers { + // check if the predicate has any valid row + pub fn filter_exists(predicate: &Value) -> Result { + let predicate = Self::cast_to_nonull_boolean(predicate).ok_or_else(|| { + ErrorCode::BadDataValueType(format!( + "Filter predict column does not support type '{:?}'", + predicate + )) + })?; + match predicate { + Value::Scalar(s) => Ok(s), + Value::Column(bitmap) => Ok(bitmap.len() != bitmap.unset_bits()), + } + } + + // Must be numeric, boolean, or string value type + #[inline] + pub fn cast_to_nonull_boolean(predicate: &Value) -> Option> { + match predicate { + Value::Scalar(s) => Self::cast_scalar_to_boolean(s).map(Value::Scalar), + Value::Column(c) => Self::cast_column_to_boolean(c).map(Value::Column), + } + } + + #[inline] + pub fn is_all_unset(predicate: &Value) -> bool { + match &predicate { + Value::Scalar(v) => !v, + Value::Column(bitmap) => bitmap.unset_bits() == bitmap.len(), + } + } + + fn cast_scalar_to_boolean(s: &Scalar) -> Option { + match s { + Scalar::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num { + NumberScalar::SRC_TYPE(value) => Some(value != &SRC_TYPE::default()), + }), + Scalar::Boolean(value) => Some(*value), + Scalar::String(value) => Some(!value.is_empty()), + Scalar::Timestamp(value) => Some(*value != 0), + Scalar::Date(value) => Some(*value != 0), + Scalar::Null => Some(false), + _ => None, + } + } + + fn cast_column_to_boolean(c: &Column) -> Option { + match c { + Column::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num { + NumberColumn::SRC_TYPE(value) => Some(BooleanType::column_from_iter( + value.iter().map(|v| v != &SRC_TYPE::default()), + &[], + )), + }), + Column::Boolean(value) => Some(value.clone()), + Column::String(value) => Some(BooleanType::column_from_iter( + value.iter().map(|s| !s.is_empty()), + &[], + )), + Column::Timestamp(value) => Some(BooleanType::column_from_iter( + value.iter().map(|v| *v != 0), + &[], + )), + Column::Date(value) => Some(BooleanType::column_from_iter( + value.iter().map(|v| *v != 0), + &[], + )), + Column::Null { len } => Some(MutableBitmap::from_len_zeroed(*len).into()), + Column::Nullable(c) => { + let inner = Self::cast_column_to_boolean(&c.column)?; + Some((&inner) & (&c.validity)) + } + _ => None, + } + } + + pub fn try_as_const_bool(value: &Value) -> Result> { + match value { + Value::Scalar(v) => Ok(Some(*v)), + _ => Ok(None), + } + } + + pub fn filter_to_bitmap(predicate: Value, rows: usize) -> MutableBitmap { + match predicate { + Value::Scalar(true) => MutableBitmap::from_len_set(rows), + Value::Scalar(false) => MutableBitmap::from_len_zeroed(rows), + Value::Column(bitmap) => 
bitmap_into_mut(bitmap), + } + } +} diff --git a/src/query/expression/src/kernels/mod.rs b/src/query/expression/src/kernels/mod.rs index 11c7de9e41aa..7ce4b42e3290 100644 --- a/src/query/expression/src/kernels/mod.rs +++ b/src/query/expression/src/kernels/mod.rs @@ -20,8 +20,11 @@ mod scatter; mod sort; mod take; mod take_chunks; +mod topk; +pub use filter::FilterHelpers; pub use group_by::*; pub use group_by_hash::*; pub use sort::*; pub use take_chunks::*; +pub use topk::*; diff --git a/src/query/expression/src/kernels/topk.rs b/src/query/expression/src/kernels/topk.rs new file mode 100644 index 000000000000..2ade339ac871 --- /dev/null +++ b/src/query/expression/src/kernels/topk.rs @@ -0,0 +1,227 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::cmp::Ordering::Less; +use std::mem; +use std::ptr; + +use common_arrow::arrow::bitmap::MutableBitmap; + +use crate::types::*; +use crate::with_number_mapped_type; +use crate::Column; +use crate::Scalar; + +pub struct TopKSorter { + data: Vec, + limit: usize, + asc: bool, +} + +impl TopKSorter { + pub fn new(limit: usize, asc: bool) -> Self { + Self { + data: Vec::with_capacity(limit), + limit, + asc, + } + } + + // Push the column into this sorted and update the bitmap + // The bitmap could be used in filter + pub fn push_column(&mut self, col: &Column, bitmap: &mut MutableBitmap) { + with_number_mapped_type!(|NUM_TYPE| match col.data_type() { + DataType::Number(NumberDataType::NUM_TYPE) => + self.push_column_internal::>(col, bitmap), + DataType::String => self.push_column_internal::(col, bitmap), + DataType::Timestamp => self.push_column_internal::(col, bitmap), + DataType::Date => self.push_column_internal::(col, bitmap), + _ => {} + }); + } + + pub fn push_column_internal(&mut self, col: &Column, bitmap: &mut MutableBitmap) + where for<'a> T::ScalarRef<'a>: Ord { + let col = T::try_downcast_column(col).unwrap(); + for (i, value) in T::iter_column(&col).enumerate() { + if !bitmap.get(i) { + continue; + } + + if self.data.len() < self.limit { + self.data.push(T::upcast_scalar(T::to_owned_scalar(value))); + if self.data.len() == self.limit { + self.make_heap(); + } + } else if !self.push_value(value) { + bitmap.set(i, false); + } + } + } + + #[inline] + fn push_value(&mut self, value: T::ScalarRef<'_>) -> bool + where for<'a> T::ScalarRef<'a>: Ord { + let order = self.ordering(); + let data = self.data[0].clone(); + let data = data.as_ref(); + let data = T::try_downcast_scalar(&data).unwrap(); + let value = T::upcast_gat(value); + + if Ord::cmp(&data, &value) != order { + self.data[0] = T::upcast_scalar(T::to_owned_scalar(value)); + self.adjust(); + true + } else { + false + } + } + + #[inline] + pub fn never_match(&self, (min, max): &(Scalar, Scalar)) -> bool { + if self.data.len() != self.limit { + return false; + } + (self.asc && &self.data[0] < min) || (!self.asc && &self.data[0] > max) + } + + #[inline] + pub fn never_match_any(&self, col: &Column) -> bool { + if 
self.data.len() != self.limit { + return false; + } + with_number_mapped_type!(|NUM_TYPE| match col.data_type() { + DataType::Number(NumberDataType::NUM_TYPE) => + self.never_match_any_internal::>(col), + DataType::String => self.never_match_any_internal::(col), + DataType::Timestamp => self.never_match_any_internal::(col), + DataType::Date => self.never_match_any_internal::(col), + _ => false, + }) + } + + fn never_match_any_internal(&self, col: &Column) -> bool + where for<'a> T::ScalarRef<'a>: Ord { + let col = T::try_downcast_column(col).unwrap(); + let data = self.data[0].as_ref(); + + for val in T::iter_column(&col) { + let data = T::try_downcast_scalar(&data).unwrap(); + if (self.asc && data >= val) || (!self.asc && data <= val) { + return false; + } + } + true + } + + fn make_heap(&mut self) { + let ordering = self.ordering(); + let data = self.data.as_mut_slice(); + make_heap(data, &mut |a, b| a.cmp(b) == ordering); + } + + fn adjust(&mut self) { + let ordering = self.ordering(); + let data = self.data.as_mut_slice(); + adjust_heap(data, 0, data.len(), &mut |a, b| a.cmp(b) == ordering); + } + + fn ordering(&self) -> Ordering { + if self.asc { Less } else { Less.reverse() } + } +} + +#[inline] +fn make_heap(v: &mut [T], is_less: &mut F) +where F: FnMut(&T, &T) -> bool { + let len = v.len(); + let mut parent = (len - 2) / 2; + + loop { + adjust_heap(v, parent, len, is_less); + if parent == 0 { + return; + } + parent -= 1; + } +} + +/// adjust_heap is a shift up adjust op for the heap +#[inline] +fn adjust_heap(v: &mut [T], hole_index: usize, len: usize, is_less: &mut F) +where F: FnMut(&T, &T) -> bool { + let mut left_child = hole_index * 2 + 1; + + // SAFETY: we ensure hole_index point to a properly initialized value of type T + let mut tmp = unsafe { mem::ManuallyDrop::new(ptr::read(&v[hole_index])) }; + let mut hole = InsertionHole { + src: &mut *tmp, + dest: &mut v[hole_index], + }; + // Panic safety: + // + // If `is_less` panics at any point during the process, `hole` will get dropped and fill the + // hole in `v` with the unconsumed range in `buf`, thus ensuring that `v` still holds every + // object it initially held exactly once. + + // SAFETY: + // we ensure src/dest point to a properly initialized value of type T + // src is valid for reads of `count * size_of::()` bytes. + // dest is valid for reads of `count * size_of::()` bytes. + // Both `src` and `dst` are properly aligned. + + unsafe { + while left_child < len { + // SAFETY: + // we ensure left_child and left_child + 1 are between [0, len) + if left_child + 1 < len + && is_less(v.get_unchecked(left_child), v.get_unchecked(left_child + 1)) + { + left_child += 1; + } + + // SAFETY: + // left_child and hole.dest point to a properly initialized value of type T + if is_less(&*tmp, v.get_unchecked(left_child)) { + ptr::copy_nonoverlapping(&v[left_child], hole.dest, 1); + hole.dest = &mut v[left_child]; + } else { + break; + } + + left_child = left_child * 2 + 1; + } + } + + // These codes is from std::sort_by + // When dropped, copies from `src` into `dest`. + struct InsertionHole { + src: *mut T, + dest: *mut T, + } + + impl Drop for InsertionHole { + fn drop(&mut self) { + // SAFETY: + // we ensure src/dest point to a properly initialized value of type T + // src is valid for reads of `count * size_of::()` bytes. + // dest is valid for reads of `count * size_of::()` bytes. + // Both `src` and `dst` are properly aligned. 
+ unsafe { + ptr::copy_nonoverlapping(self.src, self.dest, 1); + } + } + } +} diff --git a/src/query/expression/src/values.rs b/src/query/expression/src/values.rs index c0b2ba73ec8f..7b19b6673fe0 100755 --- a/src/query/expression/src/values.rs +++ b/src/query/expression/src/values.rs @@ -87,7 +87,7 @@ pub enum Scalar { Variant(Vec), } -#[derive(Clone, Default, EnumAsInner)] +#[derive(Clone, Default, Eq, EnumAsInner)] pub enum ScalarRef<'a> { #[default] Null, @@ -415,6 +415,12 @@ impl PartialOrd for ScalarRef<'_> { } } +impl Ord for ScalarRef<'_> { + fn cmp(&self, other: &Self) -> Ordering { + self.partial_cmp(other).unwrap_or(Ordering::Equal) + } +} + impl PartialEq for ScalarRef<'_> { fn eq(&self, other: &Self) -> bool { self.partial_cmp(other) == Some(Ordering::Equal) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs index b0f838e87778..b6142ff7ddf4 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs @@ -27,6 +27,7 @@ use common_expression::Column; use common_expression::DataBlock; use common_expression::Evaluator; use common_expression::Expr; +use common_expression::FilterHelpers; use common_expression::Scalar; use common_expression::Value; use common_functions::scalars::BUILTIN_FUNCTIONS; @@ -195,7 +196,7 @@ impl JoinHashTable { let filter_vector: Value = evaluator .run(filter) .map_err(|(_, e)| ErrorCode::Internal(format!("Invalid expression: {}", e)))?; - let predict_boolean_nonull = DataBlock::cast_to_nonull_boolean(&filter_vector) + let predict_boolean_nonull = FilterHelpers::cast_to_nonull_boolean(&filter_vector) .ok_or_else(|| ErrorCode::Internal("Cannot get the boolean column"))?; match predict_boolean_nonull { diff --git a/src/query/service/tests/it/storages/fuse/operations/read_plan.rs b/src/query/service/tests/it/storages/fuse/operations/read_plan.rs index e40b323593d7..a44edfd859ce 100644 --- a/src/query/service/tests/it/storages/fuse/operations/read_plan.rs +++ b/src/query/service/tests/it/storages/fuse/operations/read_plan.rs @@ -109,7 +109,7 @@ fn test_to_partitions() -> Result<()> { let column_nodes = ColumnNodes { column_nodes }; // CASE I: no projection - let (s, parts) = FuseTable::to_partitions(&blocks_metas, &column_nodes, None); + let (s, parts) = FuseTable::to_partitions(&blocks_metas, &column_nodes, None, None); assert_eq!(parts.len(), num_of_block as usize); let expected_block_size: u64 = cols_metas .values() @@ -141,7 +141,7 @@ fn test_to_partitions() -> Result<()> { prewhere: None, }); - let (stats, parts) = FuseTable::to_partitions(&blocks_metas, &column_nodes, push_down); + let (stats, parts) = FuseTable::to_partitions(&blocks_metas, &column_nodes, None, push_down); assert_eq!(parts.len(), num_of_block as usize); assert_eq!(expected_block_size * num_of_block, stats.read_bytes as u64); diff --git a/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs b/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs index b2d960421db7..ac8906ba1c12 100644 --- a/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs +++ b/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs @@ -99,6 +99,10 @@ impl HeuristicOptimizer { let optimized = self.optimize_expression(&pre_optimized)?; let post_optimized = self.post_optimize(optimized)?; + // do it again, some rules may be missed + let optimized = 
self.optimize_expression(&post_optimized)?; + let post_optimized = self.post_optimize(optimized)?; + Ok(post_optimized) } diff --git a/src/query/storages/fuse/src/fuse_part.rs b/src/query/storages/fuse/src/fuse_part.rs index fb5f6daaee5b..2e509fec094c 100644 --- a/src/query/storages/fuse/src/fuse_part.rs +++ b/src/query/storages/fuse/src/fuse_part.rs @@ -37,7 +37,7 @@ pub struct FusePartInfo { pub nums_rows: usize, pub columns_meta: HashMap, pub compression: Compression, - + pub sort_min_max: Option<(Scalar, Scalar)>, /// page range in the file pub range: Option>, @@ -70,7 +70,7 @@ impl FusePartInfo { rows_count: u64, columns_meta: HashMap, compression: Compression, - sort_min_max: Option<(Scalar, Scalar)>, + sort_min_max: Option<(Scalar, Scalar)>, range: Option>, ) -> Arc> { Arc::new(Box::new(FusePartInfo { diff --git a/src/query/storages/fuse/src/io/read/block/block_reader_native.rs b/src/query/storages/fuse/src/io/read/block/block_reader_native.rs index 9ed904ef64e8..b1951fca5050 100644 --- a/src/query/storages/fuse/src/io/read/block/block_reader_native.rs +++ b/src/query/storages/fuse/src/io/read/block/block_reader_native.rs @@ -151,6 +151,7 @@ impl BlockReader { if let Some(range) = range { column_meta = column_meta.slice(range.start, range.end); } + let (offset, length) = ( column_meta.offset, column_meta.pages.iter().map(|p| p.length).sum::(), diff --git a/src/query/storages/fuse/src/operations/delete.rs b/src/query/storages/fuse/src/operations/delete.rs index 4fb0265b8606..ae89250fbc4e 100644 --- a/src/query/storages/fuse/src/operations/delete.rs +++ b/src/query/storages/fuse/src/operations/delete.rs @@ -31,6 +31,7 @@ use common_expression::DataField; use common_expression::DataSchema; use common_expression::Evaluator; use common_expression::Expr; +use common_expression::FilterHelpers; use common_expression::RemoteExpr; use common_expression::TableSchema; use common_expression::Value; @@ -169,7 +170,7 @@ impl FuseTable { let res = evaluator .run(&filter_expr) .map_err(|(_, e)| ErrorCode::Internal(format!("eval try eval const failed: {}.", e)))?; - let predicates = DataBlock::cast_to_nonull_boolean(&res).ok_or_else(|| { + let predicates = FilterHelpers::cast_to_nonull_boolean(&res).ok_or_else(|| { ErrorCode::BadArguments("Result of filter expression cannot be converted to boolean.") })?; diff --git a/src/query/storages/fuse/src/operations/mutation/mutation_source.rs b/src/query/storages/fuse/src/operations/mutation/mutation_source.rs index cbbb20f5c9c9..80012922c415 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutation_source.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutation_source.rs @@ -29,6 +29,7 @@ use common_expression::Column; use common_expression::DataBlock; use common_expression::Evaluator; use common_expression::Expr; +use common_expression::FilterHelpers; use common_expression::Value; use common_functions::scalars::BUILTIN_FUNCTIONS; use common_sql::evaluator::BlockOperator; @@ -182,11 +183,12 @@ impl Processor for MutationSource { let res = evaluator.run(filter).map_err(|(_, e)| { ErrorCode::Internal(format!("eval filter failed: {}.", e)) })?; - let predicates = DataBlock::cast_to_nonull_boolean(&res).ok_or_else(|| { - ErrorCode::BadArguments( - "Result of filter expression cannot be converted to boolean.", - ) - })?; + let predicates = + FilterHelpers::cast_to_nonull_boolean(&res).ok_or_else(|| { + ErrorCode::BadArguments( + "Result of filter expression cannot be converted to boolean.", + ) + })?; let affect_rows = match &predicates { 
Value::Scalar(v) => { diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs index c316f908a6e2..5b784a917e94 100644 --- a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::collections::VecDeque; use std::sync::Arc; use common_base::base::Progress; @@ -20,15 +21,19 @@ use common_base::base::ProgressValues; use common_catalog::plan::DataSourcePlan; use common_catalog::plan::PartInfoPtr; use common_catalog::plan::PushDownInfo; +use common_catalog::plan::TopK; use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; +use common_expression::Column; use common_expression::ConstantFolder; use common_expression::DataBlock; use common_expression::DataSchema; use common_expression::Evaluator; use common_expression::Expr; +use common_expression::FilterHelpers; use common_expression::FunctionContext; +use common_expression::TopKSorter; use common_expression::Value; use common_functions::scalars::BUILTIN_FUNCTIONS; use common_pipeline_core::processors::port::InputPort; @@ -36,6 +41,8 @@ use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::Event; use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_core::processors::Processor; +use storages_common_index::Index; +use storages_common_index::RangeIndex; use crate::fuse_part::FusePartInfo; use crate::io::BlockReader; @@ -51,8 +58,8 @@ pub struct NativeDeserializeDataTransform { input: Arc, output: Arc, output_data: Option, - parts: Vec, - chunks: Vec, + parts: VecDeque, + chunks: VecDeque, prewhere_columns: Vec, remain_columns: Vec, @@ -61,7 +68,11 @@ pub struct NativeDeserializeDataTransform { output_schema: DataSchema, prewhere_filter: Arc>, - prewhere_skipped: usize, + skipped_page: usize, + + read_columns: Vec, + top_k: Option<(TopK, TopKSorter, usize)>, + topn_finish: bool, } impl NativeDeserializeDataTransform { @@ -73,15 +84,24 @@ impl NativeDeserializeDataTransform { output: Arc, ) -> Result { let scan_progress = ctx.get_scan_progress(); + + let table_schema = plan.source_info.schema(); let src_schema: DataSchema = (block_reader.schema().as_ref()).into(); - let prewhere_columns: Vec = + let top_k = plan + .push_downs + .as_ref() + .map(|p| p.top_k(table_schema.as_ref(), RangeIndex::supported_type)) + .unwrap_or_default(); + + let mut prewhere_columns: Vec = match PushDownInfo::prewhere_of_push_downs(&plan.push_downs) { None => (0..src_schema.num_fields()).collect(), Some(v) => { let projected_arrow_schema = v .prewhere_columns .project_schema(plan.source_info.schema().as_ref()); + projected_arrow_schema .fields() .iter() @@ -90,6 +110,16 @@ impl NativeDeserializeDataTransform { } }; + let top_k = top_k.map(|top_k| { + let index = src_schema.index_of(top_k.order_by.name()).unwrap(); + let sorter = TopKSorter::new(top_k.limit, top_k.asc); + + if prewhere_columns.contains(&index) { + prewhere_columns.push(index); + } + (top_k, sorter, index) + }); + let remain_columns: Vec = (0..src_schema.num_fields()) .filter(|i| !prewhere_columns.contains(i)) .collect(); @@ -117,15 +147,18 @@ impl NativeDeserializeDataTransform { input, output, output_data: None, - parts: vec![], - chunks: vec![], + parts: VecDeque::new(), 
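                    // Parts/chunks are now consumed front-to-back (pop_front); together with
                    // PartitionsShuffleKind::Seq this preserves the min/max-sorted partition
                    // order produced by read_partitions, which is what lets check_topn stop early.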
+ chunks: VecDeque::new(), prewhere_columns, remain_columns, src_schema, output_schema, prewhere_filter, - prewhere_skipped: 0, + skipped_page: 0, + top_k, + topn_finish: false, + read_columns: vec![], }, ))) } @@ -148,6 +181,45 @@ impl NativeDeserializeDataTransform { }, ) } + + fn add_block(&mut self, data_block: DataBlock) -> Result<()> { + let rows = data_block.num_rows(); + if rows == 0 { + return Ok(()); + } + let progress_values = ProgressValues { + rows, + bytes: data_block.memory_size(), + }; + self.scan_progress.incr(&progress_values); + self.output_data = Some(data_block); + Ok(()) + } + + /// check topk should return finished or not + fn check_topn(&mut self) { + if let Some((_, sorter, _)) = &mut self.top_k { + if let Some(next_part) = self.parts.front() { + let next_part = next_part.as_any().downcast_ref::().unwrap(); + if next_part.sort_min_max.is_none() { + return; + } + if let Some(sort_min_max) = &next_part.sort_min_max { + self.topn_finish = sorter.never_match(sort_min_max); + } + } + } + } + + fn skip_chunks_page(read_columns: &[usize], chunks: &mut DataChunks) -> Result<()> { + for (index, chunk) in chunks.iter_mut().enumerate() { + if read_columns.contains(&index) { + continue; + } + chunk.1.skip_page()?; + } + Ok(()) + } } impl Processor for NativeDeserializeDataTransform { @@ -179,10 +251,15 @@ impl Processor for NativeDeserializeDataTransform { if !self.input.has_data() { self.input.set_need_data(); } - return Ok(Event::Sync); } + if self.topn_finish { + self.input.finish(); + self.output.finish(); + return Ok(Event::Finished); + } + if self.input.has_data() { let mut data_block = self.input.pull_data().unwrap()?; if let Some(mut source_meta) = data_block.take_meta() { @@ -190,8 +267,15 @@ impl Processor for NativeDeserializeDataTransform { .as_mut_any() .downcast_mut::() { - self.parts = source_meta.part.clone(); - self.chunks = std::mem::take(&mut source_meta.chunks); + self.parts = VecDeque::from(std::mem::take(&mut source_meta.part)); + + self.check_topn(); + if self.topn_finish { + self.input.finish(); + self.output.finish(); + return Ok(Event::Finished); + } + self.chunks = VecDeque::from(std::mem::take(&mut source_meta.chunks)); return Ok(Event::Sync); } } @@ -200,7 +284,7 @@ impl Processor for NativeDeserializeDataTransform { } if self.input.is_finished() { - metrics_inc_pruning_prewhere_nums(self.prewhere_skipped as u64); + metrics_inc_pruning_prewhere_nums(self.skipped_page as u64); self.output.finish(); return Ok(Event::Finished); } @@ -210,31 +294,62 @@ impl Processor for NativeDeserializeDataTransform { } fn process(&mut self) -> Result<()> { - if let Some(chunks) = self.chunks.last_mut() { + if let Some(chunks) = self.chunks.front_mut() { // this means it's empty projection if chunks.is_empty() { - let _ = self.chunks.pop(); - let part = self.parts.pop().unwrap(); + let _ = self.chunks.pop_front(); + let part = self.parts.pop_front().unwrap(); + let part = FusePartInfo::from_part(&part)?; let data_block = DataBlock::new(vec![], part.nums_rows); - let progress_values = ProgressValues { - rows: data_block.num_rows(), - bytes: data_block.memory_size(), - }; - self.scan_progress.incr(&progress_values); - self.output_data = Some(data_block); + self.add_block(data_block)?; return Ok(()); } let mut arrays = Vec::with_capacity(chunks.len()); + self.read_columns.clear(); + + // Step 1: Check TOP_K, if prewhere_columns contains not only TOP_K, we can check if TOP_K column can satisfy the heap. 
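            // When the top-k column is not the only prewhere column, decode just that
            // column's next page first and ask the sorter whether any of its values could
            // still enter the current top-N heap; if none can, the whole page is skipped
            // for every remaining column without decoding it (counted in `skipped_page`).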
+ if self.prewhere_columns.len() > 1 { + if let Some((top_k, sorter, index)) = self.top_k.as_mut() { + let chunk = chunks.get_mut(*index).unwrap(); + if !chunk.1.has_next() { + // No data anymore + let _ = self.chunks.pop_front(); + let _ = self.parts.pop_front().unwrap(); + self.check_topn(); + // check finished + return Ok(()); + } + let array = chunk.1.next_array()?; + self.read_columns.push(*index); + + let data_type = top_k.order_by.data_type().into(); + let col = Column::from_arrow(array.as_ref(), &data_type); + + arrays.push((chunk.0, array)); + if sorter.never_match_any(&col) { + self.skipped_page += 1; + return Self::skip_chunks_page(&self.read_columns, chunks); + } + } + } + + // Step 2: Read Prewhere columns and get the filter for index in self.prewhere_columns.iter() { + if self.read_columns.contains(index) { + continue; + } let chunk = chunks.get_mut(*index).unwrap(); if !chunk.1.has_next() { // No data anymore - let _ = self.chunks.pop(); - let _ = self.parts.pop(); + let _ = self.chunks.pop_front(); + let _ = self.parts.pop_front().unwrap(); + + self.check_topn(); return Ok(()); } + self.read_columns.push(*index); arrays.push((chunk.0, chunk.1.next_array()?)); } @@ -246,20 +361,33 @@ impl Processor for NativeDeserializeDataTransform { let result = evaluator.run(filter).map_err(|(_, e)| { ErrorCode::Internal(format!("eval prewhere filter failed: {}.", e)) })?; - let filter = DataBlock::cast_to_nonull_boolean(&result).unwrap(); + let filter = FilterHelpers::cast_to_nonull_boolean(&result).unwrap(); + + // Step 3: Apply the filter, if it's all filtered, we can skip the remain columns. + if FilterHelpers::is_all_unset(&filter) { + self.skipped_page += 1; + return Self::skip_chunks_page(&self.read_columns, chunks); + } - let all_filtered = match &filter { - Value::Scalar(v) => !v, - Value::Column(bitmap) => bitmap.unset_bits() == bitmap.len(), + // Step 4: Apply the filter to topk and update the bitmap, this will filter more results + let filter = if let Some((_, sorter, index)) = &mut self.top_k { + let top_k_column = prewhere_block + .get_by_offset(*index) + .value + .as_column() + .unwrap(); + + let mut bitmap = + FilterHelpers::filter_to_bitmap(filter, prewhere_block.num_rows()); + sorter.push_column(top_k_column, &mut bitmap); + Value::Column(bitmap.into()) + } else { + filter }; - if all_filtered { - self.prewhere_skipped += 1; - for index in self.remain_columns.iter() { - let chunk = chunks.get_mut(*index).unwrap(); - chunk.1.skip_page()?; - } - return Ok(()); + if FilterHelpers::is_all_unset(&filter) { + self.skipped_page += 1; + return Self::skip_chunks_page(&self.read_columns, chunks); } for index in self.remain_columns.iter() { @@ -269,7 +397,7 @@ impl Processor for NativeDeserializeDataTransform { let block = self.block_reader.build_block(arrays)?; let block = block.resort(&self.src_schema, &self.output_schema)?; - block.filter(&result) + block.filter_boolean_value(filter) } None => { for index in self.remain_columns.iter() { @@ -280,12 +408,8 @@ impl Processor for NativeDeserializeDataTransform { } }?; - let progress_values = ProgressValues { - rows: data_block.num_rows(), - bytes: data_block.memory_size(), - }; - self.scan_progress.incr(&progress_values); - self.output_data = Some(data_block); + // Step 5: Add the block to topk + self.add_block(data_block)?; } Ok(()) diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 8eb67e166690..46faec815163 100644 --- 
a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -27,7 +27,6 @@ use common_catalog::plan::TopK; use common_catalog::table::Table; use common_catalog::table_context::TableContext; use common_exception::Result; -use common_expression::RemoteExpr; use common_expression::TableSchemaRef; use common_meta_app::schema::TableInfo; use common_storage::ColumnNodes; @@ -153,7 +152,12 @@ impl FuseTable { let partitions_scanned = block_metas.len(); - let (mut statistics, parts) = self.to_partitions(block_metas, &column_nodes, push_downs); + let top_k = push_downs + .as_ref() + .map(|p| p.top_k(self.schema().as_ref(), RangeIndex::supported_type)) + .unwrap_or_default(); + let (mut statistics, parts) = + Self::to_partitions(block_metas, &column_nodes, top_k, push_downs); // Update planner statistics. statistics.partitions_total = partitions_total; @@ -169,9 +173,9 @@ impl FuseTable { } pub fn to_partitions( - &self, block_metas: &[(Option>, Arc)], column_nodes: &ColumnNodes, + top_k: Option, push_down: Option, ) -> (PartStatistics, Partitions) { let limit = push_down @@ -180,11 +184,6 @@ impl FuseTable { .and_then(|p| p.limit) .unwrap_or(usize::MAX); - let top_k = push_down - .as_ref() - .map(|p| p.top_k(self.schema().as_ref(), RangeIndex::supported_type)) - .unwrap_or_default(); - let mut block_metas = block_metas.to_vec(); if let Some(top_k) = &top_k { block_metas.sort_by(|a, b| { @@ -192,9 +191,9 @@ impl FuseTable { let b = b.1.col_stats.get(&top_k.column_id).unwrap(); if top_k.asc { - a.min.cmp(&b.min) + (a.min.as_ref(), a.max.as_ref()).cmp(&(b.min.as_ref(), b.max.as_ref())) } else { - a.max.cmp(&b.max).reverse() + (b.max.as_ref(), b.min.as_ref()).cmp(&(a.max.as_ref(), a.min.as_ref())) } }); } @@ -202,14 +201,18 @@ impl FuseTable { let (mut statistics, mut partitions) = match &push_down { None => Self::all_columns_partitions(&block_metas, top_k.clone(), limit), Some(extras) => match &extras.projection { - None => Self::all_columns_partitions(&block_metas, top_k.clone(), limit), - Some(projection) => { - Self::projection_partitions(&block_metas, column_nodes, projection, top_k.clone(), limit) - } + None => Self::all_columns_partitions(&block_metas, top_k.clone(), limit), + Some(projection) => Self::projection_partitions( + &block_metas, + column_nodes, + projection, + top_k.clone(), + limit, + ), }, }; - if let Some(_) = &top_k { + if top_k.is_some() { partitions.kind = PartitionsShuffleKind::Seq; } @@ -309,7 +312,11 @@ impl FuseTable { (statistics, partitions) } - pub fn all_columns_part(range: Option>, top_k: &Option, meta: &BlockMeta) -> PartInfoPtr { + pub fn all_columns_part( + range: Option>, + top_k: &Option, + meta: &BlockMeta, + ) -> PartInfoPtr { let mut columns_meta = HashMap::with_capacity(meta.col_metas.len()); for (idx, column_meta) in &meta.col_metas { @@ -319,12 +326,12 @@ impl FuseTable { let rows_count = meta.row_count; let location = meta.location.0.clone(); let format_version = meta.location.1; - - let sort_min_max = top_k.as_ref().map(|top_k| { + + let sort_min_max = top_k.as_ref().map(|top_k| { let stat = meta.col_stats.get(&top_k.column_id).unwrap(); (stat.min.clone(), stat.max.clone()) }); - + FusePartInfo::create( location, format_version, @@ -358,12 +365,12 @@ impl FuseTable { let rows_count = meta.row_count; let location = meta.location.0.clone(); let format_version = meta.location.1; - - let sort_min_max = top_k.map(|top_k| { + + let sort_min_max = top_k.map(|top_k| { let stat = 
meta.col_stats.get(&top_k.column_id).unwrap(); (stat.min.clone(), stat.max.clone()) }); - + // TODO // row_count should be a hint value of LIMIT, // not the count the rows in this partition diff --git a/src/query/storages/fuse/src/pruning/fuse_pruner.rs b/src/query/storages/fuse/src/pruning/fuse_pruner.rs index a0bd50353675..45973356b4ba 100644 --- a/src/query/storages/fuse/src/pruning/fuse_pruner.rs +++ b/src/query/storages/fuse/src/pruning/fuse_pruner.rs @@ -177,7 +177,7 @@ impl FusePruner { } // topn pruner: - // if there are ordering + limit clause, use topn pruner + // if there are ordering + limit clause and no filters, use topn pruner fn topn_pruning( &self, metas: Vec<(BlockMetaIndex, Arc)>, @@ -185,7 +185,7 @@ impl FusePruner { let push_down = self.push_down.clone(); if push_down .as_ref() - .filter(|p| !p.order_by.is_empty() && p.limit.is_some()) + .filter(|p| !p.order_by.is_empty() && p.limit.is_some() && p.filters.is_empty()) .is_some() { let schema = self.table_schema.clone(); diff --git a/src/query/storages/hive/hive/src/hive_table_source.rs b/src/query/storages/hive/hive/src/hive_table_source.rs index 36cb22758737..73fadc605ab7 100644 --- a/src/query/storages/hive/hive/src/hive_table_source.rs +++ b/src/query/storages/hive/hive/src/hive_table_source.rs @@ -29,6 +29,7 @@ use common_expression::DataBlock; use common_expression::DataSchemaRef; use common_expression::Evaluator; use common_expression::Expr; +use common_expression::FilterHelpers; use common_expression::Value; use common_functions::scalars::BUILTIN_FUNCTIONS; use common_pipeline_core::processors::port::OutputPort; @@ -150,7 +151,7 @@ impl HiveTableSource { })?; valids.push(res.clone()); // shortcut, if predicates is const boolean (or can be cast to boolean) - match DataBlock::filter_exists(&res)? { + match FilterHelpers::filter_exists(&res)? 
{ true => { exists = true; } diff --git a/src/query/storages/parquet/src/parquet_source.rs b/src/query/storages/parquet/src/parquet_source.rs index da92bfa677d5..187362e827d0 100644 --- a/src/query/storages/parquet/src/parquet_source.rs +++ b/src/query/storages/parquet/src/parquet_source.rs @@ -30,6 +30,7 @@ use common_expression::DataSchemaRef; use common_expression::DataSchemaRefExt; use common_expression::Evaluator; use common_expression::Expr; +use common_expression::FilterHelpers; use common_expression::Value; use common_functions::scalars::BUILTIN_FUNCTIONS; use common_pipeline_core::processors::port::OutputPort; @@ -136,7 +137,7 @@ impl ParquetSource { let res = evaluator.run(filter).map_err(|(_, e)| { ErrorCode::Internal(format!("eval prewhere filter failed: {}.", e)) })?; - let filter = DataBlock::cast_to_nonull_boolean(&res).ok_or_else(|| { + let filter = FilterHelpers::cast_to_nonull_boolean(&res).ok_or_else(|| { ErrorCode::BadArguments( "Result of filter expression cannot be converted to boolean.", ) From 96f9ccec013caea01a62109e6747a9cbdbbd5fff Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 26 Jan 2023 23:51:40 +0800 Subject: [PATCH 5/8] prune_unused can allow empty required columns --- .../optimizer/heuristic/prune_unused_columns.rs | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/src/query/sql/src/planner/optimizer/heuristic/prune_unused_columns.rs b/src/query/sql/src/planner/optimizer/heuristic/prune_unused_columns.rs index 1792425133c6..f31b935b3bcc 100644 --- a/src/query/sql/src/planner/optimizer/heuristic/prune_unused_columns.rs +++ b/src/query/sql/src/planner/optimizer/heuristic/prune_unused_columns.rs @@ -161,17 +161,9 @@ impl UnusedColumnPruner { .is_none() && p.group_items.is_empty() { - required.insert( - *rel_prop - .output_columns - .iter() - .sorted() - .take(1) - .next() - .ok_or_else(|| { - ErrorCode::Internal("Invalid children without output column") - })?, - ); + if let Some(index) = rel_prop.output_columns.iter().sorted().take(1).next() { + required.insert(*index); + } } p.group_items.iter().for_each(|i| { From 955c931c7d7c726f2c2af05b5e7c88fcfed71b1e Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Fri, 27 Jan 2023 00:13:33 +0800 Subject: [PATCH 6/8] fix --- .../sql/src/planner/optimizer/heuristic/heuristic.rs | 6 ++++-- .../operations/read/native_data_source_deserializer.rs | 10 ++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs b/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs index ac8906ba1c12..eb3b5c35f322 100644 --- a/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs +++ b/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs @@ -44,12 +44,12 @@ pub static DEFAULT_REWRITE_RULES: Lazy> = Lazy::new(|| { RuleID::PushDownLimitAggregate, RuleID::PushDownLimitOuterJoin, RuleID::PushDownLimitScan, - RuleID::PushDownSortScan, RuleID::PushDownFilterEvalScalar, RuleID::PushDownFilterJoin, RuleID::FoldCountAggregate, RuleID::SplitAggregate, RuleID::PushDownFilterScan, + RuleID::PushDownSortScan, ] }); @@ -99,7 +99,9 @@ impl HeuristicOptimizer { let optimized = self.optimize_expression(&pre_optimized)?; let post_optimized = self.post_optimize(optimized)?; - // do it again, some rules may be missed + // do it again, some rules may be missed after the post_optimized + // for example: push down sort + limit (topn) to scan + // TODO: if we push down the filter to scan, we need to remove the 
filter plan let optimized = self.optimize_expression(&post_optimized)?; let post_optimized = self.post_optimize(optimized)?; diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs index 5b784a917e94..ca8957c9b31a 100644 --- a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs @@ -114,8 +114,9 @@ impl NativeDeserializeDataTransform { let index = src_schema.index_of(top_k.order_by.name()).unwrap(); let sorter = TopKSorter::new(top_k.limit, top_k.asc); - if prewhere_columns.contains(&index) { + if !prewhere_columns.contains(&index) { prewhere_columns.push(index); + prewhere_columns.sort(); } (top_k, sorter, index) }); @@ -371,8 +372,13 @@ impl Processor for NativeDeserializeDataTransform { // Step 4: Apply the filter to topk and update the bitmap, this will filter more results let filter = if let Some((_, sorter, index)) = &mut self.top_k { + let index_prewhere = self + .prewhere_columns + .iter() + .position(|x| x == index) + .unwrap(); let top_k_column = prewhere_block - .get_by_offset(*index) + .get_by_offset(index_prewhere) .value .as_column() .unwrap(); From 7c7e96357b7fb1f8f2bd0632fbf15ec523014b34 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Fri, 27 Jan 2023 10:51:36 +0800 Subject: [PATCH 7/8] update --- src/query/expression/src/kernels/filter.rs | 100 +------------- src/query/expression/src/kernels/mod.rs | 1 - .../expression/src/utils/filter_helper.rs | 123 ++++++++++++++++++ src/query/expression/src/utils/mod.rs | 1 + .../processors/transforms/hash_join/common.rs | 2 +- .../planner/optimizer/heuristic/heuristic.rs | 1 - .../heuristic/prewhere_optimization.rs | 3 +- .../heuristic/prune_unused_columns.rs | 4 +- .../rewrite/rule_push_down_limit_aggregate.rs | 2 +- .../rule_push_down_limit_expression.rs | 4 +- .../rule/rewrite/rule_push_down_limit_join.rs | 6 +- .../rule/rewrite/rule_push_down_limit_scan.rs | 5 +- .../rule/rewrite/rule_push_down_limit_sort.rs | 2 +- .../rewrite/rule_push_down_limit_union.rs | 3 +- .../rule/rewrite/rule_push_down_sort_scan.rs | 5 +- .../storages/fuse/src/operations/delete.rs | 2 +- .../operations/mutation/mutation_source.rs | 2 +- .../read/native_data_source_deserializer.rs | 2 +- .../hive/hive/src/hive_table_source.rs | 2 +- .../storages/parquet/src/parquet_source.rs | 2 +- 20 files changed, 154 insertions(+), 118 deletions(-) create mode 100644 src/query/expression/src/utils/filter_helper.rs diff --git a/src/query/expression/src/kernels/filter.rs b/src/query/expression/src/kernels/filter.rs index e250578ba8d6..17f8f0387b3d 100644 --- a/src/query/expression/src/kernels/filter.rs +++ b/src/query/expression/src/kernels/filter.rs @@ -20,31 +20,25 @@ use common_arrow::arrow::buffer::Buffer; use common_exception::ErrorCode; use common_exception::Result; -use crate::arrow::bitmap_into_mut; +use crate::filter_helper::FilterHelpers; use crate::types::array::ArrayColumnBuilder; use crate::types::nullable::NullableColumn; use crate::types::number::NumberColumn; -use crate::types::number::NumberScalar; use crate::types::string::StringColumnBuilder; use crate::types::AnyType; -use crate::types::ArgType; use crate::types::ArrayType; use crate::types::BooleanType; use crate::types::StringType; use crate::types::ValueType; use crate::types::VariantType; -use crate::with_number_mapped_type; use crate::with_number_type; use 
crate::BlockEntry; use crate::Column; use crate::ColumnBuilder; use crate::DataBlock; -use crate::Scalar; use crate::TypeDeserializer; use crate::Value; -pub struct FilterHelpers; - impl DataBlock { pub fn filter(self, predicate: &Value) -> Result { if self.num_rows() == 0 { @@ -314,95 +308,3 @@ impl Column { new.into() } } - -impl FilterHelpers { - // check if the predicate has any valid row - pub fn filter_exists(predicate: &Value) -> Result { - let predicate = Self::cast_to_nonull_boolean(predicate).ok_or_else(|| { - ErrorCode::BadDataValueType(format!( - "Filter predict column does not support type '{:?}'", - predicate - )) - })?; - match predicate { - Value::Scalar(s) => Ok(s), - Value::Column(bitmap) => Ok(bitmap.len() != bitmap.unset_bits()), - } - } - - // Must be numeric, boolean, or string value type - #[inline] - pub fn cast_to_nonull_boolean(predicate: &Value) -> Option> { - match predicate { - Value::Scalar(s) => Self::cast_scalar_to_boolean(s).map(Value::Scalar), - Value::Column(c) => Self::cast_column_to_boolean(c).map(Value::Column), - } - } - - #[inline] - pub fn is_all_unset(predicate: &Value) -> bool { - match &predicate { - Value::Scalar(v) => !v, - Value::Column(bitmap) => bitmap.unset_bits() == bitmap.len(), - } - } - - fn cast_scalar_to_boolean(s: &Scalar) -> Option { - match s { - Scalar::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num { - NumberScalar::SRC_TYPE(value) => Some(value != &SRC_TYPE::default()), - }), - Scalar::Boolean(value) => Some(*value), - Scalar::String(value) => Some(!value.is_empty()), - Scalar::Timestamp(value) => Some(*value != 0), - Scalar::Date(value) => Some(*value != 0), - Scalar::Null => Some(false), - _ => None, - } - } - - fn cast_column_to_boolean(c: &Column) -> Option { - match c { - Column::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num { - NumberColumn::SRC_TYPE(value) => Some(BooleanType::column_from_iter( - value.iter().map(|v| v != &SRC_TYPE::default()), - &[], - )), - }), - Column::Boolean(value) => Some(value.clone()), - Column::String(value) => Some(BooleanType::column_from_iter( - value.iter().map(|s| !s.is_empty()), - &[], - )), - Column::Timestamp(value) => Some(BooleanType::column_from_iter( - value.iter().map(|v| *v != 0), - &[], - )), - Column::Date(value) => Some(BooleanType::column_from_iter( - value.iter().map(|v| *v != 0), - &[], - )), - Column::Null { len } => Some(MutableBitmap::from_len_zeroed(*len).into()), - Column::Nullable(c) => { - let inner = Self::cast_column_to_boolean(&c.column)?; - Some((&inner) & (&c.validity)) - } - _ => None, - } - } - - pub fn try_as_const_bool(value: &Value) -> Result> { - match value { - Value::Scalar(v) => Ok(Some(*v)), - _ => Ok(None), - } - } - - pub fn filter_to_bitmap(predicate: Value, rows: usize) -> MutableBitmap { - match predicate { - Value::Scalar(true) => MutableBitmap::from_len_set(rows), - Value::Scalar(false) => MutableBitmap::from_len_zeroed(rows), - Value::Column(bitmap) => bitmap_into_mut(bitmap), - } - } -} diff --git a/src/query/expression/src/kernels/mod.rs b/src/query/expression/src/kernels/mod.rs index 7ce4b42e3290..8da5e5a65512 100644 --- a/src/query/expression/src/kernels/mod.rs +++ b/src/query/expression/src/kernels/mod.rs @@ -22,7 +22,6 @@ mod take; mod take_chunks; mod topk; -pub use filter::FilterHelpers; pub use group_by::*; pub use group_by_hash::*; pub use sort::*; diff --git a/src/query/expression/src/utils/filter_helper.rs b/src/query/expression/src/utils/filter_helper.rs new file mode 100644 index 
000000000000..5c4382a45d7a --- /dev/null +++ b/src/query/expression/src/utils/filter_helper.rs @@ -0,0 +1,123 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_arrow::arrow::bitmap::Bitmap; +use common_arrow::arrow::bitmap::MutableBitmap; +use common_exception::ErrorCode; +use common_exception::Result; + +use crate::arrow::bitmap_into_mut; +use crate::types::number::NumberScalar; +use crate::types::AnyType; +use crate::types::ArgType; +use crate::types::BooleanType; +use crate::types::NumberColumn; +use crate::with_number_mapped_type; +use crate::Column; +use crate::Scalar; +use crate::Value; + +pub struct FilterHelpers; + +impl FilterHelpers { + // check if the predicate has any valid row + pub fn filter_exists(predicate: &Value) -> Result { + let predicate = Self::cast_to_nonull_boolean(predicate).ok_or_else(|| { + ErrorCode::BadDataValueType(format!( + "Filter predict column does not support type '{:?}'", + predicate + )) + })?; + match predicate { + Value::Scalar(s) => Ok(s), + Value::Column(bitmap) => Ok(bitmap.len() != bitmap.unset_bits()), + } + } + + // Must be numeric, boolean, or string value type + #[inline] + pub fn cast_to_nonull_boolean(predicate: &Value) -> Option> { + match predicate { + Value::Scalar(s) => Self::cast_scalar_to_boolean(s).map(Value::Scalar), + Value::Column(c) => Self::cast_column_to_boolean(c).map(Value::Column), + } + } + + #[inline] + pub fn is_all_unset(predicate: &Value) -> bool { + match &predicate { + Value::Scalar(v) => !v, + Value::Column(bitmap) => bitmap.unset_bits() == bitmap.len(), + } + } + + fn cast_scalar_to_boolean(s: &Scalar) -> Option { + match s { + Scalar::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num { + NumberScalar::SRC_TYPE(value) => Some(value != &SRC_TYPE::default()), + }), + Scalar::Boolean(value) => Some(*value), + Scalar::String(value) => Some(!value.is_empty()), + Scalar::Timestamp(value) => Some(*value != 0), + Scalar::Date(value) => Some(*value != 0), + Scalar::Null => Some(false), + _ => None, + } + } + + fn cast_column_to_boolean(c: &Column) -> Option { + match c { + Column::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num { + NumberColumn::SRC_TYPE(value) => Some(BooleanType::column_from_iter( + value.iter().map(|v| v != &SRC_TYPE::default()), + &[], + )), + }), + Column::Boolean(value) => Some(value.clone()), + Column::String(value) => Some(BooleanType::column_from_iter( + value.iter().map(|s| !s.is_empty()), + &[], + )), + Column::Timestamp(value) => Some(BooleanType::column_from_iter( + value.iter().map(|v| *v != 0), + &[], + )), + Column::Date(value) => Some(BooleanType::column_from_iter( + value.iter().map(|v| *v != 0), + &[], + )), + Column::Null { len } => Some(MutableBitmap::from_len_zeroed(*len).into()), + Column::Nullable(c) => { + let inner = Self::cast_column_to_boolean(&c.column)?; + Some((&inner) & (&c.validity)) + } + _ => None, + } + } + + pub fn try_as_const_bool(value: &Value) -> Result> { + match value { + 
Value::Scalar(v) => Ok(Some(*v)), + _ => Ok(None), + } + } + + pub fn filter_to_bitmap(predicate: Value, rows: usize) -> MutableBitmap { + match predicate { + Value::Scalar(true) => MutableBitmap::from_len_set(rows), + Value::Scalar(false) => MutableBitmap::from_len_zeroed(rows), + Value::Column(bitmap) => bitmap_into_mut(bitmap), + } + } +} diff --git a/src/query/expression/src/utils/mod.rs b/src/query/expression/src/utils/mod.rs index 9a96cf3ab4cc..4b1ffd2c4b6c 100644 --- a/src/query/expression/src/utils/mod.rs +++ b/src/query/expression/src/utils/mod.rs @@ -19,6 +19,7 @@ pub mod block_thresholds; mod column_from; pub mod date_helper; pub mod display; +pub mod filter_helper; use common_arrow::arrow::bitmap::Bitmap; use common_arrow::arrow::chunk::Chunk as ArrowChunk; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs index b6142ff7ddf4..b3fdd9f4a196 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs @@ -19,6 +19,7 @@ use common_exception::ErrorCode; use common_exception::Result; use common_expression::arrow::constant_bitmap; use common_expression::arrow::or_validities; +use common_expression::filter_helper::FilterHelpers; use common_expression::types::nullable::NullableColumn; use common_expression::types::AnyType; use common_expression::types::DataType; @@ -27,7 +28,6 @@ use common_expression::Column; use common_expression::DataBlock; use common_expression::Evaluator; use common_expression::Expr; -use common_expression::FilterHelpers; use common_expression::Scalar; use common_expression::Value; use common_functions::scalars::BUILTIN_FUNCTIONS; diff --git a/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs b/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs index eb3b5c35f322..f6aec6ee6a71 100644 --- a/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs +++ b/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs @@ -132,7 +132,6 @@ impl HeuristicOptimizer { // Recursive optimize the result let result = &state.results()[0]; let optimized_result = self.optimize_expression(result)?; - return Ok(optimized_result); } } diff --git a/src/query/sql/src/planner/optimizer/heuristic/prewhere_optimization.rs b/src/query/sql/src/planner/optimizer/heuristic/prewhere_optimization.rs index 8751a491a9f0..3eab1d78eeb0 100644 --- a/src/query/sql/src/planner/optimizer/heuristic/prewhere_optimization.rs +++ b/src/query/sql/src/planner/optimizer/heuristic/prewhere_optimization.rs @@ -92,7 +92,6 @@ impl PrewhereOptimizer { } pub fn prewhere_optimize(&self, s_expr: SExpr) -> Result { - let rel_op = s_expr.plan(); if s_expr.match_pattern(&self.pattern) { let filter: Filter = s_expr.plan().clone().try_into()?; let mut get: Scan = s_expr.child(0)?.plan().clone().try_into()?; @@ -132,7 +131,7 @@ impl PrewhereOptimizer { .iter() .map(|expr| self.prewhere_optimize(expr.clone())) .collect::>>()?; - Ok(SExpr::create(rel_op.clone(), children, None, None)) + Ok(s_expr.replace_children(children)) } } } diff --git a/src/query/sql/src/planner/optimizer/heuristic/prune_unused_columns.rs b/src/query/sql/src/planner/optimizer/heuristic/prune_unused_columns.rs index f31b935b3bcc..9a96dffdb17c 100644 --- a/src/query/sql/src/planner/optimizer/heuristic/prune_unused_columns.rs +++ b/src/query/sql/src/planner/optimizer/heuristic/prune_unused_columns.rs @@ -38,7 +38,9 @@ impl 
UnusedColumnPruner { } pub fn remove_unused_columns(&self, expr: &SExpr, require_columns: ColumnSet) -> Result { - Self::keep_required_columns(expr, require_columns) + let mut s_expr = Self::keep_required_columns(expr, require_columns)?; + s_expr.applied_rules = expr.applied_rules.clone(); + Ok(s_expr) } /// Keep columns referenced by parent plan node. diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs index 76f0c2a3b8d5..c726c27765bc 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs @@ -78,7 +78,7 @@ impl Rule for RulePushDownLimitAggregate { agg_limit.limit = Some(agg_limit.limit.map_or(count, |c| cmp::max(c, count))); let agg = SExpr::create_unary(RelOperator::Aggregate(agg_limit), agg.child(0)?.clone()); - let mut result = SExpr::create_unary(limit.into(), agg); + let mut result = s_expr.replace_children(vec![agg]); result.set_applied_rule(&self.id); state.add_result(result); } diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_expression.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_expression.rs index 991ade0ed07d..9af826b03b97 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_expression.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_expression.rs @@ -72,7 +72,9 @@ impl Rule for RulePushDownLimitExpression { let limit_expr = SExpr::create_unary(RelOperator::Limit(limit), eval_plan.child(0)?.clone()); - let result = SExpr::create_unary(RelOperator::EvalScalar(eval_scalar), limit_expr); + let mut result = SExpr::create_unary(RelOperator::EvalScalar(eval_scalar), limit_expr); + + result.set_applied_rule(&self.id); state.add_result(result); Ok(()) } diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_join.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_join.rs index 46d8d8fc49d1..a888522d155c 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_join.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_join.rs @@ -86,13 +86,15 @@ impl Rule for RulePushDownLimitOuterJoin { let join: Join = child.plan().clone().try_into()?; match join.join_type { JoinType::Left | JoinType::Full => { - state.add_result(s_expr.replace_children(vec![child.replace_children(vec![ + let mut result = s_expr.replace_children(vec![child.replace_children(vec![ SExpr::create_unary( RelOperator::Limit(limit.clone()), child.child(0)?.clone(), ), SExpr::create_unary(RelOperator::Limit(limit), child.child(1)?.clone()), - ])])) + ])]); + result.set_applied_rule(&self.id); + state.add_result(result) } _ => {} } diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs index 96c47698ae61..94d72766e7ca 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs @@ -73,7 +73,10 @@ impl Rule for RulePushDownLimitScan { count += limit.offset; get.limit = Some(get.limit.map_or(count, |c| cmp::max(c, count))); let get = SExpr::create_leaf(RelOperator::Scan(get)); - 
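        // Record the applied rule on the rewritten expression so this pushdown does not
        // fire again when the heuristic optimizer re-runs the rewrite phase (see heuristic.rs).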
state.add_result(s_expr.replace_children(vec![get])); + + let mut result = s_expr.replace_children(vec![get]); + result.set_applied_rule(&self.id); + state.add_result(result); } Ok(()) } diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_sort.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_sort.rs index 81eb90ffcb99..326211524f97 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_sort.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_sort.rs @@ -74,7 +74,7 @@ impl Rule for RulePushDownLimitSort { sort_limit.limit = Some(sort_limit.limit.map_or(count, |c| cmp::max(c, count))); let sort = SExpr::create_unary(RelOperator::Sort(sort_limit), sort.child(0)?.clone()); - let mut result = SExpr::create_unary(limit.into(), sort); + let mut result = s_expr.replace_children(vec![sort]); result.set_applied_rule(&self.id); state.add_result(result); } diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_union.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_union.rs index 093c9f7708ca..244333a9414e 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_union.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_union.rs @@ -98,8 +98,9 @@ impl Rule for RulePushDownLimitUnion { union_right_child = SExpr::create_unary(new_limit.into(), union_right_child); let mut result = SExpr::create_binary(union.into(), union_left_child, union_right_child); + // Add original limit to top - result = SExpr::create_unary(limit.into(), result); + result = s_expr.replace_children(vec![result]); result.set_applied_rule(&self.id); state.add_result(result); diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_sort_scan.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_sort_scan.rs index e4783c086202..c7c640222764 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_sort_scan.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_sort_scan.rs @@ -76,7 +76,10 @@ impl Rule for RulePushDownSortScan { get.limit = Some(get.limit.map_or(limit, |c| cmp::max(c, limit))); } let get = SExpr::create_leaf(RelOperator::Scan(get)); - state.add_result(s_expr.replace_children(vec![get])); + + let mut result = s_expr.replace_children(vec![get]); + result.set_applied_rule(&self.id); + state.add_result(result); Ok(()) } diff --git a/src/query/storages/fuse/src/operations/delete.rs b/src/query/storages/fuse/src/operations/delete.rs index 295a1a89e48f..890609465eab 100644 --- a/src/query/storages/fuse/src/operations/delete.rs +++ b/src/query/storages/fuse/src/operations/delete.rs @@ -24,6 +24,7 @@ use common_catalog::table::Table; use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; +use common_expression::filter_helper::FilterHelpers; use common_expression::types::DataType; use common_expression::BlockEntry; use common_expression::Column; @@ -32,7 +33,6 @@ use common_expression::DataField; use common_expression::DataSchema; use common_expression::Evaluator; use common_expression::Expr; -use common_expression::FilterHelpers; use common_expression::RemoteExpr; use common_expression::TableSchema; use common_expression::Value; diff --git a/src/query/storages/fuse/src/operations/mutation/mutation_source.rs b/src/query/storages/fuse/src/operations/mutation/mutation_source.rs index 
80012922c415..df84b7d06099 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutation_source.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutation_source.rs @@ -22,6 +22,7 @@ use common_catalog::plan::PartInfoPtr; use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; +use common_expression::filter_helper::FilterHelpers; use common_expression::types::AnyType; use common_expression::types::DataType; use common_expression::BlockEntry; @@ -29,7 +30,6 @@ use common_expression::Column; use common_expression::DataBlock; use common_expression::Evaluator; use common_expression::Expr; -use common_expression::FilterHelpers; use common_expression::Value; use common_functions::scalars::BUILTIN_FUNCTIONS; use common_sql::evaluator::BlockOperator; diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs index ca8957c9b31a..40b31224d180 100644 --- a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs @@ -25,13 +25,13 @@ use common_catalog::plan::TopK; use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; +use common_expression::filter_helper::FilterHelpers; use common_expression::Column; use common_expression::ConstantFolder; use common_expression::DataBlock; use common_expression::DataSchema; use common_expression::Evaluator; use common_expression::Expr; -use common_expression::FilterHelpers; use common_expression::FunctionContext; use common_expression::TopKSorter; use common_expression::Value; diff --git a/src/query/storages/hive/hive/src/hive_table_source.rs b/src/query/storages/hive/hive/src/hive_table_source.rs index 73fadc605ab7..3849fb7ba67a 100644 --- a/src/query/storages/hive/hive/src/hive_table_source.rs +++ b/src/query/storages/hive/hive/src/hive_table_source.rs @@ -24,12 +24,12 @@ use common_catalog::plan::PartInfoPtr; use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; +use common_expression::filter_helper::FilterHelpers; use common_expression::types::AnyType; use common_expression::DataBlock; use common_expression::DataSchemaRef; use common_expression::Evaluator; use common_expression::Expr; -use common_expression::FilterHelpers; use common_expression::Value; use common_functions::scalars::BUILTIN_FUNCTIONS; use common_pipeline_core::processors::port::OutputPort; diff --git a/src/query/storages/parquet/src/parquet_source.rs b/src/query/storages/parquet/src/parquet_source.rs index 187362e827d0..c79a4bc5d512 100644 --- a/src/query/storages/parquet/src/parquet_source.rs +++ b/src/query/storages/parquet/src/parquet_source.rs @@ -24,13 +24,13 @@ use common_catalog::plan::PartInfoPtr; use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; +use common_expression::filter_helper::FilterHelpers; use common_expression::types::BooleanType; use common_expression::DataBlock; use common_expression::DataSchemaRef; use common_expression::DataSchemaRefExt; use common_expression::Evaluator; use common_expression::Expr; -use common_expression::FilterHelpers; use common_expression::Value; use common_functions::scalars::BUILTIN_FUNCTIONS; use common_pipeline_core::processors::port::OutputPort; From 
1c591d5cd97cb75f85f812fcf5e117ad5f562227 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Thu, 26 Jan 2023 22:54:31 -0800 Subject: [PATCH 8/8] Update src/query/expression/src/kernels/topk.rs Co-authored-by: BohuTANG --- src/query/expression/src/kernels/topk.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/expression/src/kernels/topk.rs b/src/query/expression/src/kernels/topk.rs index 2ade339ac871..74f32160e1e8 100644 --- a/src/query/expression/src/kernels/topk.rs +++ b/src/query/expression/src/kernels/topk.rs @@ -52,7 +52,7 @@ impl TopKSorter { }); } - pub fn push_column_internal(&mut self, col: &Column, bitmap: &mut MutableBitmap) + fn push_column_internal(&mut self, col: &Column, bitmap: &mut MutableBitmap) where for<'a> T::ScalarRef<'a>: Ord { let col = T::try_downcast_column(col).unwrap(); for (i, value) in T::iter_column(&col).enumerate() {
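For reference, the pruning contract that push_column/never_match provide can be sketched as a tiny standalone program. This is only an illustration of the idea for an ascending ORDER BY ... LIMIT n over an i64 column: the `SketchTopKSorter` type, the `keep` boolean slice (a stand-in for MutableBitmap), and the method bodies below are assumptions made for the sketch, not the actual common_expression implementation, which is generic over column types.

use std::collections::BinaryHeap;

struct SketchTopKSorter {
    limit: usize,
    heap: BinaryHeap<i64>, // max-heap: the worst of the best `limit` values sits on top
}

impl SketchTopKSorter {
    fn new(limit: usize) -> Self {
        Self { limit, heap: BinaryHeap::with_capacity(limit) }
    }

    /// Feed one decoded column; clear `keep[i]` for rows that can no longer
    /// beat the current N-th smallest value. Pruning is conservative: a row
    /// that once entered the heap stays marked, and the final sort/limit
    /// still produces the exact answer.
    fn push_column(&mut self, col: &[i64], keep: &mut [bool]) {
        for (i, &v) in col.iter().enumerate() {
            if !keep[i] {
                continue; // already filtered out by the prewhere predicate
            }
            if self.heap.len() < self.limit {
                self.heap.push(v);
            } else if v < *self.heap.peek().unwrap() {
                self.heap.pop();
                self.heap.push(v);
            } else {
                keep[i] = false; // cannot beat the current N-th smallest value
            }
        }
    }

    /// A page (or partition) whose minimum cannot beat the current N-th value
    /// can be skipped without decoding it at all.
    fn never_match(&self, page_min: i64) -> bool {
        self.heap.len() == self.limit && page_min >= *self.heap.peek().unwrap()
    }
}

fn main() {
    let mut sorter = SketchTopKSorter::new(2);
    let mut keep = vec![true; 5];
    sorter.push_column(&[7, 1, 9, 3, 8], &mut keep);
    assert_eq!(keep, vec![true, true, false, true, false]);
    assert!(sorter.never_match(4)); // heap holds {1, 3}; a page with min >= 3 is skipped
}

The same check is applied one level up: the per-partition (min, max) pair stored as `sort_min_max` in FusePartInfo lets check_topn decide that no later partition can contribute to the result and finish the pipeline early.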